Hi,
I have posted the question to Stack Overflow, where I have also
clarified my problem a bit.
If you have a solution, please respond there (if it's not too much of a
hassle):
http://stackoverflow.com/questions/6469171/computing-set-intersection-and-set-difference-of-the-records-of-two-files-with-ha
Best regards,
Claus
On 06/24/2011 12:44 PM, Claus Stadler wrote:
Hi,
My problem is as follows:
I have two input files, and I want to determine
a) The number of lines which only occur in file 1
b) The number of lines which only occur in file 2
c) The number of lines common to both (i.e. with regard to string equality)
Example:
File 1:
a
b
c
File 2:
a
d
Desired output for each case:
lines_only_in_1: 2 (b, c)
lines_only_in_2: 1 (d)
lines_in_both: 1 (a)
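For inputs small enough to fit in memory, the three counts from the example can be checked with plain Java sets, without Hadoop. This is only a sketch for illustrating the expected numbers; the class name LineSetCounts and the method compare are hypothetical and not part of the job described here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: computes the three counts in memory.
class LineSetCounts {
    final long onlyIn1;
    final long onlyIn2;
    final long inBoth;

    LineSetCounts(long onlyIn1, long onlyIn2, long inBoth) {
        this.onlyIn1 = onlyIn1;
        this.onlyIn2 = onlyIn2;
        this.inBoth = inBoth;
    }

    static LineSetCounts compare(List<String> file1, List<String> file2) {
        // Duplicate lines within one file collapse to a single entry.
        Set<String> a = new HashSet<>(file1);
        Set<String> b = new HashSet<>(file2);

        // Intersection: lines that occur in both files.
        Set<String> both = new HashSet<>(a);
        both.retainAll(b);

        // Each difference is the set size minus the shared part.
        return new LineSetCounts(a.size() - both.size(),
                                 b.size() - both.size(),
                                 both.size());
    }

    public static void main(String[] args) {
        LineSetCounts c = compare(Arrays.asList("a", "b", "c"),
                                  Arrays.asList("a", "d"));
        System.out.println("lines_only_in_1: " + c.onlyIn1);
        System.out.println("lines_only_in_2: " + c.onlyIn2);
        System.out.println("lines_in_both: " + c.inBoth);
    }
}
```

Run on the two example files, this yields the counts 2, 1 and 1 listed above.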
Basically my approach is as follows:
I wrote my own LineRecordReader, so that the mapper receives a pair
consisting of the line (text) and a byte indicating the source file
(either 0 or 1).
The mapper simply emits the pair unchanged, so it effectively does nothing.
The side effect, however, is that the combiner receives a
Map<Line, Iterable<SourceId>> (where SourceId is either 0 or 1).
Now, for each line I can get the set of sources it appears in.
Therefore, I could write a combiner that counts the number of lines for
each case (a, b, c) (Listing 1).
The combiner then outputs a 'summary' only on cleanup (is that safe?).
So this summary looks like:
in_a_distinct_count_total 7531
in_b_distinct_count_total 3190
out_common_distinct_count_total 901
In the reducer I then only sum up the values for these summaries.
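The summing step can be sketched in plain Java, without any Hadoop classes. The class name SummaryTotals and the method sum are hypothetical. The sketch assumes that each distinct line was counted in exactly one partial summary; if the records for one line end up split across several partial summaries, that line is counted more than once and the totals come out too high.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: adds up partial summaries key by key.
class SummaryTotals {
    static Map<String, Long> sum(List<Map<String, Long>> partials) {
        // TreeMap only to get a stable, sorted order of the summary keys.
        Map<String, Long> totals = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            for (Map.Entry<String, Long> e : partial.entrySet()) {
                // Start at the first value seen, then keep adding.
                totals.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return totals;
    }
}
```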
However, the main problem is that I need to treat both source files
as a single virtual file which yields records of the form
(line, sourceId) // sourceId either 0 or 1
And I am not sure how to achieve that.
So the question is whether I can avoid preprocessing and merging the
files beforehand, and instead do that on the fly with something like a
virtually-merged-file reader and a custom record reader.
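To make the idea of a virtually merged file concrete, here is a minimal plain-Java sketch of the record shape and the grouping step. It is not Hadoop code and does not show how to wire this into an InputFormat; the names TaggedLines, Record, merged and groupByLine are all hypothetical. It chains two line sources lazily, tags each line with its source id, and then groups the records by line, roughly the way the shuffle would.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

class TaggedLines {
    // One record of the virtual merged file: (line, sourceId).
    static final class Record {
        final String line;
        final byte sourceId; // 0 = first source, 1 = second source

        Record(String line, byte sourceId) {
            this.line = line;
            this.sourceId = sourceId;
        }
    }

    // Reads source 0 to the end, then source 1, tagging lines on the fly.
    // Nothing is merged or written out beforehand.
    static Iterator<Record> merged(final Iterator<String> source0,
                                   final Iterator<String> source1) {
        return new Iterator<Record>() {
            @Override
            public boolean hasNext() {
                return source0.hasNext() || source1.hasNext();
            }

            @Override
            public Record next() {
                if (source0.hasNext()) {
                    return new Record(source0.next(), (byte) 0);
                }
                return new Record(source1.next(), (byte) 1);
            }
        };
    }

    // Groups the tagged records by line, keeping the set of source ids.
    static Map<String, Set<Byte>> groupByLine(Iterator<Record> records) {
        Map<String, Set<Byte>> sources = new HashMap<>();
        while (records.hasNext()) {
            Record r = records.next();
            sources.computeIfAbsent(r.line, k -> new HashSet<>()).add(r.sourceId);
        }
        return sources;
    }
}
```

With the example files, the line a maps to the source ids {0, 1}, b and c map to {0}, and d maps to {1}.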
Any code example is much appreciated.
Best regards,
Claus
Listing 1:
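// Imports needed by the enclosing job class:
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class SourceCombiner
        extends Reducer<Text, ByteWritable, Text, LongWritable>
{
    // countA and countB include the common lines, so the number of lines
    // only in source 0 is countA - countC (likewise countB - countC).
    private long countA = 0; // distinct lines seen in source 0
    private long countB = 0; // distinct lines seen in source 1
    private long countC = 0; // C = lines (c)ommon to both sources

    @Override
    public void reduce(Text key, Iterable<ByteWritable> values,
            Context context) throws IOException, InterruptedException {
        // Collect the distinct source ids this line appears in.
        Set<Byte> fileIds = new HashSet<Byte>();
        for (ByteWritable val : values) {
            fileIds.add(val.get());
        }

        if (fileIds.contains((byte) 0)) { ++countA; }
        if (fileIds.contains((byte) 1)) { ++countB; }
        if (fileIds.size() >= 2) { ++countC; }
    }

    // Emit the summary once, after all keys have been processed.
    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException
    {
        context.write(new Text("in_a_distinct_count_total"),
                new LongWritable(countA));
        context.write(new Text("in_b_distinct_count_total"),
                new LongWritable(countB));
        context.write(new Text("out_common_distinct_count_total"),
                new LongWritable(countC));
    }
}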