[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207845#comment-15207845
 ] 

ASF GitHub Bot commented on APEXMALHAR-2015:
--------------------------------------------

Github user pradeepdalvi commented on a diff in the pull request:

    
https://github.com/apache/incubator-apex-malhar/pull/217#discussion_r57107850
  
    --- Diff: library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java ---
    @@ -0,0 +1,303 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.lib.projection;
    +
    +import java.lang.reflect.Field;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Context.PortContext;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +
    +import com.datatorrent.common.util.BaseOperator;
    +
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +/**
    + * <b>ProjectionOperator</b>
    + * Projection Operator projects defined set of fields from given selectFields/dropFields
    + *
    + * <b>Parameters</b>
    + * - selectFields: comma separated list of fields to be selected from input tuples
    + * - dropFields: comma separated list of fields to be dropped from input tuples
    + * selectFields and dropFields are optional and either of them shall be specified
    + * When both are not specified, all fields shall be projected to downstream operator
    + *
    + * <b>Input Port</b> takes POJOs as an input
    + *
    + * <b>Output Ports</b>
    + * - projected port emits POJOs with projected fields from input POJOs
    + * - remainder port, if connected, emits POJOs with remainder fields from input POJOs
    + * - error port emits input POJOs as is upon error situations
    + * 
    + * <b>Examples</b>
    + * For {a, b, c} type of input tuples
    + *  - when selectFields = "" and dropFields = "", projected port shall emit {a, b, c}
    + *  - when selectFields = "b", projected port shall emit {b} and remainder port shall emit {a, c}
    + *  - when dropFields = "b", projected port shall emit {a, c} and remainder port shall emit {b}
    + * 
    + */
    +
    +public class ProjectionOperator extends BaseOperator implements Operator.ActivationListener<Context>
    +{
    +  protected String selectFields;
    +  protected String dropFields;
    +  protected String condition;
    +
    +  static class TypeInfo
    +  {
    +    String name;
    +    Class type;
    +    PojoUtils.Setter setter;
    +    PojoUtils.Getter getter;
    +
    +    public TypeInfo(String name, Class<?> type)
    +    {
    +      this.name = name;
    +      this.type = type;
    +    }
    +
    +    public String toString()
    +    {
    +      String s = new String("'name': " + name + " 'type': " + type);
    +      return s;
    +    }
    +  }
    +
    +  protected transient List<TypeInfo> projectedFields = new ArrayList<>();
    +  protected transient List<TypeInfo> remainderFields = new ArrayList<>();
    +
    +  @AutoMetric
    +  protected long projectedTuples;
    +
    +  @AutoMetric
    +  protected long remainderTuples;
    +
    +  @AutoMetric
    +  protected long errorTuples;
    +
    +  protected Class<?> inClazz = null;
    +  protected Class<?> projectedClazz = null;
    +  protected Class<?> remainderClazz = null;
    +
    +  @InputPortFieldAnnotation(schemaRequired = true)
    +  public transient DefaultInputPort<Object> in = new DefaultInputPort<Object>()
    +  {
    +    public void setup(PortContext context)
    +    {
    +      inClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +
    +    @Override
    +    public void process(Object t)
    +    {
    +      handleProjection(t);
    +    }
    +  };
    +
    +  @OutputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultOutputPort<Object> projected = new DefaultOutputPort<Object>()
    +  {
    +    public void setup(PortContext context)
    +    {
    +      projectedClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +  };
    +
    +  @OutputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultOutputPort<Object> remainder = new DefaultOutputPort<Object>()
    +  {
    +    public void setup(PortContext context)
    +    {
    +      remainderClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +  };
    +
    +
    +  @OutputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<Object>()
    +  {
    +    public void setup(PortContext context)
    +    {
    +      inClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +  };
    +
    +  /**
    +   * addProjectedField: Add field details (name, type, getter and setter) for field with given name
    +   * in projectedFields list
    +   */
    +  protected void addProjectedField(String s)
    +  {
    +    try {
    +      Field f = inClazz.getDeclaredField(s);
    +      TypeInfo t = new TypeInfo(f.getName(), ClassUtils.primitiveToWrapper(f.getType()));
    +      t.getter = PojoUtils.createGetter(inClazz, t.name, t.type);
    +      logger.debug("Creating setter {} {} {}", projectedClazz, t.name, t.type);
    +      t.setter = PojoUtils.createSetter(projectedClazz, t.name, t.type);
    +      projectedFields.add(t);
    +    } catch (NoSuchFieldException e) {
    +      throw new RuntimeException("Field " + s + " not found in class " + inClazz, e);
    +    }
    +  }
    +
    +  /**
    +   * addRemainderField: Add field details (name, type, getter and setter) for field with given name
    +   * in remainderFields list
    +   */
    +  protected void addRemainderField(String s)
    +  {
    +    try {
    +      Field f = inClazz.getDeclaredField(s);
    +      TypeInfo t = new TypeInfo(f.getName(), ClassUtils.primitiveToWrapper(f.getType()));
    +      t.getter = PojoUtils.createGetter(inClazz, t.name, t.type);
    +      t.setter = PojoUtils.createSetter(remainderClazz, t.name, t.type);
    +      remainderFields.add(t);
    +    } catch (NoSuchFieldException e) {
    +      throw new RuntimeException("Field " + s + " not found in class " + inClazz, e);
    +    }
    +  }
    +
    --- End diff --
    
    projectedFields and remainderFields are separate lists, and an extending
    class might be interested in only one of them. Hence the code that
    populates each list has been kept separate.
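
The point about extending classes can be illustrated with a minimal sketch. It does not use the Apex or Malhar APIs: `FieldListsSketch` and `ProjectedOnlySketch` are hypothetical stand-ins, and field details are reduced to plain names. It only shows why two separate hook methods let a subclass opt out of one list without touching the other.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the operator: two lists, each filled by its own hook method.
class FieldListsSketch
{
  protected final List<String> projectedFields = new ArrayList<>();
  protected final List<String> remainderFields = new ArrayList<>();

  // Hook for the projected list; subclasses may override it independently.
  protected void addProjectedField(String name)
  {
    projectedFields.add(name);
  }

  // Hook for the remainder list; subclasses may override it independently.
  protected void addRemainderField(String name)
  {
    remainderFields.add(name);
  }

  // Routes every field name through the matching hook.
  void configure(List<String> projected, List<String> remainder)
  {
    for (String name : projected) {
      addProjectedField(name);
    }
    for (String name : remainder) {
      addRemainderField(name);
    }
  }
}

// Hypothetical subclass that only cares about projected fields.
class ProjectedOnlySketch extends FieldListsSketch
{
  @Override
  protected void addRemainderField(String name)
  {
    // Intentionally empty: this subclass never builds the remainder list.
  }
}
```

With a single combined method, such a subclass would have to reimplement the projected-field logic just to skip the remainder list.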


> Projection Operator
> -------------------
>
>                 Key: APEXMALHAR-2015
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2015
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Pradeep A Dalvi
>
> Projection Operator will allow Apex users to project (select/drop) certain 
> fields from the incoming tuples. This operation might be done unconditionally 
> or based on a given condition.
> Use case:
> -------------
> Not all fields of a tuple are of interest to the downstream operators. In 
> such cases, one may want to project only selected fields downstream. One may 
> also want to drop a few fields instead of selecting many.
> In certain scenarios, one may want to project certain fields based on a given 
> condition or expression.
> Functionality:
> -----------------
> 1. The Projection operator shall receive a POJO as the input tuple and emit 
> 2 POJOs on separate output ports, i.e. selected and dropped. The selected 
> output port shall emit a POJO with the selected fields and the dropped output 
> port shall emit a POJO with the dropped fields.
> 2. The operator needs select or drop fields as input params. These shall be 
> specified as a comma separated list of fields.
> 3. The operator shall emit POJOs only on connected output ports. In other 
> words, if the dropped output port is not connected, it shall not even try to 
> generate POJOs with dropped fields.
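
The select/drop behaviour described above can be sketched in plain Java, without any Apex classes. The names `ProjectionSketch`, `Split`, `parse` and `partition` are hypothetical, and the sketch works on field names only rather than POJOs. It also assumes that a non-empty select list takes precedence over a drop list, which the description does not state.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: splits field names into a projected and a remainder list.
final class ProjectionSketch
{
  /** Holds the two field-name lists produced by one partition call. */
  static final class Split
  {
    final List<String> projected = new ArrayList<>();
    final List<String> remainder = new ArrayList<>();
  }

  /** Parses a comma separated field list; null or blank input yields an empty set. */
  static Set<String> parse(String csv)
  {
    Set<String> names = new LinkedHashSet<>();
    if (csv == null) {
      return names;
    }
    for (String name : csv.split(",")) {
      String trimmed = name.trim();
      if (!trimmed.isEmpty()) {
        names.add(trimmed);
      }
    }
    return names;
  }

  /**
   * Assigns each input field to the projected or the remainder list.
   * Assumption: when selectFields is non-empty it wins and dropFields is ignored.
   */
  static Split partition(List<String> allFields, String selectFields, String dropFields)
  {
    Set<String> select = parse(selectFields);
    Set<String> drop = parse(dropFields);
    Split split = new Split();
    for (String field : allFields) {
      boolean keep = select.isEmpty() ? !drop.contains(field) : select.contains(field);
      if (keep) {
        split.projected.add(field);
      } else {
        split.remainder.add(field);
      }
    }
    return split;
  }
}
```

For input fields {a, b, c}, calling `partition` with selectFields = "b" puts b in the projected list and a, c in the remainder list; with dropFields = "b" the two lists are swapped; with both empty every field is projected.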



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
