Hi, 

I'm trying to understand how TimeBasedDedupOperator works for my streaming
application. I'm using the Malhar dedup example:
https://github.com/apache/apex-malhar/blob/master/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java

I made a few modifications to minimize the output.
Properties: 
  <property>
    <name>dt.application.DedupExample.operator.Deduper.prop.keyExpression</name>
    <value>id</value>
  </property>
  <property>
    <name>dt.application.DedupExample.operator.Deduper.prop.timeExpression</name>
    <value>eventTime.getTime()</value>
  </property>
  <property>
    <name>dt.application.DedupExample.operator.Deduper.prop.bucketSpan</name>
    <value>10</value>
  </property>
  <property>
    <name>dt.application.DedupExample.operator.Deduper.prop.expireBefore</name>
    <value>60</value>
  </property>
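
My reading of these settings, which may be wrong: bucketSpan and expireBefore
are both in seconds, so a 60-second expiry window is split into six 10-second
buckets. The sketch below is only that arithmetic in plain Java. The class, the
helper names and the reference time are hypothetical and are not part of
Malhar.

```java
import java.util.concurrent.TimeUnit;

class DedupConfigSketch
{
  // Values copied from the properties above. Assumption: both are in seconds.
  static final long BUCKET_SPAN_SEC = 10;
  static final long EXPIRE_BEFORE_SEC = 60;

  // Number of fixed-width buckets needed to cover the expiry window (rounded up).
  static long bucketCount()
  {
    return (EXPIRE_BEFORE_SEC + BUCKET_SPAN_SEC - 1) / BUCKET_SPAN_SEC;
  }

  // Hypothetical helper: index of the bucket an event time falls into,
  // counted from some reference time (both in milliseconds).
  static long bucketIndex(long eventMillis, long referenceMillis)
  {
    return (eventMillis - referenceMillis) / TimeUnit.SECONDS.toMillis(BUCKET_SPAN_SEC);
  }

  public static void main(String[] args)
  {
    System.out.println(bucketCount());            // prints 6
    System.out.println(bucketIndex(25_000L, 0L)); // prints 2
  }
}
```

If that reading is right, events with the same id that are only a few
milliseconds apart should land in the same bucket.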

Below is the application code:

public class Application implements StreamingApplication 
{ 

  @Override 
  public void populateDAG(DAG dag, Configuration conf) 
  { 
    // Test Data Generator Operator 
    RandomDataGeneratorOperator gen = dag.addOperator("RandomGenerator", new RandomDataGeneratorOperator());

    // Dedup Operator. Configuration through resources/META-INF/properties.xml
    TimeBasedDedupOperator dedup = dag.addOperator("Deduper", new TimeBasedDedupOperator());

    // Console output operator for unique tuples 
    ConsoleOutputOperator consoleUnique = dag.addOperator("ConsoleUnique", new ConsoleOutputOperator());

    // Streams 
    dag.addStream("Generator to Dedup", gen.output, dedup.input); 

    // Connect Dedup unique to Console 
    dag.addStream("Dedup Unique to Console", dedup.unique, consoleUnique.input);

    // Set Attribute TUPLE_CLASS for supplying schema information to the port
    dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
  }

  public static class RandomDataGeneratorOperator extends BaseOperator implements InputOperator
  { 

    public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
    private final transient Random r = new Random(); 
    private int tuplesPerWindow = 100; 
    private transient int count = 0; 

    @Override 
    public void beginWindow(long windowId) 
    { 
      count = 0; 
    } 

    @Override 
    public void emitTuples() 
    { 
      if (count++ > tuplesPerWindow) { 
        return; 
      } 
      TestEvent event = new TestEvent(); 
      event.id = r.nextInt(2); 
      long millis = System.currentTimeMillis(); 
      event.millis = millis; 
      event.setTimeNow(new Date(millis)); 
//      event.eventTime = new Date( millis - (r.nextInt(60 * 1000))); 
      event.eventTime = new Date(millis); 
      output.emit(event); 
    } 
  } 

  public static class TestEvent 
  { 
    private int id; 
    private Date timeNow; 
    private Date eventTime; 
    private long millis; 

    public TestEvent() 
    { 
    } 
    public long getMillis() { return millis; } 

    public int getId() 
    { 
      return id; 
    } 

    public void setId(int id) 
    { 
      this.id = id; 
    } 

    public Date getEventTime() 
    { 
      return eventTime; 
    } 

    public void setTimeNow(Date timeNow) { 
      this.timeNow = timeNow; 
    } 

    public Date getTimeNow() { 
      return timeNow; 
    } 

    public void setEventTime(Date eventTime) 
    { 
      this.eventTime = eventTime; 
    } 

    @Override 
    public String toString() 
    { 
      return "TestEvent [id=" + id + "; millis = " + millis + "; nowTime=" + timeNow
          + "; eventTime=" + eventTime + "]";
    } 

  } 

} 

I ran this application from a JUnit test using LocalMode, but the console
output contains duplicate records. I'm trying to understand why tuples with
the same id keep appearing on the unique console:
1. Unique: TestEvent [id=1; millis = 1520413075333; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
2. Unique: TestEvent [id=1; millis = 1520413075334; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
3. Unique: TestEvent [id=0; millis = 1520413075363; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
4. Unique: TestEvent [id=0; millis = 1520413075364; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
5. Unique: TestEvent [id=0; millis = 1520413075365; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
6. Unique: TestEvent [id=0; millis = 1520413075366; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
7. Unique: TestEvent [id=0; millis = 1520413075367; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
8. Unique: TestEvent [id=0; millis = 1520413075368; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
9. Unique: TestEvent [id=0; millis = 1520413075369; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
10. Unique: TestEvent [id=1; millis = 1520413082317; nowTime=Wed Mar 07 00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
11. Unique: TestEvent [id=0; millis = 1520413082317; nowTime=Wed Mar 07 00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
12. Unique: TestEvent [id=0; millis = 1520413092321; nowTime=Wed Mar 07 00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]
13. Unique: TestEvent [id=1; millis = 1520413092321; nowTime=Wed Mar 07 00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]

I see a lot of duplicates on the unique port. Did I set any configuration
incorrectly?
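
To make my expectation concrete, here is a minimal sketch in plain Java. It is
not Malhar code and not how TimeBasedDedupOperator is implemented. It assumes
expireBefore=60 means 60 seconds and replays the (id, millis) pairs from the
output above through a naive key-based dedup. The class and method names are
made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ExpectedDedupSketch
{
  // Assumption: expireBefore=60 is in seconds.
  static final long EXPIRE_MILLIS = 60_000L;

  // Returns the 0-based positions of the records a naive deduper would emit as unique:
  // a record is unique if its key was never seen, or was last emitted at least
  // EXPIRE_MILLIS earlier (by event time).
  static List<Integer> uniqueIndices(int[] ids, long[] millis)
  {
    Map<Integer, Long> lastUnique = new HashMap<>(); // key -> event time of last unique tuple
    List<Integer> unique = new ArrayList<>();
    for (int i = 0; i < ids.length; i++) {
      Long seenAt = lastUnique.get(ids[i]);
      if (seenAt == null || millis[i] - seenAt >= EXPIRE_MILLIS) {
        lastUnique.put(ids[i], millis[i]);
        unique.add(i);
      }
    }
    return unique;
  }

  public static void main(String[] args)
  {
    // (id, millis) pairs copied from the 13 "Unique" lines above
    int[] ids = {1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1};
    long[] millis = {
      1520413075333L, 1520413075334L, 1520413075363L, 1520413075364L,
      1520413075365L, 1520413075366L, 1520413075367L, 1520413075368L,
      1520413075369L, 1520413082317L, 1520413082317L, 1520413092321L,
      1520413092321L
    };
    System.out.println(uniqueIndices(ids, millis)); // prints [0, 2]
  }
}
```

Under those assumptions only records 1 and 3 would be unique, because all 13
events fall within about 17 seconds of each other. That is the behaviour I
expected from the operator.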

Any suggestions are appreciated. 

Thanks


