[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15256031#comment-15256031
 ] 

ASF GitHub Bot commented on APEXMALHAR-2023:
--------------------------------------------

Github user sandeepdeshmukh commented on a diff in the pull request:

    
https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60873987
  
    --- Diff: 
contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java ---
    @@ -0,0 +1,223 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.testbench.CollectorTestSink;
    +import com.datatorrent.lib.util.TestUtils;
    +
    +public class POJOEnricherTest extends JDBCLoaderTest
    +{
    +  @Test
    +  public void includeSelectedKeys()
    +  {
    +    POJOEnricher oper = new POJOEnricher();
    +    oper.setStore(testMeta.dbloader);
    +    oper.setLookupFields(Arrays.asList("ID"));
    +    oper.setIncludeFields(Arrays.asList("NAME", "AGE", "ADDRESS"));
    +    oper.outputClass = EmployeeOrder.class;
    +    oper.inputClass = Order.class;
    +    oper.setup(null);
    +
    +    CollectorTestSink sink = new CollectorTestSink();
    +    TestUtils.setSink(oper.output, sink);
    +
    +    oper.activate(null);
    +
    +    oper.beginWindow(1);
    +    Order tuple = new Order(3, 4, 700);
    +    oper.input.process(tuple);
    +    oper.endWindow();
    +
    +    oper.deactivate();
    +
    +    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 
1, sink.collectedTuples.size());
    +    Assert.assertEquals("Ouput Tuple: ",
    +        "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, 
ADDRESS='Rich-Mond', SALARY=0.0}",
    +        sink.collectedTuples.get(0).toString());
    +  }
    +
    +  @Test
    +  public void includeAllKeys()
    +  {
    +    POJOEnricher oper = new POJOEnricher();
    +    oper.setStore(testMeta.dbloader);
    +    oper.setLookupFields(Arrays.asList("ID"));
    +    oper.setIncludeFields(Arrays.asList("NAME", "AGE", "ADDRESS", 
"SALARY"));
    +    oper.outputClass = EmployeeOrder.class;
    +    oper.inputClass = Order.class;
    +    oper.setup(null);
    +
    +    CollectorTestSink sink = new CollectorTestSink();
    +    TestUtils.setSink(oper.output, sink);
    +
    +    oper.activate(null);
    +
    +    oper.beginWindow(1);
    +    Order tuple = new Order(3, 4, 700);
    +    oper.input.process(tuple);
    +    oper.endWindow();
    +
    +    oper.deactivate();
    +
    +    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 
1, sink.collectedTuples.size());
    +    Assert.assertEquals("Ouput Tuple: ",
    +        "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, 
ADDRESS='Rich-Mond', SALARY=65000.0}",
    +        sink.collectedTuples.get(0).toString());
    +  }
    +
    +  @Test
    +  public void testApplication() throws Exception
    +  {
    +    EnrichApplication enrichApplication = new EnrichApplication();
    +    enrichApplication.setLoader(testMeta.dbloader);
    +
    +    LocalMode lma = LocalMode.newInstance();
    +    Configuration conf = new Configuration(false);
    +    lma.prepareDAG(enrichApplication, conf);
    +    LocalMode.Controller lc = lma.getController();
    +    lc.run(10000);// runs for 10 seconds and quits
    +  }
    +
    +  public static class EnrichApplication implements StreamingApplication
    +  {
    +    BackendLoader loader;
    +
    +    @Override
    +    public void populateDAG(DAG dag, Configuration configuration)
    +    {
    +      RandomPOJOGenerator input = dag.addOperator("Input", 
RandomPOJOGenerator.class);
    +      POJOEnricher enrich = dag.addOperator("Enrich", POJOEnricher.class);
    +      EnrichVerifier verify = dag.addOperator("Verify", 
EnrichVerifier.class);
    +
    +      enrich.setStore(loader);
    +      ArrayList<String> lookupFields = new ArrayList<>();
    +      lookupFields.add("ID");
    +      ArrayList<String> includeFields = new ArrayList<>();
    +      includeFields.add("NAME");
    +      includeFields.add("AGE");
    +      includeFields.add("ADDRESS");
    +      includeFields.add("SALARY");
    +      enrich.setLookupFields(lookupFields);
    +      enrich.setIncludeFields(includeFields);
    +
    +      
dag.getMeta(enrich).getMeta(enrich.input).getAttributes().put(Context.PortContext.TUPLE_CLASS,
 Order.class);
    +      dag.getMeta(enrich).getMeta(enrich.output).getAttributes()
    +          .put(Context.PortContext.TUPLE_CLASS, EmployeeOrder.class);
    +
    +      dag.addStream("S1", input.output, enrich.input);
    +      dag.addStream("S2", enrich.output, verify.input);
    +    }
    +
    +    public void setLoader(BackendLoader loader)
    +    {
    +      this.loader = loader;
    +    }
    +  }
    +
    +  public static class RandomPOJOGenerator implements InputOperator
    +  {
    +    public transient DefaultOutputPort<Object> output = new 
DefaultOutputPort<>();
    +    private int idx = 0;
    +    private boolean emit = true;
    +
    +    @Override
    +    public void emitTuples()
    +    {
    +      if (!emit) {
    +        return;
    +      }
    +      idx += idx++ % 4;
    +      Order o = new Order(1, idx + 1, 100.00);
    +      output.emit(o);
    +      if (idx == 3) {
    +        emit = false;
    +      }
    +    }
    +
    +    @Override
    +    public void beginWindow(long l)
    +    {
    +      emit = true;
    +    }
    +
    +    @Override
    +    public void endWindow()
    +    {
    +
    +    }
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +    }
    +
    +    @Override
    +    public void teardown()
    +    {
    +    }
    +  }
    +
    +  public static class EnrichVerifier extends BaseOperator
    +  {
    +    private static final Logger logger = 
LoggerFactory.getLogger(EnrichVerifier.class);
    +
    +    private transient DefaultInputPort<Object> input = new 
DefaultInputPort<Object>()
    +    {
    +      @Override
    +      public void process(Object o)
    +      {
    +        Assert.assertTrue(o instanceof EmployeeOrder);
    +        EmployeeOrder order = (EmployeeOrder)o;
    +        int id = order.getID();
    +        Assert.assertTrue(id >= 1 && id <= 4);
    +        Assert.assertEquals(1, order.getOID());
    +        Assert.assertEquals(100.00, order.getAmount(), 0);
    +
    +        String[] names = {"Paul", "Allen", "Teddy", "Mark"};
    --- End diff --
    
    Use a variable that can be shared in both places.


> Adding Enrichment Operator to Malhar
> ------------------------------------
>
>                 Key: APEXMALHAR-2023
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2023
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>          Components: adapters database
>    Affects Versions: 3.3.1
>            Reporter: Chinmay Kolhatkar
>            Assignee: Chinmay Kolhatkar
>
> Add Enrichment Operator to Apex Malhar.
> Discussion is happening on the mailing list here:
> http://mail-archives.apache.org/mod_mbox/incubator-apex-dev/201603.mbox/%3CCAKJfLDMo24-Gcvum2ZL8-0JOnE8QLryAy0Zu_R5zhMd_bsJyHw%40mail.gmail.com%3E
> Ponymail permalink:
> https://pony-poc.apache.org/thread.html/Z8t5ut5pu5vprgt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to