[
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411567#comment-15411567
]
ASF GitHub Bot commented on APEXMALHAR-2100:
--------------------------------------------
Github user chaithu14 commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/330#discussion_r73844336
--- Diff:
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that
receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+public class POJOInnerJoinOperator extends
AbstractManagedStateInnerJoinOperator<Object,Object> implements
Operator.ActivationListener<Context>
+{
+ private static final Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+ // Amount added to "time" at the end of every window; computed in setup().
+ private transient long timeIncrement;
+ // Per-stream accessor holders: slot 0 is used by input1, slot 1 by input2.
+ private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+ // Tuple class of the output port; assigned in the output port's setup().
+ protected Class<?> outputClass;
+ // Time that extractTime() returns in place of a tuple-derived value; advanced in endWindow().
+ private long time = System.currentTimeMillis();
+
+ /**
+  * Output port used by emitTuple(). During port setup the TUPLE_CLASS
+  * attribute of the port context is stored in {@code outputClass}.
+  */
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
+ {
+   @Override
+   public void setup(Context.PortContext portContext)
+   {
+     outputClass = portContext.getValue(Context.PortContext.TUPLE_CLASS);
+   }
+ };
+
+ /**
+  * Input port for stream1. Each tuple is passed to processTuple() with
+  * {@code true} as the second argument, and the port's stream codec is
+  * obtained from getInnerJoinStreamCodec(true).
+  */
+ @InputPortFieldAnnotation(schemaRequired = true)
+ public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>()
+ {
+   @Override
+   public void process(Object tuple)
+   {
+     processTuple(tuple, true);
+   }
+
+   @Override
+   public StreamCodec<Object> getStreamCodec()
+   {
+     return getInnerJoinStreamCodec(true);
+   }
+
+   @Override
+   public void setup(Context.PortContext portContext)
+   {
+     // Slot 0 holds the metadata of this port: remember its tuple class.
+     inputFieldObjects[0].inputClass = portContext.getValue(Context.PortContext.TUPLE_CLASS);
+   }
+ };
+
+ /**
+  * Input port for stream2. Each tuple is passed to processTuple() with
+  * {@code false} as the second argument, and the port's stream codec is
+  * obtained from getInnerJoinStreamCodec(false).
+  */
+ @InputPortFieldAnnotation(schemaRequired = true)
+ public transient DefaultInputPort<Object> input2 = new DefaultInputPort<Object>()
+ {
+   @Override
+   public void process(Object tuple)
+   {
+     processTuple(tuple, false);
+   }
+
+   @Override
+   public StreamCodec<Object> getStreamCodec()
+   {
+     return getInnerJoinStreamCodec(false);
+   }
+
+   @Override
+   public void setup(Context.PortContext portContext)
+   {
+     // Slot 1 holds the metadata of this port: remember its tuple class.
+     inputFieldObjects[1].inputClass = portContext.getValue(Context.PortContext.TUPLE_CLASS);
+   }
+ };
+
+ /**
+  * Computes the per-window time increment from the operator context,
+  * delegates to the parent setup and then allocates one FieldObjectMap
+  * holder per input stream.
+  * @param context operator context
+  */
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+   // Application window count multiplied by the streaming window size in milliseconds.
+   timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+     context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+   super.setup(context);
+   for (int slot = 0; slot < inputFieldObjects.length; slot++) {
+     inputFieldObjects[slot] = new FieldObjectMap();
+   }
+ }
+
+ /**
+  * Extract the time value from the given tuple.
+  * <p>
+  * When no time fields are configured (the list is null or empty) the
+  * operator's own {@code time} value is returned. The guard mirrors the
+  * condition under which generateSettersAndGetters() creates the time
+  * getters, so a missing getter is never dereferenced for an empty list.
+  * @param tuple given tuple
+  * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+  * @return the time read from the tuple's time field, or the operator's {@code time} if no time fields are set
+  */
+ @Override
+ public long extractTime(Object tuple, boolean isStream1Data)
+ {
+   if (timeFields == null || timeFields.size() == 0) {
+     return time;
+   }
+   // Slot 0 holds the stream1 accessors, slot 1 those of stream2.
+   final FieldObjectMap source = inputFieldObjects[isStream1Data ? 0 : 1];
+   return (long)source.timeFieldGet.get(tuple);
+ }
+
+ /**
+ * Create getters for the key and time fields and setters for the
include fields.
+ */
+ protected void generateSettersAndGetters()
+ {
+ for (int i = 0; i < 2; i++) {
+ Class inputClass = inputFieldObjects[i].inputClass;
+ try {
+ Class c =
ClassUtils.primitiveToWrapper(inputClass.getField(keyFields.get(i)).getType());
+ inputFieldObjects[i].keyGet = PojoUtils.createGetter(inputClass,
keyFields.get(i), c);
+ if (timeFields != null && timeFields.size() != 0) {
+ Class tc =
ClassUtils.primitiveToWrapper(inputClass.getField(timeFields.get(i)).getType());
+ inputFieldObjects[i].timeFieldGet =
PojoUtils.createGetter(inputClass, timeFields.get(i), tc);
+ }
+ for (int j = 0; j < includeFields[i].length; j++) {
+ Class ic =
ClassUtils.primitiveToWrapper(inputClass.getField(includeFields[i][j]).getType());
+ Class oc =
ClassUtils.primitiveToWrapper(outputClass.getField(includeFields[i][j]).getType());
+
inputFieldObjects[i].fieldMap.put(PojoUtils.createGetter(inputClass,
includeFields[i][j], ic),
+ PojoUtils.createSetter(outputClass, includeFields[i][j],
oc));
+ }
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+  * Reads the join key of a tuple through the key getter of its stream.
+  * @param tuple given tuple
+  * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+  * @return the value returned by the key getter of the selected stream
+  */
+ @Override
+ public Object extractKey(Object tuple, boolean isStream1Data)
+ {
+   if (isStream1Data) {
+     return inputFieldObjects[0].keyGet.get(tuple);
+   }
+   return inputFieldObjects[1].keyGet.get(tuple);
+ }
+
+ /**
+  * Creates a new instance of the output class and copies the include fields
+  * into it: first those of stream1 from tuple1, then those of stream2 from
+  * tuple2.
+  * @param tuple1 tuple belonging to stream1
+  * @param tuple2 tuple belonging to stream2
+  * @return the newly created output object
+  * @throws RuntimeException if the output class cannot be instantiated
+  */
+ @Override
+ public Object mergeTuples(Object tuple1, Object tuple2)
+ {
+   try {
+     final Object merged = outputClass.newInstance();
+     // sources[0] pairs with the stream1 accessors, sources[1] with those of stream2.
+     final Object[] sources = {tuple1, tuple2};
+     for (int stream = 0; stream < sources.length; stream++) {
+       for (Map.Entry<PojoUtils.Getter,PojoUtils.Setter> accessor : inputFieldObjects[stream].fieldMap.entrySet()) {
+         accessor.getValue().set(merged, accessor.getKey().get(sources[stream]));
+       }
+     }
+     return merged;
+   } catch (InstantiationException | IllegalAccessException e) {
+     throw new RuntimeException(e);
+   }
+ }
+
+ /**
+  * Sends the given tuple downstream by emitting it on {@code outputPort}.
+  * @param tuple tuple to emit
+  */
+ @Override
+ public void emitTuple(Object tuple)
+ {
+   this.outputPort.emit(tuple);
+ }
+
+ /**
+  * Activation callback: builds the field getters and setters.
+  * @param context activation context (not used)
+  */
+ @Override
+ public void activate(Context context)
+ {
+   this.generateSettersAndGetters();
+ }
+
+ /**
+  * Deactivation callback; intentionally a no-op.
+  */
+ @Override
+ public void deactivate()
+ {
+   // Nothing to do on deactivation.
+ }
+
+ /**
+  * Runs the parent's end-of-window handling and then advances {@code time}
+  * by the increment computed in setup().
+  */
+ @Override
+ public void endWindow()
+ {
+   super.endWindow();
+   time = time + timeIncrement;
+ }
+
+ /**
+  * Creates the stream codec for one of the two input streams. The
+  * comma-separated key fields string is split and the first entry is used
+  * for stream1, the second for stream2.
+  * @param isStream1data Specifies whether the codec is needed for stream1 or stream2.
+  * @return the object of InnerJoinStreamCodec
+  */
+ public StreamCodec<Object> getInnerJoinStreamCodec(boolean isStream1data)
+ {
+   final String[] keyFieldNames = getKeyFieldsStr().split(",");
+   final int position = isStream1data ? 0 : 1;
+   return new InnerJoinStreamCodec(keyFieldNames[position]);
+ }
+
+ private class FieldObjectMap
+ {
+ public Class<?> inputClass;
+ public PojoUtils.Getter keyGet;
+ public PojoUtils.Getter timeFieldGet;
+ public Map<PojoUtils.Getter,PojoUtils.Setter> fieldMap;
--- End diff --
In mergeTuples(), it's difficult to figure out whether the getter method
belongs to stream1 or stream2.
> Development of Inner Join Operator using Spillable Datastructures
> -----------------------------------------------------------------
>
> Key: APEXMALHAR-2100
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Chaitanya
> Assignee: Chaitanya
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)