Hi Pavan,

Thanks a lot. That will be great.
Based on your inputs I wrote an implementation to support my scenario.
Which include writing a mapping input format.

But I am not clear how to get input into this mapping input format.(How to
specify an input path for a MappingInputformat).

I looked into ConfigurationUtils but It did not have any option to do that.

It will be great if you could give me some input on how to handle this
matter. I am attaching my mapping input format just in case that is
helpful.

Thanks,
Charith






On Wed, Oct 1, 2014 at 7:42 PM, Pavan Kumar A <pava...@outlook.com> wrote:

> I will write a detailed explanation in weekend. thanks for your interest.
>
> ------------------------------
> Date: Wed, 1 Oct 2014 10:56:16 -0700
>
> Subject: Re: Using a custom graph partitioning stratergy with giraph
> From: charith.dhanus...@gmail.com
> To: user@giraph.apache.org
>
> Thanks Pavan,
>
> I get the high level level idea. I am still new to Giraph code base so I
> am still trying to understand the overall design.
>
> So I have few questions regarding this feature.
>
> Can we use this feature with a vertex input format without using edge
> translation?
> (Since   getPartition in MappingStoreOps  can be used to get the
> partition of any target vertex)
>
> Also, since I have mapping information in a separate file  do I need to
> embed target information in the vertex?
>
> It will be great if you could explain your scenario with dataformat you
> used and what extension points you used so that I can understand it better
> and adapt to my scenario.
>
>
> Thanks,
> Charith
>
>
>
>
> On Mon, Sep 29, 2014 at 3:34 PM, Pavan Kumar A <pava...@outlook.com>
> wrote:
>
> we have two inputs - vertex & edges
> if we partition edges vertices based on a map, then when we want to send
> messages we should be able to know which partition a vertex is on.
>
> typically we send messages to targetIds of outgoing edges, edge transation
> helps encode mapping information into targetIds, so knowing which partition
> to send a message can be done by just looking at the targetid
> ------------------------------
> Date: Mon, 29 Sep 2014 14:37:22 -0700
>
> Subject: Re: Using a custom graph partitioning stratergy with giraph
> From: charith.dhanus...@gmail.com
> To: user@giraph.apache.org
>
> Hi Pavan,
>
> Thanks for the details. I went through the code specially the
> extension points you mentioned.
> I am not clear about the function of the edge Translation (org.apache.
> giraph.mapping.translate.TranslateEdge) class. Could you please explain
> the idea of this translation process.
>
> In my case I will have a mapping file which maps each vertex to a
> partition.
>
> ex:
>
> v1 part1
> v2 part2
> v3 part3
> .
> .
> .
>
> So I was thinking of passing this as a parameter and reading inside my own
> MappingStore Implementation
> -Dgiraph.mappingFilePath=/user/charith/input/mapping.txt
>
> Is there a better approach?
>
> Thanks,
> Charith
>
>
>
>
>
>
>
>
> On Sun, Sep 28, 2014 at 8:29 AM, Pavan Kumar A <pava...@outlook.com>
> wrote:
>
> I worked on this feature sometime back - but I only worked on inputting
> hive file & not hdfs
>
> You can use logic outside giraph to select which partition file to use -
> this is possible because you input the number of workers anyway.
> For instance in the script that you use to launch a giraph job have a
> selection logic for the partition file
>
> You can take a look at : https://issues.apache.org/jira/browse/GIRAPH-908
> You might have to extend upon the jira for your specific use case - I only
> added support for case when id = longwritable
>
> Here is a list of options you might want to explore
>
>  # Mapping Store related information
>
> -Dgiraph.mappingStoreClass=org.apache.giraph.mapping.LongByteMappingStore
>     -Dgiraph.lbMappingStoreUpper=1987000
>     -Dgiraph.lbMappingStoreLower=4096
>     # Mapping tore ops information
>
> -Dgiraph.mappingStoreOpsClass=org.apache.giraph.mapping.DefaultEmbeddedLongByteOps
>     # Embed mapping information
>
> -Dgiraph.edgeTranslationClass=org.apache.giraph.mapping.translate.LongByteTranslateEdge
>     # PartitionerFactory to be used
>
> -Dgiraph.graphPartitionerFactoryClass=org.apache.giraph.partition.LongMappingStorePartitionerFactory
>
> So the partition map is stored here as map of byte arrays. with 
> .lbMappingStoreUpper
> being size of map and lbMappingStoreLower being size of individual arrays
>
> Please explore code & tell me what else you need.
> Thanks
> ------------------------------
> Date: Sat, 27 Sep 2014 22:51:29 -0700
> Subject: Re: Using a custom graph partitioning stratergy with giraph
> From: charith.dhanus...@gmail.com
> To: user@giraph.apache.org
>
>
> Also adding some more information.
>
> My current understanding is I should be able to do this by  my own
> org.apache.giraph.partition.WorkerGraphPartitioner implementation.
>
> But my question is, Is there are a way to get some outside input inside
> the WorkerGraphPartitioner?In my case it will be an hdfs file location.
>
> Thanks,
> Charith
>
>
>
>
>
>
>
>
>
>
> On Sat, Sep 27, 2014 at 10:13 PM, Charith Wickramarachchi <
> charith.dhanus...@gmail.com> wrote:
>
> Hi,
>
> I m trying to use giraph with a custom graph partitioner that I have. In
> my case i want to assign vertices to workers based on a custom partitioner
> input.
>
> In my case partitioner will take number of workers as an input parameter
> and give me a file which maps each vertex id to a worker. I m trying load
> this file to a hdfs location and use it as an input to the giraph and do
> the vertex assignment.
>
> Any suggestions or pointers on best way to this will be highly appricated
> (Use the current extention points of giraph as much as possible to avoid
> random hacks).
>
> I m currently using giraph-1.0.0.
>
> Thanks,
> Charith
>
>
>
>
> --
> Charith Dhanushka Wickramaarachchi
>
> Tel  +1 213 447 4253
> Web  http://apache.org/~charith <http://www-scf.usc.edu/~cwickram/>
> <http://charith.wickramaarachchi.org/>
> Blog  http://charith.wickramaarachchi.org/
> <http://charithwiki.blogspot.com/>
> Twitter  @charithwiki <https://twitter.com/charithwiki>
>
> This communication may contain privileged or other confidential information
> and is intended exclusively for the addressee/s. If you are not the
> intended recipient/s, or believe that you may have
> received this communication in error, please reply to the sender indicating
> that fact and delete the copy you received and in addition, you should
> not print, copy, retransmit, disseminate, or otherwise use the
> information contained in this communication. Internet communications
> cannot be guaranteed to be timely, secure, error or virus-free. The
> sender does not accept liability for any errors or omissions
>
>
>
>
> --
> Charith Dhanushka Wickramaarachchi
>
> Tel  +1 213 447 4253
> Web  http://apache.org/~charith <http://www-scf.usc.edu/~cwickram/>
> <http://charith.wickramaarachchi.org/>
> Blog  http://charith.wickramaarachchi.org/
> <http://charithwiki.blogspot.com/>
> Twitter  @charithwiki <https://twitter.com/charithwiki>
>
> This communication may contain privileged or other confidential information
> and is intended exclusively for the addressee/s. If you are not the
> intended recipient/s, or believe that you may have
> received this communication in error, please reply to the sender indicating
> that fact and delete the copy you received and in addition, you should
> not print, copy, retransmit, disseminate, or otherwise use the
> information contained in this communication. Internet communications
> cannot be guaranteed to be timely, secure, error or virus-free. The
> sender does not accept liability for any errors or omissions
>
>
>
>
> --
> Charith Dhanushka Wickramaarachchi
>
> Tel  +1 213 447 4253
> Web  http://apache.org/~charith <http://www-scf.usc.edu/~cwickram/>
> <http://charith.wickramaarachchi.org/>
> Blog  http://charith.wickramaarachchi.org/
> <http://charithwiki.blogspot.com/>
> Twitter  @charithwiki <https://twitter.com/charithwiki>
>
> This communication may contain privileged or other confidential information
> and is intended exclusively for the addressee/s. If you are not the
> intended recipient/s, or believe that you may have
> received this communication in error, please reply to the sender indicating
> that fact and delete the copy you received and in addition, you should
> not print, copy, retransmit, disseminate, or otherwise use the
> information contained in this communication. Internet communications
> cannot be guaranteed to be timely, secure, error or virus-free. The
> sender does not accept liability for any errors or omissions
>
>
>
>
> --
> Charith Dhanushka Wickramaarachchi
>
> Tel  +1 213 447 4253
> Web  http://apache.org/~charith <http://www-scf.usc.edu/~cwickram/>
> <http://charith.wickramaarachchi.org/>
> Blog  http://charith.wickramaarachchi.org/
> <http://charithwiki.blogspot.com/>
> Twitter  @charithwiki <https://twitter.com/charithwiki>
>
> This communication may contain privileged or other confidential information
> and is intended exclusively for the addressee/s. If you are not the
> intended recipient/s, or believe that you may have
> received this communication in error, please reply to the sender indicating
> that fact and delete the copy you received and in addition, you should
> not print, copy, retransmit, disseminate, or otherwise use the
> information contained in this communication. Internet communications
> cannot be guaranteed to be timely, secure, error or virus-free. The
> sender does not accept liability for any errors or omissions
>



-- 
Charith Dhanushka Wickramaarachchi

Tel  +1 213 447 4253
Web  http://apache.org/~charith <http://www-scf.usc.edu/~cwickram/>
<http://charith.wickramaarachchi.org/>
Blog  http://charith.wickramaarachchi.org/
<http://charithwiki.blogspot.com/>
Twitter  @charithwiki <https://twitter.com/charithwiki>

This communication may contain privileged or other confidential information
and is intended exclusively for the addressee/s. If you are not the
intended recipient/s, or believe that you may have
received this communication in error, please reply to the sender indicating
that fact and delete the copy you received and in addition, you should not
print, copy, retransmit, disseminate, or otherwise use the information
contained in this communication. Internet communications cannot be
guaranteed to be timely, secure, error or virus-free. The sender does not
accept liability for any errors or omissions
package org.apache.giraph.io.formats;

import com.google.common.collect.Lists;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.io.MappingReader;
import org.apache.giraph.mapping.MappingEntry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Created by charith on 10/1/14.
 */
public class LongDoubleFloatLongMappingInputFormat<I extends WritableComparable,
        V extends Writable, E extends Writable, B extends Writable>
        extends MappingInputFormat<LongWritable,DoubleWritable,FloatWritable,LongWritable> {


    protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();

    @Override
    public MappingReader<LongWritable,DoubleWritable,FloatWritable,LongWritable>
    createMappingReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new LongDoubleFloatLongMappingReader();
    }

    @Override
    public void checkInputSpecs(Configuration conf) {
        //none
    }

    @Override
    public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException {
        return textInputFormat.getVertexSplits(context);
    }



    public class LongDoubleFloatLongMappingReader extends MappingReader<LongWritable,DoubleWritable,FloatWritable,LongWritable> {


        /** Separator of the vertex and neighbors */
        private final Pattern neighborSeparator = Pattern.compile("[\t ]");
        /** Separator of a neighbor and its weight */
        private final Pattern weightSeparator = Pattern.compile("[:]");


        /** Internal line record reader */
        private RecordReader<LongWritable, Text> lineRecordReader;
        /** Context passed to initialize */
      //  private TaskAttemptContext context;

        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {

       //     this.context = context;
            lineRecordReader = createLineRecordReader(inputSplit,context);
            lineRecordReader.initialize(inputSplit, context);

        }


        /**
         * Create the line record reader. Override this to use a different
         * underlying record reader (useful for testing).
         *
         * @param inputSplit
         *          the split to read
         * @param context
         *          the context passed to initialize
         * @return
         *         the record reader to be used
         * @throws IOException
         *           exception that can be thrown during creation
         * @throws InterruptedException
         *           exception that can be thrown during creation
         */
        protected RecordReader<LongWritable, Text>
        createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
                throws IOException, InterruptedException {
            return textInputFormat.createRecordReader(inputSplit, context);
        }

        @Override
        public boolean nextEntry() throws IOException, InterruptedException {
            return lineRecordReader.nextKeyValue();
        }

        @Override
        public MappingEntry<LongWritable, LongWritable> getCurrentEntry() throws IOException, InterruptedException {





            String[] tokens = neighborSeparator.split(lineRecordReader
                    .getCurrentValue().toString());
            List<Edge<LongWritable, FloatWritable>> edges =
                    Lists.newArrayListWithCapacity(tokens.length - 1);

            for (int n = 1; n < tokens.length; n++) {
                String[] parts = weightSeparator.split(tokens[n]);
                edges.add(EdgeFactory.create(
                        new LongWritable(Long.parseLong(parts[0])),
                        new FloatWritable(Float.parseFloat(parts[1]))));
            }

            String vertex[] = weightSeparator.split(tokens[0]);


            LongWritable vertexId = new LongWritable(Long.parseLong(vertex[0]));
            LongWritable target = new LongWritable(Long.parseLong(vertex[1]));

            MappingEntry<LongWritable,LongWritable> entry = new MappingEntry<>(vertexId,target);


            return entry;


        }

        @Override
        public void close() throws IOException {
            lineRecordReader.close();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return lineRecordReader.getProgress();
        }
    }


}

Reply via email to