[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405455#comment-15405455 ]
ASF GitHub Bot commented on APEXMALHAR-2100: -------------------------------------------- Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73289851 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.hadoop.fs.Path; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.join.managed.ManagedSpillableComplexComponent; +import com.datatorrent.lib.join.managed.ManagedTimeStateMultiMap; +import com.datatorrent.netlet.util.Slice; + +public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements + Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler +{ + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class); + public static final String stateDir = "managedState"; + public static final String stream1State = "stream1Data"; + public static final String stream2State = "stream2Data"; + private transient long sleepMillis; + private transient Map<JoinEvent<K,T>, Future<Slice>> waitingEvents = Maps.newLinkedHashMap(); + private int noOfBuckets = 1; + private Long bucketSpanTime; + protected ManagedTimeStateImpl stream1Store; + protected ManagedTimeStateImpl stream2Store; + + @Override + public void createStores() + { + stream1Store = new ManagedTimeStateImpl(); + stream2Store = new ManagedTimeStateImpl(); + stream1Store.setNumBuckets(noOfBuckets); + stream2Store.setNumBuckets(noOfBuckets); + if (bucketSpanTime != null) { + stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); + stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); + } + + if (getExpiryTime() != null) { + stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + } + + component = new ManagedSpillableComplexComponent(); + stream1Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store, isStream1KeyPrimary()); + stream2Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store, isStream2KeyPrimary()); + } + + @Override + protected void processTuple(T tuple, boolean isStream1Data) + { + Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; + K key = extractKey(tuple,isStream1Data); + long timeBucket = extractTime(tuple,isStream1Data); + ((ManagedTimeStateMultiMap)store).put(key, tuple,timeBucket); + joinStream(key,tuple,isStream1Data); + } + + @Override + protected void joinStream(K key, T tuple, boolean isStream1Data) + { + Spillable.SpillableByteArrayListMultimap<K, T> store = isStream1Data ? stream2Data : stream1Data; + Future<Slice> future = ((ManagedTimeStateMultiMap)store).getAsync(key); + if (future.isDone()) { + try { + joinStream(key,tuple,isStream1Data, future.get()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } else { + waitingEvents.put(new JoinEvent<>(key,tuple,isStream1Data),future); + } + } + + private void joinStream(K key, T tuple, boolean isStream1Data, Slice valueSlice) + { + Spillable.SpillableByteArrayListMultimap<K, T> store = isStream1Data ? stream2Data : stream1Data; + List<T> value = null; + if (((ManagedTimeStateMultiMap)store).isKeyContainsMultiValue()) { + value = (List<T>)((ManagedTimeStateMultiMap)store).getStreamCodec().fromByteArray(valueSlice); + } else { + if (valueSlice != null && valueSlice.length != 0 && valueSlice.buffer != null) { + value = new ArrayList<>(); + value.add((T)((ManagedTimeStateMultiMap)store).getStreamCodec().fromByteArray(valueSlice)); + } + } + // Join the input tuple with the joined tuples + if (value != null) { + for (T joinedValue : value) { + T result = isStream1Data ? joinTuples(Arrays.asList(tuple, joinedValue)) : + joinTuples(Arrays.asList(joinedValue, tuple)); + if (result != null) { + emitTuple(result); + } + } + } + } + + @Override + public void handleIdleTime() + { + if (waitingEvents.size() > 0) { + processWaitEvents(); + } else { + /* nothing to do here, so sleep for a while to avoid busy loop */ + try { + Thread.sleep(sleepMillis); --- End diff -- Done > Development of Inner Join Operator using Spillable Datastructures > ----------------------------------------------------------------- > > Key: APEXMALHAR-2100 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100 > Project: Apache Apex Malhar > Issue Type: Task > Reporter: Chaitanya > Assignee: Chaitanya > -- This message was sent by Atlassian JIRA (v6.3.4#6332)