[ https://issues.apache.org/jira/browse/APEXMALHAR-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15298733#comment-15298733 ]
ASF GitHub Bot commented on APEXMALHAR-2082: -------------------------------------------- Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/270#discussion_r64455136 --- Diff: library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java --- @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.datatorrent.lib.filter; + +import java.util.LinkedList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; + +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; + +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.expression.Expression; +import com.datatorrent.lib.util.PojoUtils; + +/** + * <b>FilterOperator</b> + * Filter Operator filter out tuples based on defined condition + * + * <b>Parameters</b> + * - condition: condition based on expression langugage + * + * <b>Input Port</b> takes POJOs as an input + * + * <b>Output Ports</b> + * - truePort emits POJOs meeting the given condition + * - falsePort emits POJOs not meeting the given condition + * - error port emits any error situation while evaluating expression + * + */ +@InterfaceStability.Evolving +public class FilterOperator extends BaseOperator implements Operator.ActivationListener +{ + private transient Class<?> inClazz = null; + private transient Expression<Boolean> expr = null; + private List<String> expressionFunctions = new LinkedList<>(); + + @AutoMetric + private long trueTuples; + + @AutoMetric + private long falseTuples; + + @AutoMetric + private long errorTuples; + + protected String condition; + + public final transient DefaultOutputPort<Object> truePort = new DefaultOutputPort<Object>(); + + public final transient DefaultOutputPort<Object> falsePort = new DefaultOutputPort<Object>(); + + public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<Object>(); + + public FilterOperator() + { + expressionFunctions.add("java.lang.Math.*"); + 
expressionFunctions.add("org.apache.commons.lang3.StringUtils.*"); + expressionFunctions.add("org.apache.commons.lang3.StringEscapeUtils.*"); + expressionFunctions.add("org.apache.commons.lang3.time.DurationFormatUtils.*"); + expressionFunctions.add("org.apache.commons.lang3.time.DateFormatUtils.*"); + } + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() + { + public void setup(PortContext context) + { + inClazz = context.getValue(Context.PortContext.TUPLE_CLASS); + } + + @Override + public void process(Object t) + { + processTuple(t); + } + }; + + @Override + public void activate(Context context) + { + createExpression(); + } + + @Override + public void deactivate() + { + } + + @Override + public void beginWindow(long windowId) + { + errorTuples = trueTuples = falseTuples = 0; + } + + /** + * createExpression: create an expression from condition of POJO fields + * Override this function for custom field expressions + */ + protected void createExpression() + { + logger.info("Creating an expression for condition {}", condition); + expr = PojoUtils.createExpression(inClazz, condition, Boolean.class, + expressionFunctions.toArray(new String[expressionFunctions.size()])); + } + + /** + * evalExpression: Evaluate condition/expression + * Override this function for custom condition evaluation + */ + protected Boolean evalExpression(Object t) + { + return expr.execute(t); + } + + /** + * handleFilter: emit POJO meeting condition on truePort + * and if it did not meet condition then on falsePort + */ + private void processTuple(Object t) + { + try { + if (evalExpression(t)) { + truePort.emit(t); + trueTuples++; + } else { + falsePort.emit(t); + falseTuples++; + } + } catch (Exception ex) { + logger.error("Error in expression eval: {}", ex.getMessage()); + logger.debug("Exception: ", ex); + error.emit(t); + errorTuples++; + } + } + + /** + * Returns condition/expression with which Filtering 
is done + * + * @return Condition parameter of Filter Operator --- End diff -- The "C" in "Condition" should be lowercase; otherwise javadoc won't be able to identify it. > Data Filter Operator > --------------------- > > Key: APEXMALHAR-2082 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2082 > Project: Apache Apex Malhar > Issue Type: New Feature > Reporter: Pradeep A. Dalvi > > A Data Filter Operator which will allow Apex users to filter (select/drop) > tuples based on a certain condition from the incoming stream. > Use case: > ------------- > In many cases, not all tuples are of interest for the downstream operators. > In such cases, one may want to select/filter out tuples to downstream. Also one > may want to process tuples which did not meet the condition/expression. > Functionality: > ----------------- > 1. Any tuple for which the expression could not be evaluated shall be emitted on > the error output port. > 2. Filter operator shall receive a POJO as input tuple and emit the POJO on either > of the remaining output ports, i.e. truePort and falsePort. As the output ports' > names signify, when the condition is met then the POJO shall be emitted on > truePort and if the condition is not met then that POJO shall be emitted on > falsePort. > 3. Operator needs a condition/expression as an input param. This condition is > based on the expression language support we already have in Malhar. -- This message was sent by Atlassian JIRA (v6.3.4#6332)