[
https://issues.apache.org/jira/browse/APEXMALHAR-2015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204027#comment-15204027
]
ASF GitHub Bot commented on APEXMALHAR-2015:
--------------------------------------------
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/217#discussion_r56805446
--- Diff:
library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java ---
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.projection;
+
+import java.lang.reflect.Field;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * <b>ProjectionOperator</b>
+ * Projection Operator projects defined set of fields from given
selectFields/dropFields
+ *
+ * <b>Parameters</b>
+ * - selectFields: comma separated list of fields to be selected from
input tuples
+ * - dropFields: comma separated list of fields to be dropped from input
tuples
+ * selectFields and dropFields are optional and either of them shall be
specified
+ * When both are not specified, all fields shall be projected to
downstream operator
+ *
+ * <b>Input Port</b> takes POJOs as an input
+ *
+ * <b>Output Ports</b>
+ * - projected port emits POJOs with projected fields from input POJOs
+ * - remainder port, if connected, emits POJOs with remainder fields from
input POJOs
+ * - error port emits input POJOs as is upon error situations
+ *
+ * <b>Examples</b>
+ * For {a, b, c} type of input tuples
+ * - when selectFields = "" and dropFields = "", projected port shall
emit {a, b, c}
+ * - when selectFields = "b", projected port shall emit {b} and remainder
port shall emit {a, c}
+ * - when dropFields = "b", projected port shall emit {a, c} and
remainder port shall emit {b}
+ *
+ */
+
+public class ProjectionOperator extends BaseOperator implements
Operator.ActivationListener<Context>
+{
+ protected String selectFields;
+ protected String dropFields;
+ protected String condition;
+
+ static class TypeInfo
+ {
+ String name;
+ Class type;
+ PojoUtils.Setter setter;
+ PojoUtils.Getter getter;
+
+ public TypeInfo(String name, Class<?> type)
+ {
+ this.name = name;
+ this.type = type;
+ }
+
+ public String toString()
+ {
+ String s = new String("'name': " + name + " 'type': " + type);
+ return s;
+ }
+ }
+
+ protected transient List<TypeInfo> projectedFields = new ArrayList<>();
+ protected transient List<TypeInfo> remainderFields = new ArrayList<>();
+
+ @AutoMetric
+ protected long projectedTuples;
+
+ @AutoMetric
+ protected long remainderTuples;
+
+ @AutoMetric
+ protected long errorTuples;
+
+ protected Class<?> inClazz = null;
+ protected Class<?> projectedClazz = null;
+ protected Class<?> remainderClazz = null;
+
+ @InputPortFieldAnnotation(schemaRequired = true)
+ public transient DefaultInputPort<Object> in = new
DefaultInputPort<Object>()
+ {
+ public void setup(PortContext context)
+ {
+ inClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+
+ @Override
+ public void process(Object t)
+ {
+ handleProjection(t);
+ }
+ };
+
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> projected = new
DefaultOutputPort<Object>()
+ {
+ public void setup(PortContext context)
+ {
+ projectedClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+ };
+
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> remainder = new
DefaultOutputPort<Object>()
+ {
+ public void setup(PortContext context)
+ {
+ remainderClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+ };
+
+
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> error = new
DefaultOutputPort<Object>()
+ {
+ public void setup(PortContext context)
+ {
+ inClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+ };
+
+ /**
+ * addProjectedField: Add field details (name, type, getter and setter)
for field with given name
+ * in projectedFields list
+ */
+ protected void addProjectedField(String s)
+ {
+ try {
+ Field f = inClazz.getDeclaredField(s);
+ TypeInfo t = new TypeInfo(f.getName(),
ClassUtils.primitiveToWrapper(f.getType()));
+ t.getter = PojoUtils.createGetter(inClazz, t.name, t.type);
+ logger.debug("Creating setter {} {} {}", projectedClazz, t.name,
t.type);
+ t.setter = PojoUtils.createSetter(projectedClazz, t.name, t.type);
+ projectedFields.add(t);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException("Field " + s + " not found in class " +
inClazz, e);
+ }
+ }
+
+ /**
+ * addRemainderField: Add field details (name, type, getter and setter)
for field with given name
+ * in remainderFields list
+ */
+ protected void addRemainderField(String s)
--- End diff --
I suggest making these methods private unless there is an immediate need
for subclasses to override them.
> Projection Operator
> -------------------
>
> Key: APEXMALHAR-2015
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2015
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: Pradeep A Dalvi
>
> The Projection Operator will allow Apex users to project (select/drop) certain
> fields from incoming tuples. This operation may be done unconditionally
> or based on a given condition.
> Use case:
> -------------
> Not all fields of a tuple are of interest to downstream operators. In
> such cases, one may want to project only selected fields downstream. One may
> also want to drop a few fields instead of selecting many.
> In some scenarios, one may want to project certain fields based on a given
> condition or expression.
> Functionality:
> -----------------
> 1. The Projection operator shall receive a POJO as its input tuple and emit two POJOs on
> separate output ports, i.e. selected and dropped. The selected output port shall
> emit a POJO with the selected fields, and the dropped output port shall emit a POJO
> with the dropped fields.
> 2. The operator takes the fields to select or drop as input parameters. These shall be
> specified as a comma-separated list of fields.
> 3. The operator shall emit POJOs only on connected output ports. In other words,
> if the dropped output port is not connected, it shall not even try to generate
> POJOs with the dropped fields.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)