[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405766#comment-15405766 ]
ASF GitHub Bot commented on APEXMALHAR-2100: -------------------------------------------- Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73322874 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * <p> + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. + * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. 
+ * + * <b>Properties:</b><br> + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple. + * Ex: Field1,Field2;Field3,Field4<br> + * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br> + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br> + * <b>expiryTime</b>: Expiry time for stored tuples<br> + * <b>isStream1KeyPrimary</b>: : Specifies whether the stream1 key is primary or not<br> + * <b>isStream2KeyPrimary</b>: : Specifies whether the stream2 key is primary or not<br> + * + * <b> Example: </b> <br> + * Left input port receives customer details and right input port receives Order details. + * Schema for the Customer be in the form of {ID, Name, CTime} + * Schema for the Order be in the form of {OID, CID, OTime} + * Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is + * matched tuples must have timestamp within 5 minutes. 
+ * Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime = 5 minutes </b> <br> + * + * @displayName Abstract Inner Join Operator + * @tags join + */ +public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator +{ + protected transient String[][] includeFields; + protected transient List<String> keyFields; + protected transient List<String> timeFields; + @AutoMetric + private long tuplesJoinedPerSec; + private double windowTimeSec; + private int tuplesCount; + @NotNull + private String keyFieldsStr; + @NotNull + private String includeFieldStr; + private String timeFieldsStr; + private Long stream1ExpiryTime; + private Long stream2ExpiryTime; + private boolean isStream1KeyPrimary = true; + private boolean isStream2KeyPrimary = true; + protected SpillableComplexComponent component; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data; + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert <key,tuple> into the store where store is the stream1Data if the tuple + * receives from stream1 or viceversa. + * 3) Get the values of the key if found it in opposite store + * 4) Merge the given tuple and values found from step (3) + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + */ + protected void processTuple(T tuple, boolean isStream1Data) + { + Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; + K key = extractKey(tuple,isStream1Data); + if (!store.put(key, tuple)) { + return; + } + Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data; + joinStream(tuple,isStream1Data, valuestore.get(key)); + } + + /** + * Merge the given tuple and list of values. 
+ * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + * @param value list of tuples + */ + protected void joinStream(T tuple, boolean isStream1Data, List<T> value) + { + // Join the input tuple with the joined tuples + if (value != null) { --- End diff -- Agreed. I suggest we treat this as an assumption for this version of Inner Join. Later on, when we have Outer Join, we can make inner join a special case of outer join, at which point we can take care of this. Let's create a Jira for adding this functionality to inner join once this PR gets merged. Let me know if you think otherwise. > Development of Inner Join Operator using Spillable Datastructures > ----------------------------------------------------------------- > > Key: APEXMALHAR-2100 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100 > Project: Apache Apex Malhar > Issue Type: Task > Reporter: Chaitanya > Assignee: Chaitanya > -- This message was sent by Atlassian JIRA (v6.3.4#6332)