http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java
deleted file mode 100644
index c03b124..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java
+++ /dev/null
@@ -1,318 +0,0 @@
-package mvm.rya.cloudbase.mr.fileinput;
-
-import cloudbase.core.client.Connector;
-import cloudbase.core.client.ZooKeeperInstance;
-import cloudbase.core.client.admin.TableOperations;
-import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat;
-import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Value;
-import cloudbase.core.util.TextUtil;
-import com.google.common.base.Preconditions;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolver;
-import mvm.rya.cloudbase.CloudbaseRdfConstants;
-import mvm.rya.cloudbase.mr.utils.MRUtils;
-import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.*;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static mvm.rya.cloudbase.CloudbaseRdfUtils.extractValue;
-import static mvm.rya.cloudbase.CloudbaseRdfUtils.from;
-
-/**
- * Take large ntrips files and use MapReduce and Cloudbase
- * Bulk ingest techniques to load into the table in our partition format.
- * <p/> - * Input: NTrips file - * Map: - * - key : shard row - Text - * - value : stmt in doc triple format - Text - * Partitioner: RangePartitioner - * Reduce: - * - key : all the entries for each triple - Cloudbase Key - * Class BulkNtripsInputTool - * Date: Sep 13, 2011 - * Time: 10:00:17 AM - */ -public class BulkNtripsInputTool extends Configured implements Tool { - - public static final String WORKDIR_PROP = "bulk.n3.workdir"; - - private String userName = "root"; - private String pwd = "password"; - private String instance = "stratus"; - private String zk = "10.40.190.129:2181"; - private String ttl = null; - private String workDirBase = "/temp/bulkcb/work"; - private String format = RDFFormat.NTRIPLES.getName(); - - @Override - public int run(final String[] args) throws Exception { - final Configuration conf = getConf(); - try { - //conf - zk = conf.get(MRUtils.CB_ZK_PROP, zk); - ttl = conf.get(MRUtils.CB_TTL_PROP, ttl); - instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance); - userName = conf.get(MRUtils.CB_USERNAME_PROP, userName); - pwd = conf.get(MRUtils.CB_PWD_PROP, pwd); - workDirBase = conf.get(WORKDIR_PROP, workDirBase); - format = conf.get(MRUtils.FORMAT_PROP, format); - conf.set(MRUtils.FORMAT_PROP, format); - final String inputDir = args[0]; - - ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk); - Connector connector = zooKeeperInstance.getConnector(userName, pwd); - TableOperations tableOperations = connector.tableOperations(); - - String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null); - if (tablePrefix != null) - RdfCloudTripleStoreConstants.prefixTables(tablePrefix); - String[] tables = {tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, - tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, - tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX}; - Collection<Job> jobs = new ArrayList<Job>(); - for (final String tableName : tables) { - PrintStream out = null; - try { - String workDir = workDirBase + "/" + tableName; - System.out.println("Loading data into table[" + tableName + "]"); - - Job job = new Job(new Configuration(conf), "Bulk Ingest load data to Generic RDF Table[" + tableName + "]"); - job.setJarByClass(this.getClass()); - //setting long job - Configuration jobConf = job.getConfiguration(); - jobConf.setBoolean("mapred.map.tasks.speculative.execution", false); - jobConf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - jobConf.set("io.sort.mb", jobConf.get("io.sort.mb", "256")); - jobConf.setBoolean("mapred.compress.map.output", true); -// jobConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression - - job.setInputFormatClass(TextInputFormat.class); - - job.setMapperClass(ParseNtripsMapper.class); - job.setMapOutputKeyClass(Key.class); - job.setMapOutputValueClass(Value.class); - - job.setCombinerClass(OutStmtMutationsReducer.class); - job.setReducerClass(OutStmtMutationsReducer.class); - job.setOutputFormatClass(CloudbaseFileOutputFormat.class); - CloudbaseFileOutputFormat.setZooKeeperInstance(job, instance, zk); - - jobConf.set(ParseNtripsMapper.TABLE_PROPERTY, tableName); - - TextInputFormat.setInputPaths(job, new Path(inputDir)); - - FileSystem fs = FileSystem.get(conf); - Path workPath = new Path(workDir); - if (fs.exists(workPath)) - fs.delete(workPath, true); - - CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files")); - - out = new PrintStream(new BufferedOutputStream(fs.create(new 
Path(workDir + "/splits.txt")))); - - if (!tableOperations.exists(tableName)) - tableOperations.create(tableName); - Collection<Text> splits = tableOperations.getSplits(tableName, Integer.MAX_VALUE); - for (Text split : splits) - out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split)))); - - job.setNumReduceTasks(splits.size() + 1); - out.close(); - - job.setPartitionerClass(KeyRangePartitioner.class); - RangePartitioner.setSplitFile(job, workDir + "/splits.txt"); - - jobConf.set(WORKDIR_PROP, workDir); - - job.submit(); - jobs.add(job); - - } catch (Exception re) { - throw new RuntimeException(re); - } finally { - if (out != null) - out.close(); - } - } - - for (Job job : jobs) { - while (!job.isComplete()) { - Thread.sleep(1000); - } - } - - for (String tableName : tables) { - String workDir = workDirBase + "/" + tableName; - tableOperations.importDirectory( - tableName, - workDir + "/files", - workDir + "/failures", - 20, - 4, - false); - } - - } catch (Exception e) { - throw new RuntimeException(e); - } - - return 0; - } - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new BulkNtripsInputTool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - * input: ntrips format triple - * <p/> - * output: key: shard row from generator - * value: stmt in serialized format (document format) - */ - public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> { - public static final String TABLE_PROPERTY = "parsentripsmapper.table"; - - private RDFParser parser; - private String rdfFormat; - private String namedGraph; - private RyaContext ryaContext = RyaContext.getInstance(); - private TripleRowResolver rowResolver = ryaContext.getTripleResolver(); - - @Override - protected void setup(final Context context) throws IOException, InterruptedException { - super.setup(context); - Configuration conf = context.getConfiguration(); - final String table = conf.get(TABLE_PROPERTY); - Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job"); - - final String cv_s = conf.get(MRUtils.CB_CV_PROP); - final byte[] cv = cv_s == null ? null : cv_s.getBytes(); - rdfFormat = conf.get(MRUtils.FORMAT_PROP); - checkNotNull(rdfFormat, "Rdf format cannot be null"); - - namedGraph = conf.get(MRUtils.NAMED_GRAPH_PROP); - - parser = Rio.createParser(RDFFormat.valueOf(rdfFormat)); - parser.setParserConfig(new ParserConfig(true, true, true, RDFParser.DatatypeHandling.VERIFY)); - parser.setRDFHandler(new RDFHandler() { - - @Override - public void startRDF() throws RDFHandlerException { - - } - - @Override - public void endRDF() throws RDFHandlerException { - - } - - @Override - public void handleNamespace(String s, String s1) throws RDFHandlerException { - - } - - @Override - public void handleStatement(Statement statement) throws RDFHandlerException { - try { - RyaStatement rs = RdfToRyaConversions.convertStatement(statement); - if(rs.getColumnVisibility() == null) { - rs.setColumnVisibility(cv); - } - - // Inject the specified context into the statement. 
- if(namedGraph != null){ - rs.setContext(new RyaURI(namedGraph)); - } - - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT,TripleRow> serialize = rowResolver.serialize(rs); - - if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) { - TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); - context.write( - from(tripleRow), - extractValue(tripleRow) - ); - } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) { - TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); - context.write( - from(tripleRow), - extractValue(tripleRow) - ); - } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) { - TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); - context.write( - from(tripleRow), - extractValue(tripleRow) - ); - } else - throw new IllegalArgumentException("Unrecognized table[" + table + "]"); - - } catch (Exception e) { - throw new RDFHandlerException(e); - } - } - - @Override - public void handleComment(String s) throws RDFHandlerException { - - } - }); - } - - @Override - public void map(LongWritable key, Text value, Context output) - throws IOException, InterruptedException { - String rdf = value.toString(); - try { - parser.parse(new StringReader(rdf), ""); - } catch (RDFParseException e) { - System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. Exception[" + e.getMessage() + "]"); - } catch (Exception e) { - e.printStackTrace(); - throw new IOException("Exception occurred parsing triple[" + rdf + "]"); - } - } - } - - public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> { - - public void reduce(Key key, Iterable<Value> values, Context output) - throws IOException, InterruptedException { - output.write(key, CloudbaseRdfConstants.EMPTY_VALUE); - } - } -}
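
The tool deleted above implements the standard bulk-ingest recipe: write the destination table's current split points to a file, partition the map output so each reducer produces the sorted file for exactly one tablet's key range, and finally import the generated directory. The sketch below shows the same splits-file wiring against the public Apache Accumulo 1.5-era API (the open-source successor to the proprietary Cloudbase API used here). It is an illustration only — nothing in it comes from this commit, connector/tableName/workDir are placeholders, and the availability of listSplits is an assumption about that API level.

import java.io.BufferedOutputStream;
import java.io.PrintStream;
import java.util.Collection;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Hedged sketch, not from the commit: assumes the Apache Accumulo 1.5-era API;
// connector, tableName, and workDir are placeholders.
public class BulkIngestWiringSketch {

    /** Configures the partitioner/output half of a bulk-ingest job whose
     *  mappers and reducers emit Accumulo Key/Value pairs. */
    public static void configureBulkOutput(Job job, Connector connector,
                                           String tableName, String workDir) throws Exception {
        TableOperations ops = connector.tableOperations();
        if (!ops.exists(tableName)) {
            ops.create(tableName);
        }

        // Dump the table's current split points, base64-encoded one per line;
        // the partitioner reads this file to route every map-output Key to the
        // reducer that owns the matching tablet range.
        FileSystem fs = FileSystem.get(job.getConfiguration());
        Path splitsFile = new Path(workDir, "splits.txt");
        Collection<Text> splits = ops.listSplits(tableName); // getSplits(...) before 1.5
        try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(splitsFile)))) {
            for (Text split : splits) {
                out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
            }
        }

        // One reducer per tablet, plus one for keys below the first split point.
        job.setNumReduceTasks(splits.size() + 1);
        job.setPartitionerClass(KeyRangePartitioner.class);
        KeyRangePartitioner.setSplitFile(job, splitsFile.toString());

        // Reducers write sorted Key/Value pairs directly into table-format files.
        job.setOutputFormatClass(AccumuloFileOutputFormat.class);
        AccumuloFileOutputFormat.setOutputPath(job, new Path(workDir, "files"));
    }
}

Once such a job completes, ops.importDirectory(tableName, workDir + "/files", workDir + "/failures", false) — assuming the four-argument Accumulo 1.x form — hands the produced files to the tablet servers, the counterpart of the six-argument Cloudbase importDirectory call in the deleted tool above.
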
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java deleted file mode 100644 index 5aed4a2..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java +++ /dev/null @@ -1,230 +0,0 @@ -package mvm.rya.cloudbase.mr.fileinput; - -import cloudbase.core.client.mapreduce.CloudbaseOutputFormat; -import cloudbase.core.data.Mutation; -import cloudbase.core.security.ColumnVisibility; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.cloudbase.CloudbaseRdfConstants; -import mvm.rya.cloudbase.RyaTableMutationsFactory; -import mvm.rya.cloudbase.mr.utils.MRUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.openrdf.model.Statement; -import org.openrdf.rio.*; - -import java.io.IOException; -import java.io.StringReader; -import java.util.Collection; -import java.util.Date; -import java.util.Map; - -/** - * Do bulk import of rdf files - * Class RdfFileInputTool2 - * Date: May 16, 2011 - * Time: 3:12:16 PM - */ -public class RdfFileInputByLineTool implements Tool { - - private Configuration conf = new Configuration(); - - private String userName = "root"; - private String pwd = "password"; - private String instance = "stratus"; - private String zk = "10.40.190.113:2181"; - private String tablePrefix = null; - private RDFFormat format = RDFFormat.NTRIPLES; - - public Configuration getConf() { - return conf; - } - - public void setConf(Configuration conf) { - this.conf = conf; - } - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new RdfFileInputByLineTool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException { - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - conf.set("io.sort.mb", "256"); - conf.setLong("mapred.task.timeout", 600000000); - - zk = conf.get(MRUtils.CB_ZK_PROP, zk); - instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance); - userName = conf.get(MRUtils.CB_USERNAME_PROP, userName); - pwd = conf.get(MRUtils.CB_PWD_PROP, pwd); - format = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.getName())); - - String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF); - - Job job = new Job(conf); - job.setJarByClass(RdfFileInputByLineTool.class); - - // set up cloudbase input - job.setInputFormatClass(TextInputFormat.class); - FileInputFormat.addInputPath(job, new Path(args[0])); - - // set input output of the particular job - job.setMapOutputKeyClass(Text.class); - 
job.setMapOutputValueClass(Mutation.class); -// job.setOutputKeyClass(LongWritable.class); -// job.setOutputValueClass(StatementWritable.class); - - job.setOutputFormatClass(CloudbaseOutputFormat.class); - CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk); - - // set mapper and reducer classes - job.setMapperClass(TextToMutationMapper.class); - job.setNumReduceTasks(0); -// job.setReducerClass(Reducer.class); - - // set output -// Path outputDir = new Path("/temp/sparql-out/testout"); -// FileSystem dfs = FileSystem.get(outputDir.toUri(), conf); -// if (dfs.exists(outputDir)) -// dfs.deleteMutation(outputDir, true); -// -// FileOutputFormat.setOutputPath(job, outputDir); - - // Submit the job - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int exitCode = job.waitForCompletion(true) ? 0 : 1; - - if (exitCode == 0) { - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " - + (end_time.getTime() - startTime.getTime()) / 1000 - + " seconds."); - return job - .getCounters() - .findCounter("org.apache.hadoop.mapred.Task$Counter", - "REDUCE_OUTPUT_RECORDS").getValue(); - } else { - System.out.println("Job Failed!!!"); - } - - return -1; - } - - @Override - public int run(String[] args) throws Exception { - return (int) runJob(args); - } - - public static class TextToMutationMapper extends Mapper<LongWritable, Text, Text, Mutation> { - protected RDFParser parser; - private String prefix; - private RDFFormat rdfFormat; - protected Text spo_table; - private Text po_table; - private Text osp_table; - private byte[] cv = CloudbaseRdfConstants.EMPTY_CV.getExpression(); - - public TextToMutationMapper() { - } - - @Override - protected void setup(final Context context) throws IOException, InterruptedException { - super.setup(context); - Configuration conf = context.getConfiguration(); - prefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null); - if (prefix != null) { - RdfCloudTripleStoreConstants.prefixTables(prefix); - } - - spo_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - po_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); - osp_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); - - final String cv_s = conf.get(MRUtils.CB_CV_PROP); - if (cv_s != null) - cv = cv_s.getBytes(); - - rdfFormat = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.toString())); - parser = Rio.createParser(rdfFormat); - final RyaTableMutationsFactory mut = new RyaTableMutationsFactory(); - - parser.setRDFHandler(new RDFHandler() { - - @Override - public void startRDF() throws RDFHandlerException { - - } - - @Override - public void endRDF() throws RDFHandlerException { - - } - - @Override - public void handleNamespace(String s, String s1) throws RDFHandlerException { - - } - - @Override - public void handleStatement(Statement statement) throws RDFHandlerException { - try { - RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement); - if(ryaStatement.getColumnVisibility() == null) { - ryaStatement.setColumnVisibility(cv); - } - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap = - mut.serialize(ryaStatement); - Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); - Collection<Mutation> po = 
mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); - Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); - - for (Mutation m : spo) { - context.write(spo_table, m); - } - for (Mutation m : po) { - context.write(po_table, m); - } - for (Mutation m : osp) { - context.write(osp_table, m); - } - } catch (Exception e) { - throw new RDFHandlerException(e); - } - } - - @Override - public void handleComment(String s) throws RDFHandlerException { - - } - }); - } - - @Override - protected void map(LongWritable key, Text value, final Context context) throws IOException, InterruptedException { - try { - parser.parse(new StringReader(value.toString()), ""); - } catch (Exception e) { - throw new IOException(e); - } - } - - } -} - http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java deleted file mode 100644 index 54f9a13..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java +++ /dev/null @@ -1,115 +0,0 @@ -package mvm.rya.cloudbase.mr.fileinput; - -import mvm.rya.api.domain.utils.RyaStatementWritable; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.cloudbase.mr.utils.MRUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.openrdf.model.Statement; -import org.openrdf.rio.*; - -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -/** - * Be able to input multiple rdf formatted files. Convert from rdf format to statements. 
- * Class RdfFileInputFormat - * Date: May 16, 2011 - * Time: 2:11:24 PM - */ -public class RdfFileInputFormat extends FileInputFormat<LongWritable, RyaStatementWritable> { - - @Override - public RecordReader<LongWritable, RyaStatementWritable> createRecordReader(InputSplit inputSplit, - TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - return new RdfFileRecordReader(); - } - - private class RdfFileRecordReader extends RecordReader<LongWritable, RyaStatementWritable> implements RDFHandler { - - boolean closed = false; - long count = 0; - BlockingQueue<RyaStatementWritable> queue = new LinkedBlockingQueue<RyaStatementWritable>(); - int total = 0; - - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - FileSplit fileSplit = (FileSplit) inputSplit; - Configuration conf = taskAttemptContext.getConfiguration(); - String rdfForm_s = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName()); //default to RDF/XML - RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s); - - Path file = fileSplit.getPath(); - FileSystem fs = file.getFileSystem(conf); - FSDataInputStream fileIn = fs.open(fileSplit.getPath()); - - RDFParser rdfParser = Rio.createParser(rdfFormat); - rdfParser.setRDFHandler(this); - try { - rdfParser.parse(fileIn, ""); - } catch (Exception e) { - throw new IOException(e); - } - fileIn.close(); - total = queue.size(); - //TODO: Make this threaded so that you don't hold too many statements before sending them - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - return queue.size() > 0; - } - - @Override - public LongWritable getCurrentKey() throws IOException, InterruptedException { - return new LongWritable(count++); - } - - @Override - public RyaStatementWritable getCurrentValue() throws IOException, InterruptedException { - return queue.poll(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return ((float) (total - queue.size())) / ((float) total); - } - - @Override - public void close() throws IOException { - closed = true; - } - - @Override - public void startRDF() throws RDFHandlerException { - } - - @Override - public void endRDF() throws RDFHandlerException { - } - - @Override - public void handleNamespace(String s, String s1) throws RDFHandlerException { - } - - @Override - public void handleStatement(Statement statement) throws RDFHandlerException { - queue.add(new RyaStatementWritable(RdfToRyaConversions.convertStatement(statement))); - } - - @Override - public void handleComment(String s) throws RDFHandlerException { - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java deleted file mode 100644 index f48cbae..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java +++ /dev/null @@ -1,185 +0,0 @@ -package mvm.rya.cloudbase.mr.fileinput; - -import cloudbase.core.client.mapreduce.CloudbaseOutputFormat; -import cloudbase.core.data.Mutation; -import cloudbase.core.security.ColumnVisibility; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import 
mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.utils.RyaStatementWritable; -import mvm.rya.cloudbase.CloudbaseRdfConstants; -import mvm.rya.cloudbase.RyaTableMutationsFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.openrdf.rio.RDFFormat; - -import java.io.IOException; -import java.util.Collection; -import java.util.Date; -import java.util.Map; - -import static mvm.rya.cloudbase.mr.utils.MRUtils.*; - -/** - * Do bulk import of rdf files - * Class RdfFileInputTool - * Date: May 16, 2011 - * Time: 3:12:16 PM - */ -public class RdfFileInputTool implements Tool { - - private Configuration conf; - - private String userName = "root"; - private String pwd = "password"; - private String instance = "stratus"; - private String zk = "10.40.190.113:2181"; - private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF; - private String format = RDFFormat.RDFXML.getName(); - - - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new RdfFileInputTool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException { - //faster - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - - zk = conf.get(CB_ZK_PROP, zk); - instance = conf.get(CB_INSTANCE_PROP, instance); - userName = conf.get(CB_USERNAME_PROP, userName); - pwd = conf.get(CB_PWD_PROP, pwd); - - tablePrefix = conf.get(TABLE_PREFIX_PROPERTY, tablePrefix); - format = conf.get(FORMAT_PROP, format); - conf.set(FORMAT_PROP, format); - - Job job = new Job(conf); - job.setJarByClass(RdfFileInputTool.class); - - // set up cloudbase input - job.setInputFormatClass(RdfFileInputFormat.class); - RdfFileInputFormat.addInputPath(job, new Path(args[0])); - - // set input output of the particular job - job.setMapOutputKeyClass(LongWritable.class); - job.setMapOutputValueClass(RyaStatementWritable.class); -// job.setOutputKeyClass(LongWritable.class); -// job.setOutputValueClass(StatementWritable.class); - - job.setOutputFormatClass(CloudbaseOutputFormat.class); - CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk); - - // set mapper and reducer classes - job.setMapperClass(StatementToMutationMapper.class); - job.setNumReduceTasks(0); -// job.setReducerClass(Reducer.class); - - // set output -// Path outputDir = new Path("/temp/sparql-out/testout"); -// FileSystem dfs = FileSystem.get(outputDir.toUri(), conf); -// if (dfs.exists(outputDir)) -// dfs.deleteMutation(outputDir, true); -// -// FileOutputFormat.setOutputPath(job, outputDir); - - // Submit the job - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int exitCode = job.waitForCompletion(true) ? 
0 : 1; - - if (exitCode == 0) { - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " - + (end_time.getTime() - startTime.getTime()) / 1000 - + " seconds."); - return job - .getCounters() - .findCounter("org.apache.hadoop.mapred.Task$Counter", - "REDUCE_OUTPUT_RECORDS").getValue(); - } else { - System.out.println("Job Failed!!!"); - } - - return -1; - } - - @Override - public int run(String[] args) throws Exception { - runJob(args); - return 0; - } - - public static class StatementToMutationMapper extends Mapper<LongWritable, RyaStatementWritable, Text, Mutation> { - protected String tablePrefix; - protected Text spo_table; - protected Text po_table; - protected Text osp_table; - private byte[] cv = CloudbaseRdfConstants.EMPTY_CV.getExpression(); - RyaTableMutationsFactory mut = new RyaTableMutationsFactory(); - - public StatementToMutationMapper() { - } - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - Configuration conf = context.getConfiguration(); - tablePrefix = conf.get(TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF); - spo_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - po_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); - osp_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); - - final String cv_s = conf.get(CB_CV_PROP); - if (cv_s != null) - cv = cv_s.getBytes(); - } - - @Override - protected void map(LongWritable key, RyaStatementWritable value, Context context) throws IOException, InterruptedException { - RyaStatement statement = value.getRyaStatement(); - if (statement.getColumnVisibility() == null) { - statement.setColumnVisibility(cv); - } - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap = - mut.serialize(statement); - Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); - Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); - Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); - - for (Mutation m : spo) { - context.write(spo_table, m); - } - for (Mutation m : po) { - context.write(po_table, m); - } - for (Mutation m : osp) { - context.write(osp_table, m); - } - } - - } -} - http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java deleted file mode 100644 index 5d7d971..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java +++ /dev/null @@ -1,314 +0,0 @@ -//package mvm.rya.cloudbase.mr.fileinput; -// -//import cloudbase.core.client.Connector; -//import cloudbase.core.client.ZooKeeperInstance; -//import cloudbase.core.client.admin.TableOperations; -//import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat; -//import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner; -//import cloudbase.core.data.Key; -//import cloudbase.core.data.Value; -//import cloudbase.core.util.TextUtil; -//import com.google.common.base.Preconditions; -//import 
mvm.rya.api.RdfCloudTripleStoreConstants; -//import mvm.rya.cloudbase.CloudbaseRdfConstants; -//import mvm.rya.cloudbase.RyaTableKeyValues; -//import mvm.rya.cloudbase.mr.utils.MRUtils; -//import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner; -//import mvm.rya.cloudbase.utils.shard.HashAlgorithm; -//import mvm.rya.cloudbase.utils.shard.HashCodeHashAlgorithm; -//import org.apache.commons.codec.binary.Base64; -//import org.apache.hadoop.conf.Configuration; -//import org.apache.hadoop.conf.Configured; -//import org.apache.hadoop.fs.FileSystem; -//import org.apache.hadoop.fs.Path; -//import org.apache.hadoop.io.LongWritable; -//import org.apache.hadoop.io.Text; -//import org.apache.hadoop.mapreduce.Job; -//import org.apache.hadoop.mapreduce.Mapper; -//import org.apache.hadoop.mapreduce.Reducer; -//import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -//import org.apache.hadoop.util.Tool; -//import org.apache.hadoop.util.ToolRunner; -//import org.openrdf.model.Resource; -//import org.openrdf.model.Statement; -//import org.openrdf.rio.*; -// -//import java.io.BufferedOutputStream; -//import java.io.IOException; -//import java.io.PrintStream; -//import java.io.StringReader; -//import java.util.ArrayList; -//import java.util.Collection; -//import java.util.Map; -// -//import static com.google.common.base.Preconditions.checkNotNull; -// -///** -//* Take large ntrips files and use MapReduce and Cloudbase -//* Bulk ingest techniques to load into the table in our partition format. -//* Uses a sharded scheme -//* <p/> -//* Input: NTrips file -//* Map: -//* - key : shard row - Text -//* - value : stmt in doc triple format - Text -//* Partitioner: RangePartitioner -//* Reduce: -//* - key : all the entries for each triple - Cloudbase Key -//* Class BulkNtripsInputTool -//* Date: Sep 13, 2011 -//* Time: 10:00:17 AM -//*/ -//public class ShardedBulkNtripsInputTool extends Configured implements Tool { -// -// public static final String WORKDIR_PROP = "bulk.n3.workdir"; -// public static final String BULK_N3_NUMSHARD = "bulk.n3.numshard"; -// -// private String userName = "root"; -// private String pwd = "password"; -// private String instance = "stratus"; -// private String zk = "10.40.190.129:2181"; -// private String ttl = null; -// private String workDirBase = "/temp/bulkcb/work"; -// private String format = RDFFormat.NTRIPLES.getName(); -// private int numShards; -// -// @Override -// public int run(final String[] args) throws Exception { -// final Configuration conf = getConf(); -// try { -// //conf -// zk = conf.get(MRUtils.CB_ZK_PROP, zk); -// ttl = conf.get(MRUtils.CB_TTL_PROP, ttl); -// instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance); -// userName = conf.get(MRUtils.CB_USERNAME_PROP, userName); -// pwd = conf.get(MRUtils.CB_PWD_PROP, pwd); -// workDirBase = conf.get(WORKDIR_PROP, workDirBase); -// format = conf.get(MRUtils.FORMAT_PROP, format); -// String numShards_s = conf.get(BULK_N3_NUMSHARD); -// Preconditions.checkArgument(numShards_s != null); -// numShards = Integer.parseInt(numShards_s); -// conf.set(MRUtils.FORMAT_PROP, format); -// final String inputDir = args[0]; -// -// ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk); -// Connector connector = zooKeeperInstance.getConnector(userName, pwd); -// TableOperations tableOperations = connector.tableOperations(); -// -// String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null); -// if (tablePrefix != null) -// RdfCloudTripleStoreConstants.prefixTables(tablePrefix); -// String[] tables = 
{tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, -// tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, -// tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX}; -// Collection<Job> jobs = new ArrayList<Job>(); -// for (final String table : tables) { -// for (int i = 0; i < numShards; i++) { -// final String tableName = table + i; -// PrintStream out = null; -// try { -// String workDir = workDirBase + "/" + tableName; -// System.out.println("Loading data into table[" + tableName + "]"); -// -// Job job = new Job(new Configuration(conf), "Bulk Ingest load data to Generic RDF Table[" + tableName + "]"); -// job.setJarByClass(this.getClass()); -// //setting long job -// job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false); -// job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false); -// job.getConfiguration().set("io.sort.mb", "256"); -// job.getConfiguration().setBoolean("mapred.compress.map.output", true); -// job.getConfiguration().set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression -// -// job.setInputFormatClass(TextInputFormat.class); -// -// job.setMapperClass(ShardedParseNtripsMapper.class); -// job.setMapOutputKeyClass(Key.class); -// job.setMapOutputValueClass(Value.class); -// -// job.setCombinerClass(OutStmtMutationsReducer.class); -// job.setReducerClass(OutStmtMutationsReducer.class); -// job.setOutputFormatClass(CloudbaseFileOutputFormat.class); -// CloudbaseFileOutputFormat.setZooKeeperInstance(job, instance, zk); -// -// job.getConfiguration().set(ShardedParseNtripsMapper.TABLE_PROPERTY, tableName); -// job.getConfiguration().set(ShardedParseNtripsMapper.SHARD_PROPERTY, i + ""); -// -// TextInputFormat.setInputPaths(job, new Path(inputDir)); -// -// FileSystem fs = FileSystem.get(conf); -// Path workPath = new Path(workDir); -// if (fs.exists(workPath)) -// fs.deleteMutation(workPath, true); -// -// CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files")); -// -// out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt")))); -// -// if (!tableOperations.exists(tableName)) -// tableOperations.create(tableName); -// Collection<Text> splits = tableOperations.getSplits(tableName, Integer.MAX_VALUE); -// for (Text split : splits) -// out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split)))); -// -// job.setNumReduceTasks(splits.size() + 1); -// out.close(); -// -// job.setPartitionerClass(KeyRangePartitioner.class); -// RangePartitioner.setSplitFile(job, workDir + "/splits.txt"); -// -// job.getConfiguration().set(WORKDIR_PROP, workDir); -// -// job.submit(); -// jobs.add(job); -// -// } catch (Exception re) { -// throw new RuntimeException(re); -// } finally { -// if (out != null) -// out.close(); -// } -// } -// } -// -// for (Job job : jobs) { -// while (!job.isComplete()) { -// Thread.sleep(1000); -// } -// } -// -// for (String table : tables) { -// for (int i = 0; i < numShards; i++) { -// final String tableName = table + i; -// String workDir = workDirBase + "/" + tableName; -// tableOperations.importDirectory( -// tableName, -// workDir + "/files", -// workDir + "/failures", -// 20, -// 4, -// false); -// } -// } -// -// } catch (Exception e) { -// throw new RuntimeException(e); -// } -// -// return 0; -// } -// -// public static void main(String[] args) { -// try { -// ToolRunner.run(new Configuration(), new ShardedBulkNtripsInputTool(), args); -// } 
catch (Exception e) { -// e.printStackTrace(); -// } -// } -// -// /** -// * input: ntrips format triple -// * <p/> -// * output: key: shard row from generator -// * value: stmt in serialized format (document format) -// */ -// public static class ShardedParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> { -// public static final String TABLE_PROPERTY = "shardedparsentripsmapper.table"; -// public static final String SHARD_PROPERTY = "shardedparsentripsmapper.shard"; -// -// private RDFParser parser; -// private String rdfFormat; -// private HashAlgorithm hashAlgorithm = new HashCodeHashAlgorithm(); -// private int shard; -// private int numShards; -// -// @Override -// protected void setup(final Context context) throws IOException, InterruptedException { -// super.setup(context); -// Configuration conf = context.getConfiguration(); -// final String table = conf.get(TABLE_PROPERTY); -// Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job"); -// -// String shard_s = conf.get(SHARD_PROPERTY); -// Preconditions.checkNotNull(shard_s, "Set the " + SHARD_PROPERTY + " property"); -// shard = Integer.parseInt(shard_s); -// -// numShards = Integer.parseInt(conf.get(BULK_N3_NUMSHARD)); -// -// final String cv_s = conf.get(MRUtils.CB_CV_PROP); -// rdfFormat = conf.get(MRUtils.FORMAT_PROP); -// checkNotNull(rdfFormat, "Rdf format cannot be null"); -// -// parser = Rio.createParser(RDFFormat.valueOf(rdfFormat)); -// parser.setRDFHandler(new RDFHandler() { -// -// @Override -// public void startRDF() throws RDFHandlerException { -// -// } -// -// @Override -// public void endRDF() throws RDFHandlerException { -// -// } -// -// @Override -// public void handleNamespace(String s, String s1) throws RDFHandlerException { -// -// } -// -// @Override -// public void handleStatement(Statement statement) throws RDFHandlerException { -// try { -// Resource subject = statement.getSubject(); -// if ((hashAlgorithm.hash(subject.stringValue()) % numShards) != shard) { -// return; -// } -// RyaTableKeyValues rdfTableKeyValues = new RyaTableKeyValues(subject, statement.getPredicate(), statement.getObject(), cv_s, statement.getContext()).invoke(); -// Collection<Map.Entry<Key, Value>> entries = null; -// if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) { -// entries = rdfTableKeyValues.getSpo(); -// } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) { -// entries = rdfTableKeyValues.getPo(); -// } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) { -// entries = rdfTableKeyValues.getOsp(); -// } else -// throw new IllegalArgumentException("Unrecognized table[" + table + "]"); -// -// for (Map.Entry<Key, Value> entry : entries) { -// context.write(entry.getKey(), entry.getValue()); -// } -// } catch (Exception e) { -// throw new RDFHandlerException(e); -// } -// } -// -// @Override -// public void handleComment(String s) throws RDFHandlerException { -// -// } -// }); -// } -// -// @Override -// public void map(LongWritable key, Text value, Context output) -// throws IOException, InterruptedException { -// String rdf = value.toString(); -// try { -// parser.parse(new StringReader(rdf), ""); -// } catch (RDFParseException e) { -// System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. 
Exception[" + e.getMessage() + "]"); -// } catch (Exception e) { -// e.printStackTrace(); -// throw new IOException("Exception occurred parsing triple[" + rdf + "]"); -// } -// } -// } -// -// public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> { -// -// public void reduce(Key key, Iterable<Value> values, Context output) -// throws IOException, InterruptedException { -// output.write(key, CloudbaseRdfConstants.EMPTY_VALUE); -// } -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java deleted file mode 100644 index 453d6ca..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java +++ /dev/null @@ -1,350 +0,0 @@ -//package mvm.rya.cloudbase.mr.upgrade; -// -//import cloudbase.core.client.Connector; -//import cloudbase.core.client.ZooKeeperInstance; -//import cloudbase.core.client.admin.TableOperations; -//import cloudbase.core.client.mapreduce.CloudbaseInputFormat; -//import cloudbase.core.client.mapreduce.CloudbaseOutputFormat; -//import cloudbase.core.data.Key; -//import cloudbase.core.data.Mutation; -//import cloudbase.core.data.Range; -//import cloudbase.core.data.Value; -//import cloudbase.core.security.Authorizations; -//import cloudbase.core.security.ColumnVisibility; -//import cloudbase.core.util.Pair; -//import com.google.common.collect.Lists; -//import com.google.common.io.ByteArrayDataInput; -//import com.google.common.io.ByteArrayDataOutput; -//import com.google.common.io.ByteStreams; -//import mvm.rya.api.InvalidValueTypeMarkerRuntimeException; -//import mvm.rya.api.RdfCloudTripleStoreConstants; -//import mvm.rya.cloudbase.CloudbaseRdfConfiguration; -//import mvm.rya.cloudbase.CloudbaseRdfConstants; -//import mvm.rya.cloudbase.CloudbaseRyaDAO; -//import mvm.rya.cloudbase.RyaTableMutationsFactory; -//import mvm.rya.cloudbase.mr.utils.MRUtils; -//import org.apache.hadoop.conf.Configuration; -//import org.apache.hadoop.conf.Configured; -//import org.apache.hadoop.io.Text; -//import org.apache.hadoop.mapreduce.Job; -//import org.apache.hadoop.mapreduce.Mapper; -//import org.apache.hadoop.util.Tool; -//import org.apache.hadoop.util.ToolRunner; -//import org.openrdf.model.*; -//import org.openrdf.model.impl.StatementImpl; -//import org.openrdf.model.impl.ValueFactoryImpl; -// -//import java.io.IOException; -//import java.util.ArrayList; -//import java.util.Collection; -//import java.util.Date; -//import java.util.Map; -// -//import static mvm.rya.api.RdfCloudTripleStoreUtils.*; -// -///** -// * 1. Check version. <br/> -// * 2. 
If version does not exist, apply: <br/> -// * - DELIM => 1 -> 0 -// * - DELIM_STOP => 2 -> 1 -// * - 3 table index -// */ -//public class UpgradeCloudbaseRdfTables extends Configured implements Tool { -// public static final String TMP = "_tmp"; -// public static final String DELETE_PROP = "rdf.upgrade.deleteMutation"; //true if ok to deleteMutation old tables -// private String zk = "10.40.190.113:2181"; -// private String instance = "stratus"; -// private String userName = "root"; -// private String pwd = "password"; -// private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF; -// private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration(); -// -// @Override -// public int run(String[] strings) throws Exception { -// conf = new CloudbaseRdfConfiguration(getConf()); -// //faster -// conf.setBoolean("mapred.map.tasks.speculative.execution", false); -// conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); -// conf.set(MRUtils.JOB_NAME_PROP, "Upgrading Cloudbase Rdf Tables"); -// -// zk = conf.get(MRUtils.CB_ZK_PROP, zk); -// instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance); -// userName = conf.get(MRUtils.CB_USERNAME_PROP, userName); -// pwd = conf.get(MRUtils.CB_PWD_PROP, pwd); -// -// tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix); -// -// Authorizations authorizations = CloudbaseRdfConstants.ALL_AUTHORIZATIONS; -// String auth = conf.get(MRUtils.CB_AUTH_PROP); -// if (auth != null) -// authorizations = new Authorizations(auth.split(",")); -// -// boolean deleteTables = conf.getBoolean(DELETE_PROP, false); -// -// //tables -// String spo = tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX; -// String po = tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX; -// String osp = tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX; -// String so = tablePrefix + "so"; -// String ops = tablePrefix + "o"; -// -// //check version first -// Connector connector = new ZooKeeperInstance(instance, zk).getConnector(userName, pwd.getBytes()); -// CloudbaseRyaDAO rdfDAO = new CloudbaseRyaDAO(); -// rdfDAO.setConnector(connector); -// conf.setTablePrefix(tablePrefix); -// rdfDAO.setConf(conf); -//// rdfDAO.setSpoTable(spo); -//// rdfDAO.setPoTable(po); -//// rdfDAO.setOspTable(osp); -//// rdfDAO.setNamespaceTable(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX); -// rdfDAO.init(); -// String version = rdfDAO.getVersion(); -// if (version != null) { -// //TODO: Do a version check here -// //version found, no need to upgrade -// return 0; -// } -// -// rdfDAO.destroy(); -// -// //create osp table, deleteMutation so and o tables -// TableOperations tableOperations = connector.tableOperations(); -// if (deleteTables) { -// if (tableOperations.exists(so)) { -// tableOperations.deleteMutation(so); -// } -// if (tableOperations.exists(ops)) { -// tableOperations.deleteMutation(ops); -// } -// } -// -// conf.set("io.sort.mb", "256"); -// Job job = new Job(conf); -// job.setJarByClass(UpgradeCloudbaseRdfTables.class); -// -// //set up cloudbase input -// job.setInputFormatClass(CloudbaseInputFormat.class); -// CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(), spo, authorizations); -// CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk); -// Collection<Pair<Text, Text>> columns = new ArrayList<Pair<Text, Text>>(); -// final Pair pair = new Pair(RdfCloudTripleStoreConstants.INFO_TXT, RdfCloudTripleStoreConstants.INFO_TXT); -// columns.add(pair); -// CloudbaseInputFormat.fetchColumns(job, columns); 
-// -// CloudbaseInputFormat.setRanges(job, Lists.newArrayList(new Range(new Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE})))); -// -// // set input output of the particular job -// job.setMapOutputKeyClass(Text.class); -// job.setMapOutputValueClass(Mutation.class); -// -// //no reducer needed? -// job.setNumReduceTasks(0); -// job.setMapperClass(UpgradeCloudbaseRdfTablesMapper.class); -// -// CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, spo + TMP); -// CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk); -// job.setOutputFormatClass(CloudbaseOutputFormat.class); -// -// // Submit the job -// Date startTime = new Date(); -// System.out.println("Job started: " + startTime); -// int exitCode = job.waitForCompletion(true) ? 0 : 1; -// -// if (exitCode == 0) { -// Date end_time = new Date(); -// System.out.println("Job ended: " + end_time); -// System.out.println("The job took " -// + (end_time.getTime() - startTime.getTime()) / 1000 -// + " seconds."); -// -// //now deleteMutation old spo table, and rename tmp one -// if (deleteTables) { -// tableOperations.deleteMutation(spo); -// tableOperations.rename(spo + TMP, spo); -// tableOperations.deleteMutation(po); -// tableOperations.rename(po + TMP, po); -// tableOperations.deleteMutation(osp); -// tableOperations.rename(osp + TMP, osp); -// } -// -// return 0; -// } else { -// System.out.println("Job Failed!!!"); -// } -// -// return -1; -// } -// -// public static void main(String[] args) { -// try { -// ToolRunner.run(new Configuration(), new UpgradeCloudbaseRdfTables(), args); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// } -// -// public static class UpgradeCloudbaseRdfTablesMapper extends Mapper<Key, Value, Text, Mutation> { -// private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF; -// ValueFactoryImpl vf = new ValueFactoryImpl(); -// -// private Text spo_table, po_table, osp_table; -// -// RyaTableMutationsFactory mut = new RyaTableMutationsFactory(); -// -// @Override -// protected void setup(Context context) throws IOException, InterruptedException { -// super.setup(context); -// Configuration conf = context.getConfiguration(); -// tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix); -// String spo = tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX + TMP; -// String po = tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX + TMP; -// String osp = tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX + TMP; -// -// spo_table = new Text(spo); -// po_table = new Text(po); -// osp_table = new Text(osp); -// } -// -// @Override -// protected void map(Key key, Value value, Context context) throws IOException, InterruptedException { -// //read in old format -// Statement statement = null; -// try { -// statement = translateOldStatementFromRow(ByteStreams.newDataInput(key.getRow().getBytes()), "spo", vf); -// } catch (Exception e) { -// //not the right version -// return; -// } -// -// //translate to new format and save in new tables -// Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Mutation> mutationMap = mut.serialize(statement.getSubject(), statement.getPredicate(), statement.getObject(), new ColumnVisibility(key.getColumnVisibility()), statement.getContext()); -// Mutation spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); -// Mutation po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); -// Mutation osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); -// -// 
context.write(spo_table, spo); -// context.write(po_table, po); -// context.write(osp_table, osp); -// -// //TODO: Contexts -// } -// } -// -// public static org.openrdf.model.Value readOldValue(ByteArrayDataInput dataIn, ValueFactory vf) -// throws IOException, ClassCastException { -// int valueTypeMarker; -// try { -// valueTypeMarker = dataIn.readByte(); -// } catch (Exception e) { -// return null; -// } -// -// org.openrdf.model.Value ret = null; -// if (valueTypeMarker == RdfCloudTripleStoreConstants.URI_MARKER) { -// String uriString = readString(dataIn); -// ret = vf.createURI(uriString); -// } else if (valueTypeMarker == RdfCloudTripleStoreConstants.BNODE_MARKER) { -// String bnodeID = readString(dataIn); -// ret = vf.createBNode(bnodeID); -// } else if (valueTypeMarker == RdfCloudTripleStoreConstants.PLAIN_LITERAL_MARKER) { -// String label = readString(dataIn); -// ret = vf.createLiteral(label); -// } else if (valueTypeMarker == RdfCloudTripleStoreConstants.LANG_LITERAL_MARKER) { -// String label = readString(dataIn); -// String language = readString(dataIn); -// ret = vf.createLiteral(label, language); -// } else if (valueTypeMarker == RdfCloudTripleStoreConstants.DATATYPE_LITERAL_MARKER) { -// String label = readString(dataIn); -// URI datatype = (URI) readOldValue(dataIn, vf); -// ret = vf.createLiteral(label, datatype); -// } else { -// throw new InvalidValueTypeMarkerRuntimeException(valueTypeMarker, "Invalid value type marker: " -// + valueTypeMarker); -// } -// -// return ret; -// } -// -// public static Statement translateOldStatementFromRow(ByteArrayDataInput input, String table, ValueFactory vf) throws IOException { -// Resource subject; -// URI predicate; -// org.openrdf.model.Value object; -// if ("spo".equals(table)) { -// subject = (Resource) readOldValue(input, vf); -// input.readByte(); -// predicate = (URI) readOldValue(input, vf); -// input.readByte(); -// object = readOldValue(input, vf); -// } else if ("o".equals(table)) { -// object = readOldValue(input, vf); -// input.readByte(); -// predicate = (URI) readOldValue(input, vf); -// input.readByte(); -// subject = (Resource) readOldValue(input, vf); -// } else if ("po".equals(table)) { -// predicate = (URI) readOldValue(input, vf); -// input.readByte(); -// object = readOldValue(input, vf); -// input.readByte(); -// subject = (Resource) readOldValue(input, vf); -// } else { -// //so -// subject = (Resource) readOldValue(input, vf); -// input.readByte(); -// object = readOldValue(input, vf); -// input.readByte(); -// predicate = (URI) readOldValue(input, vf); -// } -// return new StatementImpl(subject, predicate, object); -// } -// -// public static byte[] writeOldValue(org.openrdf.model.Value value) throws IOException { -// if (value == null) -// return new byte[]{}; -// ByteArrayDataOutput dataOut = ByteStreams.newDataOutput(); -// if (value instanceof URI) { -// dataOut.writeByte(RdfCloudTripleStoreConstants.URI_MARKER); -// writeString(((URI) value).toString(), dataOut); -// } else if (value instanceof BNode) { -// dataOut.writeByte(RdfCloudTripleStoreConstants.BNODE_MARKER); -// writeString(((BNode) value).getID(), dataOut); -// } else if (value instanceof Literal) { -// Literal lit = (Literal) value; -// -// String label = lit.getLabel(); -// String language = lit.getLanguage(); -// URI datatype = lit.getDatatype(); -// -// if (datatype != null) { -// dataOut.writeByte(RdfCloudTripleStoreConstants.DATATYPE_LITERAL_MARKER); -// writeString(label, dataOut); -// dataOut.write(writeOldValue(datatype)); -// } 
else if (language != null) { -// dataOut.writeByte(RdfCloudTripleStoreConstants.LANG_LITERAL_MARKER); -// writeString(label, dataOut); -// writeString(language, dataOut); -// } else { -// dataOut.writeByte(RdfCloudTripleStoreConstants.PLAIN_LITERAL_MARKER); -// writeString(label, dataOut); -// } -// } else { -// throw new IllegalArgumentException("unexpected value type: " -// + value.getClass()); -// } -// return dataOut.toByteArray(); -// } -// -// private static String OLD_DELIM = "\u0001"; -// private static byte[] OLD_DELIM_BYTES = OLD_DELIM.getBytes(); -// -// public static byte[] buildOldRowWith(byte[] bytes_one, byte[] bytes_two, byte[] bytes_three) throws IOException { -// ByteArrayDataOutput rowidout = ByteStreams.newDataOutput(); -// rowidout.write(bytes_one); -// rowidout.write(OLD_DELIM_BYTES); -// rowidout.write(bytes_two); -// rowidout.write(OLD_DELIM_BYTES); -// rowidout.write(bytes_three); -// return truncateRowId(rowidout.toByteArray()); -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java deleted file mode 100644 index 950f585..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java +++ /dev/null @@ -1,94 +0,0 @@ -package mvm.rya.cloudbase.mr.utils; - -import org.apache.hadoop.conf.Configuration; -import org.openrdf.model.URI; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; - -/** - * Class MRSailUtils - * Date: May 19, 2011 - * Time: 10:34:06 AM - */ -public class MRUtils { - - public static final String JOB_NAME_PROP = "mapred.job.name"; - - public static final String CB_USERNAME_PROP = "cb.username"; - public static final String CB_PWD_PROP = "cb.pwd"; - public static final String CB_ZK_PROP = "cb.zk"; - public static final String CB_INSTANCE_PROP = "cb.instance"; - public static final String CB_TTL_PROP = "cb.ttl"; - public static final String CB_CV_PROP = "cb.cv"; - public static final String CB_AUTH_PROP = "cb.auth"; - public static final String CB_MOCK_PROP = "cb.mock"; - public static final String TABLE_LAYOUT_PROP = "rdf.tablelayout"; - public static final String FORMAT_PROP = "rdf.format"; - - public static final String NAMED_GRAPH_PROP = "rdf.graph"; - - public static final String TABLE_PREFIX_PROPERTY = "rdf.tablePrefix"; - - // rdf constants - public static final ValueFactory vf = new ValueFactoryImpl(); - public static final URI RDF_TYPE = vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type"); - - - // cloudbase map reduce utils - -// public static Range retrieveRange(URI entry_key, URI entry_val) throws IOException { -// ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput(); -// startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_key)); -// if (entry_val != null) { -// startRowOut.write(RdfCloudTripleStoreConstants.DELIM_BYTES); -// startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_val)); -// } -// byte[] startrow = startRowOut.toByteArray(); -// startRowOut.write(RdfCloudTripleStoreConstants.DELIM_STOP_BYTES); -// byte[] stoprow = startRowOut.toByteArray(); -// -// Range range = new Range(new Text(startrow), new Text(stoprow)); -// return range; -// } - - - public static String getCBTtl(Configuration conf) { - return 
conf.get(CB_TTL_PROP);
-    }
-
-    public static String getCBUserName(Configuration conf) {
-        return conf.get(CB_USERNAME_PROP);
-    }
-
-    public static String getCBPwd(Configuration conf) {
-        return conf.get(CB_PWD_PROP);
-    }
-
-    public static String getCBZK(Configuration conf) {
-        return conf.get(CB_ZK_PROP);
-    }
-
-    public static String getCBInstance(Configuration conf) {
-        return conf.get(CB_INSTANCE_PROP);
-    }
-
-    public static void setCBUserName(Configuration conf, String str) {
-        conf.set(CB_USERNAME_PROP, str);
-    }
-
-    public static void setCBPwd(Configuration conf, String str) {
-        conf.set(CB_PWD_PROP, str);
-    }
-
-    public static void setCBZK(Configuration conf, String str) {
-        conf.set(CB_ZK_PROP, str);
-    }
-
-    public static void setCBInstance(Configuration conf, String str) {
-        conf.set(CB_INSTANCE_PROP, str);
-    }
-
-    public static void setCBTtl(Configuration conf, String str) {
-        conf.set(CB_TTL_PROP, str);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java
deleted file mode 100644
index d3f8ae7..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package mvm.rya.cloudbase.query;
-
-import cloudbase.core.client.BatchScanner;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Value;
-import com.google.common.base.Preconditions;
-import mango.collect.AbstractCloseableIterable;
-import mvm.rya.cloudbase.BatchScannerIterator;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- */
-public class BatchScannerCloseableIterable extends AbstractCloseableIterable<Map.Entry<Key, Value>> {
-
-    private BatchScanner scanner;
-
-    public BatchScannerCloseableIterable(BatchScanner scanner) {
-        Preconditions.checkNotNull(scanner);
-        this.scanner = scanner;
-    }
-
-    @Override
-    protected void doClose() throws IOException {
-        scanner.close();
-    }
-
-    @Override
-    protected Iterator<Map.Entry<Key, Value>> retrieveIterator() {
-        return new BatchScannerIterator(scanner.iterator());
-    }
-}
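
BatchScannerCloseableIterable, the final deletion above, lets query code hand scan results back as an Iterable while still guaranteeing that the underlying BatchScanner — which holds client-side threads and server-side scan sessions — eventually gets closed. For readers without the mango-collect AbstractCloseableIterable base class it extends, here is a dependency-free sketch of the same idea; all names are illustrative, not from the commit.

import java.io.Closeable;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

// Illustrative stand-in for the deleted wrapper: any Iterable of scan results
// plus the action that releases the scanner behind it.
final class CloseableScanResults<K, V> implements Iterable<Map.Entry<K, V>>, Closeable {

    private final Iterable<Map.Entry<K, V>> results; // e.g. a BatchScanner
    private final Runnable closer;                   // e.g. scanner::close

    CloseableScanResults(Iterable<Map.Entry<K, V>> results, Runnable closer) {
        this.results = Objects.requireNonNull(results);
        this.closer = Objects.requireNonNull(closer);
    }

    @Override
    public Iterator<Map.Entry<K, V>> iterator() {
        return results.iterator();
    }

    @Override
    public void close() {
        closer.run(); // frees the scanner's threads and server-side sessions
    }
}

Under Java 8 this composes with try-with-resources — try (CloseableScanResults<Key, Value> rows = new CloseableScanResults<>(scanner, scanner::close)) { ... } — so the scanner is released even when a caller abandons iteration early, which is exactly the leak the deleted class was written to prevent.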