Hi, I'm trying to understand how TimeBasedDedupOperator works for my streaming application. I'm using the Malhar dedup example: https://github.com/apache/apex-malhar/blob/master/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java
I made a few modifications to minimize the output.

Properties:

    <property>
      <name>dt.application.DedupExample.operator.Deduper.prop.keyExpression</name>
      <value>id</value>
    </property>
    <property>
      <name>dt.application.DedupExample.operator.Deduper.prop.timeExpression</name>
      <value>eventTime.getTime()</value>
    </property>
    <property>
      <name>dt.application.DedupExample.operator.Deduper.prop.bucketSpan</name>
      <value>10</value>
    </property>
    <property>
      <name>dt.application.DedupExample.operator.Deduper.prop.expireBefore</name>
      <value>60</value>
    </property>

Below is the application code:

    public class Application implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        // Test Data Generator Operator
        RandomDataGeneratorOperator gen = dag.addOperator("RandomGenerator", new RandomDataGeneratorOperator());

        // Dedup Operator. Configuration through resources/META-INF/properties.xml
        TimeBasedDedupOperator dedup = dag.addOperator("Deduper", new TimeBasedDedupOperator());

        // Console output operator for unique tuples
        ConsoleOutputOperator consoleUnique = dag.addOperator("ConsoleUnique", new ConsoleOutputOperator());

        // Streams
        dag.addStream("Generator to Dedup", gen.output, dedup.input);

        // Connect Dedup unique to Console
        dag.addStream("Dedup Unique to Console", dedup.unique, consoleUnique.input);

        // Set Attribute TUPLE_CLASS for supplying schema information to the port
        dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
      }

      public static class RandomDataGeneratorOperator extends BaseOperator implements InputOperator
      {
        public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
        private final transient Random r = new Random();
        private int tuplesPerWindow = 100;
        private transient int count = 0;

        @Override
        public void beginWindow(long windowId)
        {
          count = 0;
        }

        @Override
        public void emitTuples()
        {
          if (count++ > tuplesPerWindow) {
            return;
          }
          TestEvent event = new TestEvent();
          event.id = r.nextInt(2);
          long millis = System.currentTimeMillis();
          event.millis = millis;
          event.setTimeNow(new Date(millis));
          // event.eventTime = new Date( millis - (r.nextInt(60 * 1000)));
          event.eventTime = new Date(millis);
          output.emit(event);
        }
      }

      public static class TestEvent
      {
        private int id;
        private Date timeNow;
        private Date eventTime;
        private long millis;

        public TestEvent()
        {
        }

        public long getMillis()
        {
          return millis;
        }

        public int getId()
        {
          return id;
        }

        public void setId(int id)
        {
          this.id = id;
        }

        public Date getEventTime()
        {
          return eventTime;
        }

        public void setTimeNow(Date timeNow)
        {
          this.timeNow = timeNow;
        }

        public Date getTimeNow()
        {
          return timeNow;
        }

        public void setEventTime(Date eventTime)
        {
          this.eventTime = eventTime;
        }

        @Override
        public String toString()
        {
          return "TestEvent [id=" + id + "; millis = " + millis + "; nowTime=" + timeNow + "; eventTime=" + eventTime + "]";
        }
      }
    }

I ran this application from a JUnit test in LocalMode, but the console output contains duplicate records. I'm trying to understand why duplicate messages appear on the unique console:

    1. Unique: TestEvent [id=1; millis = 1520413075333; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
    2. Unique: TestEvent [id=1; millis = 1520413075334; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
    3. Unique: TestEvent [id=0; millis = 1520413075363; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
    4. Unique: TestEvent [id=0; millis = 1520413075364; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
    5. Unique: TestEvent [id=0; millis = 1520413075365; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
    6. Unique: TestEvent [id=0; millis = 1520413075366; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
    7. Unique: TestEvent [id=0; millis = 1520413075367; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
    8. Unique: TestEvent [id=0; millis = 1520413075368; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
    9. Unique: TestEvent [id=0; millis = 1520413075369; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
    10. Unique: TestEvent [id=1; millis = 1520413082317; nowTime=Wed Mar 07 00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
    11. Unique: TestEvent [id=0; millis = 1520413082317; nowTime=Wed Mar 07 00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
    12. Unique: TestEvent [id=0; millis = 1520413092321; nowTime=Wed Mar 07 00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]
    13. Unique: TestEvent [id=1; millis = 1520413092321; nowTime=Wed Mar 07 00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]

I see a lot of duplicates on the unique port, even though the key expression is id and only two distinct ids (0 and 1) are generated. Did I set any configuration incorrectly? Any suggestions are appreciated.

Thanks