[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15298733#comment-15298733
 ] 

ASF GitHub Bot commented on APEXMALHAR-2082:
--------------------------------------------

Github user chinmaykolhatkar commented on a diff in the pull request:

    
https://github.com/apache/incubator-apex-malhar/pull/270#discussion_r64455136
  
    --- Diff: 
library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java ---
    @@ -0,0 +1,210 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.lib.filter;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Context.PortContext;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.expression.Expression;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +/**
    + * <b>FilterOperator</b>
    + * Filter Operator filter out tuples based on defined condition
    + *
    + * <b>Parameters</b>
    + * - condition: condition based on expression langugage
    + *
    + * <b>Input Port</b> takes POJOs as an input
    + *
    + * <b>Output Ports</b>
    + * - truePort emits POJOs meeting the given condition
    + * - falsePort emits POJOs not meeting the given condition
    + * - error port emits any error situation while evaluating expression
    + * 
    + */
    +@InterfaceStability.Evolving
    +public class FilterOperator extends BaseOperator implements 
Operator.ActivationListener
    +{
    +  private transient Class<?> inClazz = null;
    +  private transient Expression<Boolean> expr = null;
    +  private List<String> expressionFunctions = new LinkedList<>();
    +
    +  @AutoMetric
    +  private long trueTuples;
    +
    +  @AutoMetric
    +  private long falseTuples;
    +
    +  @AutoMetric
    +  private long errorTuples;
    +
    +  protected String condition;
    +
    +  public final transient DefaultOutputPort<Object> truePort = new 
DefaultOutputPort<Object>();
    +
    +  public final transient DefaultOutputPort<Object> falsePort = new 
DefaultOutputPort<Object>();
    +
    +  public final transient DefaultOutputPort<Object> error = new 
DefaultOutputPort<Object>();
    +
    +  public FilterOperator()
    +  {
    +    expressionFunctions.add("java.lang.Math.*");
    +    expressionFunctions.add("org.apache.commons.lang3.StringUtils.*");
    +    
expressionFunctions.add("org.apache.commons.lang3.StringEscapeUtils.*");
    +    
expressionFunctions.add("org.apache.commons.lang3.time.DurationFormatUtils.*");
    +    
expressionFunctions.add("org.apache.commons.lang3.time.DateFormatUtils.*");
    +  }
    +
    +  @InputPortFieldAnnotation(schemaRequired = true)
    +  public transient DefaultInputPort<Object> input = new 
DefaultInputPort<Object>()
    +  {
    +    public void setup(PortContext context)
    +    {
    +      inClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +
    +    @Override
    +    public void process(Object t)
    +    {
    +      processTuple(t);
    +    }
    +  };
    +
    +  @Override
    +  public void activate(Context context)
    +  {
    +    createExpression();
    +  }
    +
    +  @Override
    +  public void deactivate()
    +  {
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    errorTuples = trueTuples = falseTuples = 0;
    +  }
    +
    +  /**
    +   * createExpression: create an expression from condition of POJO fields
    +   * Override this function for custom field expressions
    +   */
    +  protected void createExpression()
    +  {
    +    logger.info("Creating an expression for condition {}", condition);
    +    expr = PojoUtils.createExpression(inClazz, condition, Boolean.class,
    +        expressionFunctions.toArray(new 
String[expressionFunctions.size()]));
    +  }
    +
    +  /**
    +   * evalExpression: Evaluate condition/expression
    +   * Override this function for custom condition evaluation
    +   */
    +  protected Boolean evalExpression(Object t)
    +  {
    +    return expr.execute(t);
    +  }
    +
    +  /**
    +   * handleFilter: emit POJO meeting condition on truePort
    +   * and if it did not meet condition then on falsePort
    +   */
    +  private void processTuple(Object t)
    +  {
    +    try {
    +      if (evalExpression(t)) {
    +        truePort.emit(t);
    +        trueTuples++;
    +      } else {
    +        falsePort.emit(t);
    +        falseTuples++;
    +      }
    +    } catch (Exception ex) {
    +      logger.error("Error in expression eval: {}", ex.getMessage());
    +      logger.debug("Exception: ", ex);
    +      error.emit(t);
    +      errorTuples++;
    +    }
    +  }
    +
    +  /**
    +   * Returns condition/expression with which Filtering is done
    +   *
    +   * @return Condition parameter of Filter Operator
    --- End diff --
    
    The "C" in "Condition" should be lowercase; otherwise Javadoc won't be able 
to identify it.


> Data Filter Operator 
> ---------------------
>
>                 Key: APEXMALHAR-2082
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2082
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Pradeep A. Dalvi
>
> Data Filter Operator which will allow Apex users to filter (select/drop) 
> tuples from the incoming stream based on a certain condition.
> Use case:
> -------------
> In many cases, not all tuples are of interest to the downstream operators. 
> In such cases, one may want to select/filter out tuples sent downstream. Also, 
> one may want to process tuples which did not meet the condition/expression.
> Functionality:
> -----------------
> 1. Any tuple for which expression could not be evaluated shall be emitted on 
> error output port.
> 2. Filter operator shall receive a POJO as the input tuple and emit the POJO on 
> either of the remaining output ports, i.e. truePort and falsePort. As the output 
> ports' names signify, when the condition is met the POJO shall be emitted on 
> truePort, and if the condition is not met the POJO shall be emitted on 
> falsePort.
> 3. Operator needs a condition/expression as an input param. This condition is 
> based on the expression language support we already have in Malhar.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to