[ 
https://issues.apache.org/jira/browse/HAMA-878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13915436#comment-13915436
 ] 

Garuda Kang commented on HAMA-878:
----------------------------------

I wrote some sample code:
{code}
public void Sort(String[] pPath, String pPathResult) {
                Configuration conf = new Configuration();
                FileSystem fs;
                try {
                        fs = FileSystem.getLocal(conf);
                        Path seqFilePath = new Path(pPathResult);
                        
                        SequenceFile.Writer writer = 
SequenceFile.createWriter(fs, conf, seqFilePath, k.getClass(), v.getClass(), 
CompressionType.NONE);
                        Map<Integer, SequenceFile.Reader> readers = new 
HashMap<Integer, SequenceFile.Reader>();
                        
                        
                        for(int a=0;a<pPath.length;a++) {
                                SequenceFile.Sorter sorter = new 
SequenceFile.Sorter(fs, wc.getClass(), v.getClass(), conf);
                                sorter.setMemory(50*1024*1024);
                                sorter.setFactor(10);
                                Path inFile = new Path(pPath[a]);
                                Path outFile = new 
Path(pPath[a].concat(".sorted"));
                                if(fs.exists(outFile)) {
                                        fs.delete(outFile, true);
                                }
                                sorter.sort(inFile, outFile);
                                readers.put(a, new SequenceFile.Reader(fs, 
outFile, conf));
                        }
                        
                        WritableComparable KeyLow = 
(WritableComparable)ReflectionUtils.newInstance(wc.getClass(), conf);
                        Writable ValueLow = 
(Writable)ReflectionUtils.newInstance(v.getClass(), conf);
                        int IndexLow = 0;
                        
                        
                        // instead comparisonMap
                        Map<Integer,Map<WritableComparable,Writable>> ValsMap = 
new HashMap<Integer,Map<WritableComparable,Writable>>();
                        
                        // set default value (firt record from sorted files)
                        for(int a=0;a<readers.size();a++) {
                                WritableComparable KeyNew = 
(WritableComparable)ReflectionUtils.newInstance(wc.getClass(), conf);
                                Writable ValueNew = 
(Writable)ReflectionUtils.newInstance(v.getClass(), conf);
                                readers.get(a).next(KeyNew, ValueNew);
                                Map<WritableComparable,Writable> innerMap = new 
HashMap<WritableComparable,Writable>();
                                innerMap.put(KeyNew, ValueNew);
                                ValsMap.put(a, innerMap);
                                if(a == 0) {
                                        KeyLow = KeyNew;
                                        ValueLow = ValueNew;
                                }
                        }
                        
                        int cnt = 0;
                        while(readers.size() > 0) {
                                
for(Map.Entry<Integer,Map<WritableComparable,Writable>> e1 : 
ValsMap.entrySet()) {
                                        for(Map.Entry<WritableComparable, 
Writable> e2 : e1.getValue().entrySet()) {
                                                // compare low value & set low 
value 
                                                
if(e2.getKey().compareTo(KeyLow) <= 0) {
                                                        KeyLow = e2.getKey();
                                                        ValueLow = 
e2.getValue();
                                                        IndexLow = e1.getKey();
                                                }
                                        }
                                }
                                writer.append(KeyLow, ValueLow);
                                ValsMap.remove(IndexLow);
                                
                                WritableComparable KeyNew = 
(WritableComparable)ReflectionUtils.newInstance(wc.getClass(), conf);
                                Writable ValueNew = 
(Writable)ReflectionUtils.newInstance(v.getClass(), conf);
                                
                                if(readers.containsKey(IndexLow)) {
                                        // get New value
                                        if(readers.get(IndexLow).next(KeyNew, 
ValueNew)) {
                                                
Map<WritableComparable,Writable> innerMap = new 
HashMap<WritableComparable,Writable>();
                                                innerMap.put(KeyNew, ValueNew);
                                                ValsMap.put(IndexLow, innerMap);
                                                KeyLow = KeyNew;
                                                ValueLow = ValueNew;
                                        }else{
                                                readers.get(IndexLow).close();
                                                readers.remove(IndexLow);
                                        }
                                }
                        }
                        writer.close();
                        
                        // confirm values
                        /*
                        SequenceFile.Reader resultReader = new 
SequenceFile.Reader(fs, seqFilePath, conf);
                        WritableComparable KeyNew = 
(WritableComparable)ReflectionUtils.newInstance(wc.getClass(), conf);
                        Writable ValueNew = 
(Writable)ReflectionUtils.newInstance(v.getClass(), conf);
                        while(resultReader.next(KeyNew, ValueNew)){
                                System.out.println(String.format("result 
[%s]\t[%s]", KeyNew, ValueNew));
                        }
                        */
                        
                } catch (IOException e) {
                        e.printStackTrace();
                }
        }
{code}

> PartitioningRunner.Sorter doesn't allows duplicate key
> ------------------------------------------------------
>
>                 Key: HAMA-878
>                 URL: https://issues.apache.org/jira/browse/HAMA-878
>             Project: Hama
>          Issue Type: Bug
>    Affects Versions: 0.7.0
>            Reporter: Edward J. Yoon
>            Assignee: Edward J. Yoon
>            Priority: Critical
>             Fix For: 0.7.0
>
>
> My mistake. I used a SortedMap to compare keys during the merge sort, so 
> records that share a duplicate key can be silently dropped.
>   public SortedMap<WritableComparable, KeyValuePair<Integer, KeyValuePair>> 
> comparisonMap = new TreeMap<WritableComparable, KeyValuePair<Integer, 
> KeyValuePair>>();



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)

Reply via email to