[ 
https://issues.apache.org/jira/browse/DATAFU-11?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13875677#comment-13875677
 ] 

jian wang edited comment on DATAFU-11 at 1/19/14 2:48 PM:
----------------------------------------------------------

Sharing some of my investigation.

I added some debug output to Initial, Intermediate and Final and found that
exec() is called multiple times on the same Intermediate and Final instance,
once for each group key in the grouped output from the mapper. So the reservoir
variable is shared between the outputs of different groups.

The same applies to Initial: exec() is called on the same Initial instance to
process mapper input, even when that input belongs to a different group key.

I tried Barbara's solution, and it resolves the problem of the reservoir
variable being shared in the Intermediate and Final instances.
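Judging from the commented-out `//the working way` lines in the diff, the fix presumably amounts to allocating the reservoir inside exec() instead of reusing a field. The sketch below contrasts the two patterns under that assumption. `TopK`, `FieldMerger` and `LocalMerger` are hypothetical stand-ins, not DataFu or Pig classes; a `java.util.PriorityQueue` of scores plays the role of `Reservoir`, and the scores are illustrative values only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class ReservoirScopeDemo {
    // Hypothetical stand-in for DataFu's Reservoir: keeps the k highest scores.
    static final class TopK {
        private final int k;
        private final PriorityQueue<Double> heap = new PriorityQueue<>(); // min-heap

        TopK(int k) { this.k = k; }

        void consider(double score) {
            if (heap.size() < k) {
                heap.add(score);
            } else if (heap.peek() < score) {
                heap.poll(); // evict the current lowest score
                heap.add(score);
            }
        }

        List<Double> sortedDescending() {
            List<Double> out = new ArrayList<>(heap);
            out.sort(Collections.reverseOrder());
            return out;
        }
    }

    // Buggy pattern: the reservoir is a field, so it outlives each exec() call
    // and carries samples from one group key into the next.
    static final class FieldMerger {
        private final TopK reservoir = new TopK(2);

        List<Double> exec(List<Double> groupScores) {
            for (double s : groupScores) reservoir.consider(s);
            return reservoir.sortedDescending();
        }
    }

    // Fixed pattern: a fresh reservoir per exec() call, so every group starts empty.
    static final class LocalMerger {
        List<Double> exec(List<Double> groupScores) {
            TopK reservoir = new TopK(2);
            for (double s : groupScores) reservoir.consider(s);
            return reservoir.sortedDescending();
        }
    }

    public static void main(String[] args) {
        List<Double> groupA = List.of(0.9, 0.8, 0.1); // illustrative scores only
        List<Double> groupB = List.of(0.5, 0.4, 0.3);
        FieldMerger shared = new FieldMerger();
        LocalMerger local = new LocalMerger();
        shared.exec(groupA);
        local.exec(groupA);
        System.out.println(shared.exec(groupB)); // [0.9, 0.8]: group A's samples leaked in
        System.out.println(local.exec(groupB));  // [0.5, 0.4]
    }
}
```

The field-based merger only misbehaves when one instance sees more than one group, which matches the log below, where a single Intermediate instance handles both a1 and a2.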

But I am not sure whether we also need to handle Initial input that contains
samples under different keys, e.g. if <a1, 1> and <a2, 1> both appear in the
input sample argument of a single invocation of Initial's exec().

Test diff that adds debug output to ReservoirSample:

--- a/src/java/datafu/pig/sampling/ReservoirSample.java
+++ b/src/java/datafu/pig/sampling/ReservoirSample.java
@@ -176,10 +176,14 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
     protected ScoredTuple.ScoreGenerator scoreGen;
     TupleFactory tupleFactory = TupleFactory.getInstance();
     
+    private long id;
+    
     public Initial(){}
     
     public Initial(String numSamples)
     {
+     id = System.nanoTime();
+     System.out.println("Create initial instance, numSamples: " + numSamples + ", id: " + id);
       this.numSamples = Integer.parseInt(numSamples);
     }
     
@@ -206,7 +210,11 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
       
       ScoredTuple.ScoreGenerator scoreGen = getScoreGenerator();
       
+      
       DataBag samples = (DataBag) input.get(0);
+      
+      int preConsiderReservoirSize = getReservoir().size(); 
+      
       if (samples == null)
       {
         // do nothing
@@ -217,7 +225,8 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
           // add the score on to the intermediate tuple
           output.add(new ScoredTuple(scoreGen.generateScore(sample), sample).getIntermediateTuple(tupleFactory));
         }
      } else {            
         for (Tuple sample : samples) {
           getReservoir().consider(new ScoredTuple(scoreGen.generateScore(sample), sample));
         }    
@@ -227,7 +236,11 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
           output.add(scoredTuple.getIntermediateTuple(tupleFactory));
         }
       }
-
+      System.out.println("finish exec of Initial with id: " + id + 
+              ", input: " + samples + 
+              ", output: " + tupleFactory.newTuple(output) +
+              ", preConsider reservoir size:" + preConsiderReservoirSize +
+              ", output reservoir size: " + getReservoir().size());
       return tupleFactory.newTuple(output);
     }
     
@@ -239,10 +252,14 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
     private Reservoir reservoir;
     TupleFactory tupleFactory = TupleFactory.getInstance();
     
+    private long id;
+    
     public Intermediate(){}
     
     public Intermediate(String numSamples)
     {
+     id = System.nanoTime();
+     System.out.println("Create intermed, numSamples: " + numSamples + ", id: " + id);
       this.numSamples = Integer.parseInt(numSamples);
     }
     
@@ -257,21 +274,30 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
     @Override
     public Tuple exec(Tuple input) throws IOException {
       DataBag bagOfSamples = (DataBag) input.get(0);
+      //Reservoir r = new Reservoir(this.numSamples); //the working way
+      Reservoir r = getReservoir();
+      
+      int preConsiderReservoirSize = getReservoir().size();
+      
       for (Tuple innerTuple : bagOfSamples) {
         DataBag samples = (DataBag) innerTuple.get(0);        
         
         for (Tuple sample : samples) {
           // use the same score as previously generated
-          getReservoir().consider(ScoredTuple.fromIntermediateTuple(sample));
+         r.consider(ScoredTuple.fromIntermediateTuple(sample));
         }
       }
       
       DataBag output = BagFactory.getInstance().newDefaultBag();
-      for (ScoredTuple scoredTuple : getReservoir()) {
+     for (ScoredTuple scoredTuple : r) {
         // add the score on to the intermediate tuple
         output.add(scoredTuple.getIntermediateTuple(tupleFactory));
       }
-
+      System.out.println("enter exec of Intermed with id: " + id +
+              ", input: " + bagOfSamples + 
+              ", output: " + tupleFactory.newTuple(output) + 
+              ", pre consider reservoir size: " + preConsiderReservoirSize +
+              ", output reservoir size: " + r.size());
       return tupleFactory.newTuple(output);
     }
     
@@ -283,10 +309,14 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
     private Reservoir reservoir;
     TupleFactory tupleFactory = TupleFactory.getInstance();
     
+    private long id;
+    
     public Final(){}
     
     public Final(String numSamples)
     {
+     id = System.nanoTime();
+     System.out.println("Create final, numSamples: " + numSamples + ", id: " + id);
       this.numSamples = Integer.parseInt(numSamples);
     }
     
@@ -301,20 +331,32 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
     @Override
     public DataBag exec(Tuple input) throws IOException {
       DataBag bagOfSamples = (DataBag) input.get(0);
+      
+      //Reservoir r = new Reservoir(this.numSamples);  //the working way
+      Reservoir r = getReservoir();
+      
+      int preConsiderReservoirSize = r.size();
+      
       for (Tuple innerTuple : bagOfSamples) {
-        DataBag samples = (DataBag) innerTuple.get(0);        
-        
+        DataBag samples = (DataBag) innerTuple.get(0);   
+                
         for (Tuple sample : samples) {
           // use the same score as previously generated
-          getReservoir().consider(ScoredTuple.fromIntermediateTuple(sample));
+          r.consider(ScoredTuple.fromIntermediateTuple(sample));
         }
       }
       
       DataBag output = BagFactory.getInstance().newDefaultBag();  
-      for (ScoredTuple scoredTuple : getReservoir()) {
+      for (ScoredTuple scoredTuple : r) {
         // output the original tuple
         output.add(scoredTuple.getTuple());
       }
+      
+      System.out.println("enter exec of Final with id: " + id +
+              ", input: " + bagOfSamples +
+              ", output: " + output + 
+              ", preConsider reservoir size: " + preConsiderReservoirSize +
+              ", output reservoir size: " + r.size());
 
       return output;
     }    

Test Pig script:

register datafu-1.2.1-SNAPSHOT.jar;

DEFINE ReservoirSample datafu.pig.sampling.ReservoirSample('2');
data = LOAD 'input.txt' USING PigStorage(',') AS (key: chararray, value: chararray);
grouped = GROUP data BY key;
sample2 = FOREACH grouped GENERATE ReservoirSample(data);
dump sample2;

all_grouped = GROUP data ALL;
sample3 = FOREACH all_grouped GENERATE ReservoirSample(data);
dump sample3;

The debug output:

Create initial instance, numSamples: 2, id: 1390057931816676000
Create intermed, numSamples: 2, id: 1390057931821523000
Create final, numSamples: 2, id: 1390057931822896000

Create initial instance, numSamples: 2, id: 1390057932290560000
finish exec of Initial with id: 1390057932290560000, input: {(a1,5)}, output: ({(0.881896844845867,(a1,5))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057932290560000, input: {(a1,6)}, output: ({(0.7843315204002389,(a1,6))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057932290560000, input: {(a1,7)}, output: ({(0.8355896762512851,(a1,7))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057932290560000, input: {(a2,5)}, output: ({(0.34698641636909844,(a2,5))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057932290560000, input: {(a2,6)}, output: ({(0.8827556451749948,(a2,6))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057932290560000, input: {(a2,7)}, output: ({(0.5957191900640255,(a2,7))}), preConsider reservoir size:0, output reservoir size: 0

Create intermed, numSamples: 2, id: 1390057932320244000
enter exec of Intermed with id: 1390057932320244000, input: {({(0.881896844845867,(a1,5))}),({(0.7843315204002389,(a1,6))}),({(0.8355896762512851,(a1,7))})}, output: ({(0.8355896762512851,(a1,7)),(0.881896844845867,(a1,5))}), pre consider reservoir size: 0, output reservoir size: 2
enter exec of Intermed with id: 1390057932320244000, input: {({(0.34698641636909844,(a2,5))}),({(0.8827556451749948,(a2,6))}),({(0.5957191900640255,(a2,7))})}, output: ({(0.881896844845867,(a1,5)),(0.8827556451749948,(a2,6))}), pre consider reservoir size: 2, output reservoir size: 2

Create final, numSamples: 2, id: 1390057935230757000
Create final, numSamples: 2, id: 1390057935235201000
enter exec of Final with id: 1390057935230757000, input: {({(0.8355896762512851,(a1,7)),(0.881896844845867,(a1,5))})}, output: {(a1,7),(a1,5)}, preConsider reservoir size: 0, output reservoir size: 2
enter exec of Final with id: 1390057935230757000, input: {({(0.881896844845867,(a1,5)),(0.8827556451749948,(a2,6))})}, output: {(a1,5),(a2,6)}, preConsider reservoir size: 2, output reservoir size: 2

({(a1,7),(a1,5)})
({(a1,5),(a2,6)})

Create initial instance, numSamples: 2, id: 1390057938566442000
Create intermed, numSamples: 2, id: 1390057938566921000
Create final, numSamples: 2, id: 1390057938567202000
Create initial instance, numSamples: 2, id: 1390057938763658000

finish exec of Initial with id: 1390057938763658000, input: {(a1,5)}, output: ({(0.3309669492690237,(a1,5))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057938763658000, input: {(a1,6)}, output: ({(0.9869182911655885,(a1,6))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057938763658000, input: {(a1,7)}, output: ({(0.02294962522144972,(a1,7))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057938763658000, input: {(a2,5)}, output: ({(0.8470840606471902,(a2,5))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057938763658000, input: {(a2,6)}, output: ({(0.00556133574224793,(a2,6))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057938763658000, input: {(a2,7)}, output: ({(0.8134381033586324,(a2,7))}), preConsider reservoir size:0, output reservoir size: 0

Create intermed, numSamples: 2, id: 1390057938777755000
enter exec of Intermed with id: 1390057938777755000, input: {({(0.3309669492690237,(a1,5))}),({(0.9869182911655885,(a1,6))}),({(0.02294962522144972,(a1,7))}),({(0.8470840606471902,(a2,5))}),({(0.00556133574224793,(a2,6))}),({(0.8134381033586324,(a2,7))})}, output: ({(0.8470840606471902,(a2,5)),(0.9869182911655885,(a1,6))}), pre consider reservoir size: 0, output reservoir size: 2

Create final, numSamples: 2, id: 1390057941698739000
Create final, numSamples: 2, id: 1390057941701617000

enter exec of Final with id: 1390057941698739000, input: {({(0.8470840606471902,(a2,5)),(0.9869182911655885,(a1,6))})}, output: {(a2,5),(a1,6)}, preConsider reservoir size: 0, output reservoir size: 2
({(a2,5),(a1,6)})
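Replaying the scores logged by the Initial calls through one reused top-2 reservoir reproduces both dumps, which supports the shared-state explanation. This is a sketch under two assumptions: that the reservoir keeps the entries with the highest scores (consistent with every output in the log), and that ties do not occur. `Scored`, `Reservoir` and `exec` here are hypothetical simplifications, not the DataFu classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class ReplayLoggedScores {
    // A sample tuple together with the random score Initial assigned to it.
    static final class Scored {
        final double score;
        final String tuple;
        Scored(double score, String tuple) { this.score = score; this.tuple = tuple; }
    }

    // Hypothetical simplified reservoir: retains the k highest-scored entries.
    static final class Reservoir {
        private final int k;
        // Min-heap on score: the head is the weakest entry currently kept.
        private final PriorityQueue<Scored> heap =
            new PriorityQueue<Scored>((a, b) -> Double.compare(a.score, b.score));

        Reservoir(int k) { this.k = k; }

        void consider(Scored s) {
            if (heap.size() < k) {
                heap.add(s);
            } else if (heap.peek().score < s.score) {
                heap.poll(); // evict the lowest score
                heap.add(s);
            }
        }

        // Kept tuples in ascending score order, so the printout is deterministic.
        List<String> tuples() {
            List<Scored> kept = new ArrayList<>(heap);
            kept.sort((a, b) -> Double.compare(a.score, b.score));
            List<String> out = new ArrayList<>();
            for (Scored s : kept) out.add(s.tuple);
            return out;
        }
    }

    // Scores copied from the "finish exec of Initial" lines of the GROUP BY key run.
    static final List<Scored> A1 = List.of(
        new Scored(0.881896844845867, "(a1,5)"),
        new Scored(0.7843315204002389, "(a1,6)"),
        new Scored(0.8355896762512851, "(a1,7)"));
    static final List<Scored> A2 = List.of(
        new Scored(0.34698641636909844, "(a2,5)"),
        new Scored(0.8827556451749948, "(a2,6)"),
        new Scored(0.5957191900640255, "(a2,7)"));
    // Scores copied from the GROUP ALL run.
    static final List<Scored> ALL = List.of(
        new Scored(0.3309669492690237, "(a1,5)"),
        new Scored(0.9869182911655885, "(a1,6)"),
        new Scored(0.02294962522144972, "(a1,7)"),
        new Scored(0.8470840606471902, "(a2,5)"),
        new Scored(0.00556133574224793, "(a2,6)"),
        new Scored(0.8134381033586324, "(a2,7)"));

    // One exec() call: feed a group into the reservoir and report its contents.
    static List<String> exec(Reservoir r, List<Scored> group) {
        for (Scored s : group) r.consider(s);
        return r.tuples();
    }

    public static void main(String[] args) {
        Reservoir shared = new Reservoir(2);             // one instance reused, as in the log
        System.out.println(exec(shared, A1));            // [(a1,7), (a1,5)]
        System.out.println(exec(shared, A2));            // [(a1,5), (a2,6)]  a1 leaks into a2
        System.out.println(exec(new Reservoir(2), A2));  // [(a2,7), (a2,6)]  fresh reservoir
        System.out.println(exec(new Reservoir(2), ALL)); // [(a2,5), (a1,6)]  GROUP ALL
    }
}
```

The GROUP ALL run comes out right either way, because all six samples reach a single exec() call and there is no earlier group whose samples could remain in the reservoir.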


was (Author: king821221):

> ReservoirSample does not behave as expected when grouping by a key other than ALL
> ---------------------------------------------------------------------------------
>
>                 Key: DATAFU-11
>                 URL: https://issues.apache.org/jira/browse/DATAFU-11
>             Project: DataFu
>          Issue Type: Bug
>            Reporter: Will Vaughan
>
> Reported by Barbara Mucha ([Issue #92 on GitHub|https://github.com/linkedin/datafu/issues/92]):
> ReservoirSample does not behave as expected when grouping by a key other than ALL.
> It appears that the sample is taken over the full input instead of over each group's input.
> Given input:
> {noformat}
> a1,5
> a1,6
> a1,7
> a2,5
> a2,6
> a2,7
> {noformat}
> with the following program
> {noformat}
> DEFINE ReservoirSample datafu.pig.sampling.ReservoirSample('2');
> data = LOAD 'input.txt' USING PigStorage(',') AS (key: chararray, value: chararray);
> grouped = GROUP data BY key;
> sample2 = FOREACH grouped GENERATE ReservoirSample(data);
> {noformat}
> the expected output should be similar to
> {noformat}
> (a1, {(a1,5),(a1,7)})
> (a2, {(a2,5),(a2,7)})
> {noformat}
> However, actual output may show up as
> {noformat}
> (a1, {(a1,5),(a1,7)})
> (a2, {(a1,5),(a1,7)})
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
