[jira] [Created] (APEXCORE-502) Unnecessary byte array copy in DefaultKryoStreamCodec.toByteArray
Vlad Rozov created APEXCORE-502: --- Summary: Unnecessary byte array copy in DefaultKryoStreamCodec.toByteArray Key: APEXCORE-502 URL: https://issues.apache.org/jira/browse/APEXCORE-502 Project: Apache Apex Core Issue Type: Improvement Reporter: Vlad Rozov {noformat} slice = new Slice(os.toByteArray(), 0, os.toByteArray().length); {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
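The improvement is to call toByteArray() once and reuse the result, since each call to ByteArrayOutputStream.toByteArray() allocates and copies a fresh array. A minimal sketch of the before/after (ByteSlice is a hypothetical stand-in for com.datatorrent.netlet.util.Slice, so the example is self-contained):

```java
import java.io.ByteArrayOutputStream;

// Stand-in for com.datatorrent.netlet.util.Slice (hypothetical, for illustration only).
class ByteSlice {
  final byte[] buffer;
  final int offset;
  final int length;

  ByteSlice(byte[] buffer, int offset, int length) {
    this.buffer = buffer;
    this.offset = offset;
    this.length = length;
  }
}

class ToByteArrayFix {
  // Before: toByteArray() is called twice, and each call copies the stream's internal buffer.
  static ByteSlice sliceWithExtraCopy(ByteArrayOutputStream os) {
    return new ByteSlice(os.toByteArray(), 0, os.toByteArray().length);
  }

  // After: call toByteArray() once and reuse both the array and its length.
  static ByteSlice sliceWithSingleCopy(ByteArrayOutputStream os) {
    byte[] bytes = os.toByteArray();
    return new ByteSlice(bytes, 0, bytes.length);
  }
}
```

Both methods produce an equivalent slice; the second simply avoids the redundant array copy flagged in the ticket.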
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chandnisingh commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73969296 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java --- @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.apache.apex.malhar.lib.state.BucketedState; + +import com.datatorrent.api.Component; +import com.datatorrent.api.Context; +import com.datatorrent.api.Operator; + +/** + * Implementations of this interface are used by Spillable datastructures to spill data to disk. + */ +public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>, +Operator.CheckpointNotificationListener, WindowListener +{ --- End diff -- Can we add ```hasBeenSetup()``` to this? I see in the tests that the ```setup()``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
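The suggested hasBeenSetup() could be implemented by tracking a flag in the component lifecycle. A minimal, hypothetical sketch (TrackedStore and its methods are illustrative names, not the actual Malhar API):

```java
// Hypothetical sketch: a store component that reports whether setup() has run,
// so tests can assert on lifecycle state and guard against double setup.
class TrackedStore {
  private boolean setupDone = false;

  void setup() {
    if (setupDone) {
      return; // make setup idempotent for tests that call it defensively
    }
    // ... acquire resources here ...
    setupDone = true;
  }

  void teardown() {
    // ... release resources here ...
    setupDone = false;
  }

  boolean hasBeenSetup() {
    return setupDone;
  }
}
```

A test could then call store.setup() only when !store.hasBeenSetup(), which is the usage pattern the review comment points at.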
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chandnisingh commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73966670 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java --- @@ -0,0 +1,324 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}. + * @param <T> The type of object stored in the {@link SpillableArrayListImpl}.
+ */ +@DefaultSerializer(FieldSerializer.class) +public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T>, Spillable.SpillableComponent +{ + public static final int DEFAULT_BATCH_SIZE = 1000; + + private int batchSize = DEFAULT_BATCH_SIZE; + private long bucketId; --- End diff -- It seems ```bucketId```, ```prefix``` are unused. Also please add getter/setter for ```batchSize```.
Re: JDBC Poller implementation
+1 for common abstract class and specific implementations that are use case specific. Regards, Ashwin. On Mon, Aug 8, 2016 at 7:55 AM, Devendra Tagare wrote: > + 1 for a common Abstract Class. > > 0 - for adding ports. > > Use-case for CSV - data migration from columnar stores. Some users may want > to move TB's of data from a store like Greenplum and then do > transformations using the CSV parser that is already present. > > Thanks, > Dev > > On Mon, Aug 8, 2016 at 7:39 AM, Yogi Devendra > wrote: > > > +1 for Tushar's idea for common abstract class. > > > > Additionally, POJO based output should be considered for active > > development. CSV output can be deprecated. Since it can be achieved > easily > > using POJO + CSV formatter. > > > > Operators are meant to be lego blocks for reusable functionality to > achieve > > higher level functionality by clubbing multiple operators together. > > > > > > ~ Yogi > > > > On 8 August 2016 at 17:45, Tushar Gosavi wrote: > > > > > I would prefer a common abstract class having emitTuple method. And > > > two different implementation one for emitting comma separated values > > > and other emitting pojo. > > > > > > Regards, > > > -Tushar. > > > > > > > > > On Mon, Aug 8, 2016 at 5:34 PM, Priyanka Gugale > > > wrote: > > > > The concrete implementation is supposed to do mainly formatting of > > input > > > > data and emit in required form. Also it would be tricky to implement > > some > > > > abstract methods like "getTuple", you have to conditionally return > the > > > > values. > > > > > > > > -Priyanka > > > > > > > > On Mon, Aug 8, 2016 at 4:49 PM, Bhupesh Chawda > > > wrote: > > > > > > > >> I am +1 for having two ports in the same concrete implementation. > > > >> One port for POJOs and second one for CSV strings (if this is > actually > > > >> needed) > > > >> > > > >> +0 on having multiple concrete operators.
> > > >> I don't think it is necessary to create multiple concrete > > > implementations > > > >> just because we want the same data in different formats. Ports > should > > > serve > > > >> the purpose. > > > >> > > > >> ~ Bhupesh > > > >> > > > >> > > > >> On Mon, Aug 8, 2016 at 4:46 PM, Priyanka Gugale < > > > priya...@datatorrent.com> > > > >> wrote: > > > >> > > > >> > Hi, > > > >> > > > > >> > JDBCPollerInputOperator in malhar repository emits comma separated > > > list > > > >> of > > > >> > values as result of scan. As most of our input operators emit > POJOs > > I > > > am > > > >> > planning to add an implementation which emits pojo. > > > >> > > > > >> > I would like to discuss, if we should have two independent jdbc > poll > > > >> input > > > >> > operators, one emits csv and other which emits pojo or we should > > have > > > one > > > >> > operator having two ports? > > > >> > > > > >> > I prefer two operators to define clear intent of each operator, > but > > if > > > >> > anyone has different opinion please suggest. > > > >> > > > > >> > -Priyanka > > > >> > > > > >> > > > > > > -- Regards, Ashwin.
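The common abstract class the thread converges on might look like the following minimal sketch. All names here are hypothetical and illustrative, not the actual Malhar operator hierarchy: the polling/scan logic would live in the base class, and each concrete subclass only decides how a scanned row is emitted.

```java
import java.util.List;

// Hypothetical base class: shared scan/emit path, abstract conversion.
abstract class AbstractPollEmitter<T> {
  // Subclasses convert one scanned row (a list of column values) to a tuple.
  protected abstract T convert(List<String> row);

  // Shared emit path; a real operator would write the tuple to an output port.
  T emitTuple(List<String> row) {
    return convert(row);
  }
}

// Variant 1: emit comma-separated values.
class CsvPollEmitter extends AbstractPollEmitter<String> {
  @Override
  protected String convert(List<String> row) {
    return String.join(",", row);
  }
}

// Variant 2: emit a POJO built from the row (field mapping elided).
class PojoPollEmitter extends AbstractPollEmitter<Object> {
  @Override
  protected Object convert(List<String> row) {
    return new Object(); // placeholder for a generated/mapped POJO
  }
}
```

This keeps each operator's intent clear (one emits CSV, one emits POJOs) while the query/poll machinery is written once, which is the trade-off the thread is weighing against a single operator with two ports.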
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73941677 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java --- @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; --- End diff -- I think this should be moved to a more generic package, possibly in Apex Core. But I think it's okay for now to put it there. Just annotate with InterfaceStability.Unstable.
[GitHub] apex-malhar pull request #362: JMS Input operator changes to support SQS and...
GitHub user sanjaypujare opened a pull request: https://github.com/apache/apex-malhar/pull/362 JMS Input operator changes to support SQS and ActiveMQ @PramodSSImmaneni these changes incorporate your comments and don't have extraneous changes. Please review and merge. You can merge this pull request into a Git repository by running: $ git pull https://github.com/sanjaypujare/apex-malhar APEXMALHAR-2156.improvement2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/362.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #362 commit db32f40858ef37356b6fec836df605acdbcbcbfa Author: Sanjay Pujare Date: 2016-08-08T19:40:05Z JMS Input operator changes to support SQS and ActiveMQ
[jira] [Commented] (APEXCORE-496) Provide Operator name to StatsListener
[ https://issues.apache.org/jira/browse/APEXCORE-496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15412264#comment-15412264 ] ASF GitHub Bot commented on APEXCORE-496: - Github user tushargosavi commented on the issue: https://github.com/apache/apex-core/pull/361 @vrozov @sandeshh I have updated the pull request, please review. > Provide Operator name to StatsListener > -- > > Key: APEXCORE-496 > URL: https://issues.apache.org/jira/browse/APEXCORE-496 > Project: Apache Apex Core > Issue Type: Task > Reporter: Tushar Gosavi > Assignee: Tushar Gosavi > > In case of shared StatsListener it becomes difficult to identify for which > operator's stats are being processed. We could provide operator name in > BatchedOperatorStats to easily identify the operator.
[GitHub] apex-core issue #361: APEXCORE-496 make operator name available to StatsList...
Github user tushargosavi commented on the issue: https://github.com/apache/apex-core/pull/361 @vrozov @sandeshh I have updated the pull request, please review.
Re: [Proposal] Named Checkpoints
The idea here was to create, on demand, recovery/committed window. But there is always one (except before the first) recovery window for the DAG. Instead of using/modifying the Checkpoint tuple, I am planning to reuse the existing recovery window state, which simplifies the implementation. Proposed API: ApexCli> savepoint ApexCli> launch -savepoint first prototype: https://github.com/sandeshh/apex-core/commit/8ec7e837318c2b33289251cda78ece0024a3f895 Thanks On Thu, Aug 4, 2016 at 11:54 AM Amol Kekre wrote: > hmm! actually it may be a good debugging tool too. Keep the named > checkpoints around. The feature is to keep checkpoints around, which can be > done by giving a feature to not delete checkpoints, but then naming them > makes it more operational. Send a command from cli->get checkpoint -> know > it is the one you need as the file name has your string you send with the > command -> debug. This is different than querying a state as this gives > entire app checkpoint to debug with. > > Thks > Amol > > > On Thu, Aug 4, 2016 at 11:41 AM, Venkatesh Kottapalli < > venkat...@datatorrent.com> wrote: > > > + 1 for the idea. > > > > It might be helpful to developers as well when dealing with variety of > > data in large volumes if this can help them run from the checkpointed > state > > rather than rerunning the application altogether in case of issues. > > > > I have seen cases where the application runs for more than 10 hours and > > some partitions fail because of the variety of data that it is dealing > > with. In such cases, the application has to be restarted and it will be > > helpful to developers with a feature of this kind. > > > > The ease of enabling/disabling this feature to run the app will also be > > important. > > > > -Venkatesh. > > > > > > > On Aug 4, 2016, at 10:29 AM, Amol Kekre wrote: > > > > > > We had a user who wanted roll-back and restart for audit purposes. > That > > > time we did not have timed-window.
Named checkpoint would have helped a > > > little bit.. > > > > > > Problem statement: Auditors ask for rerun of yesterday's computations > for > > > verification. Assume that these computations depend on previous state > > (i.e > > > data from day before yesterday). > > > > > > Solution > > > 1. Have named checkpoints at 12 in the night (an input adapter triggers > > it) > > > every day > > > 2. The app spools raw logs into hdfs along with window ids and event > > times > > > 3. The re-run is a separate app that starts off on a named checkpoint > (12 > > > night yesterday) > > > > > > Technically the solution will not be as simple and "new audit app" will > > need a > > > lot of other checks (dedups, drop events not in yesterday's window, > wait > > > for late arrivals, ...), but named checkpoint helps. > > > > > > I do agree with Pramod that replay within the same running app is not > > > viable within a data-in-motion architecture. But it helps somewhat in a > > new > > > audit app. Named checkpoints help data-in-motion architectures handle > > batch > > > apps better. In the above case #2 spooling done with event time > > stamp+state > > > suffices. The state part comes from named checkpoint. > > > > > > Thks, > > > Amol > > > > > > > > > > > > > > > On Thu, Aug 4, 2016 at 10:12 AM, Sanjay Pujare > > > > wrote: > > > > > >> I agree. A specific use-case will be useful to support this feature. > > Also > > >> the ability to replay from the named checkpoint will be limited > because > > of > > >> various factors, isn’t it? > > >> > > >> On 8/4/16, 9:00 AM, "Pramod Immaneni" wrote: > > >> > > >>There is a problem here, keeping old checkpoints and recovering > from > > >> them > > >>means preserving the old input data along with the state.
This is > > more > > >> than > > >>the mechanism of actually creating named checkpoints, it means > having > > >> the > > >>ability for operators to move forward (a.k.a committed and dropping > > >>committed states and buffer data) while still having the ability to > > >> replay > > >>from that point from the input source and providing a way for > > >> operators (at > > >>first look input operators) to distinguish that. Why would someone > > need > > >>this with idempotent processing? Is there a specific use case you > are > > >>looking at? Suppose we go do this, for the mechanism, I would be in > > >> favor > > >>of reusing existing tuple. > > >> > > >>On Thu, Aug 4, 2016 at 8:44 AM, Vlad Rozov < > v.ro...@datatorrent.com> > > >> wrote: > > >> > > >>> +1 for the feature. At first look I am more in favor of reusing > > >> existing > > >>> control tuple. > > >>> > > >>> Thank you, > > >>> > > >>> Vlad > > >>> > > >>> > > >>> On 8/4/16 08:17, Sandesh Hegde wrote: > > >>> > > @Chinmay > > We can enhance the existing checkpoint tuple but that one is more > > frequently used than this feature,
Re: JDBC Poller implementation
+ 1 for a common Abstract Class. 0 - for adding ports. Use-case for CSV - data migration from columnar stores. Some users may want to move TB's of data from a store like Greenplum and then do transformations using the CSV parser that is already present. Thanks, Dev On Mon, Aug 8, 2016 at 7:39 AM, Yogi Devendra wrote: > +1 for Tushar's idea for common abstract class. > > Additionally, POJO based output should be considered for active > development. CSV output can be deprecated. Since it can be achieved easily > using POJO + CSV formatter. > > Operators are meant to be lego blocks for reusable functionality to achieve > higher level functionality by clubbing multiple operators together. > > > ~ Yogi > > On 8 August 2016 at 17:45, Tushar Gosavi wrote: > > > I would prefer a common abstract class having emitTuple method. And > > two different implementation one for emitting comma separated values > > and other emitting pojo. > > > > Regards, > > -Tushar. > > > > > > On Mon, Aug 8, 2016 at 5:34 PM, Priyanka Gugale > > wrote: > > > The concrete implementation is supposed to do mainly formatting of > input > > > data and emit in required form. Also it would be tricky to implement > some > > > abstract methods like "getTuple", you have to conditionally return the > > > values. > > > > > > -Priyanka > > > > > > On Mon, Aug 8, 2016 at 4:49 PM, Bhupesh Chawda > > wrote: > > > > > >> I am +1 for having two ports in the same concrete implementation. > > >> One port for POJOs and second one for CSV strings (if this is actually > > >> needed) > > >> > > >> +0 on having multiple concrete operators. > > >> I don't think it is necessary to create multiple concrete > > implementations > > >> just because we want the same data in different formats. Ports should > > serve > > >> the purpose.
> > >> > > >> ~ Bhupesh > > >> > > >> > > >> On Mon, Aug 8, 2016 at 4:46 PM, Priyanka Gugale < > > priya...@datatorrent.com> > > >> wrote: > > >> > > >> > Hi, > > >> > > > >> > JDBCPollerInputOperator in malhar repository emits comma separated > > list > > >> of > > >> > values as result of scan. As most of our input operators emit POJOs > I > > am > > >> > planning to add an implementation which emits pojo. > > >> > > > >> > I would like to discuss, if we should have two independent jdbc poll > > >> input > > >> > operators, one emits csv and other which emits pojo or we should > have > > one > > >> > operator having two ports? > > >> > > > >> > I prefer two operators to define clear intent of each operator, but > if > > >> > anyone has different opinion please suggest. > > >> > > > >> > -Priyanka > > >> > > > >> > > >
Re: JDBC Poller implementation
+1 for Tushar's idea for common abstract class. Additionally, POJO based output should be considered for active development. CSV output can be deprecated. Since it can be achieved easily using POJO + CSV formatter. Operators are meant to be lego blocks for reusable functionality to achieve higher level functionality by clubbing multiple operators together. ~ Yogi On 8 August 2016 at 17:45, Tushar Gosavi wrote: > I would prefer a common abstract class having emitTuple method. And > two different implementation one for emitting comma separated values > and other emitting pojo. > > Regards, > -Tushar. > > > On Mon, Aug 8, 2016 at 5:34 PM, Priyanka Gugale > > wrote: > > The concrete implementation is supposed to do mainly formatting of input > > data and emit in required form. Also it would be tricky to implement some > > abstract methods like "getTuple", you have to conditionally return the > > values. > > > > -Priyanka > > > > On Mon, Aug 8, 2016 at 4:49 PM, Bhupesh Chawda > > wrote: > > > >> I am +1 for having two ports in the same concrete implementation. > >> One port for POJOs and second one for CSV strings (if this is actually > >> needed) > >> > >> +0 on having multiple concrete operators. > >> I don't think it is necessary to create multiple concrete > implementations > >> just because we want the same data in different formats. Ports should > serve > >> the purpose. > >> > >> ~ Bhupesh > >> > >> > >> On Mon, Aug 8, 2016 at 4:46 PM, Priyanka Gugale < > priya...@datatorrent.com> > >> wrote: > >> > >> > Hi, > >> > > >> > JDBCPollerInputOperator in malhar repository emits comma separated > list > >> of > >> > values as result of scan. As most of our input operators emit POJOs I > am > >> > planning to add an implementation which emits pojo. > >> > > >> > I would like to discuss, if we should have two independent jdbc poll > >> input > >> > operators, one emits csv and other which emits pojo or we should have > one > >> > operator having two ports?
> >> > > >> > I prefer two operators to define clear intent of each operator, but if > >> > anyone has different opinion please suggest. > >> > > >> > -Priyanka > >> > > >> >
Re: JDBC Poller implementation
I would prefer a common abstract class having emitTuple method. And two different implementation one for emitting comma separated values and other emitting pojo. Regards, -Tushar. On Mon, Aug 8, 2016 at 5:34 PM, Priyanka Gugale wrote: > The concrete implementation is supposed to do mainly formatting of input > data and emit in required form. Also it would be tricky to implement some > abstract methods like "getTuple", you have to conditionally return the > values. > > -Priyanka > > On Mon, Aug 8, 2016 at 4:49 PM, Bhupesh Chawda wrote: > >> I am +1 for having two ports in the same concrete implementation. >> One port for POJOs and second one for CSV strings (if this is actually >> needed) >> >> +0 on having multiple concrete operators. >> I don't think it is necessary to create multiple concrete implementations >> just because we want the same data in different formats. Ports should serve >> the purpose. >> >> ~ Bhupesh >> >> >> On Mon, Aug 8, 2016 at 4:46 PM, Priyanka Gugale >> wrote: >> >> > Hi, >> > >> > JDBCPollerInputOperator in malhar repository emits comma separated list >> of >> > values as result of scan. As most of our input operators emit POJOs I am >> > planning to add an implementation which emits pojo. >> > >> > I would like to discuss, if we should have two independent jdbc poll >> input >> > operators, one emits csv and other which emits pojo or we should have one >> > operator having two ports? >> > >> > I prefer two operators to define clear intent of each operator, but if >> > anyone has different opinion please suggest. >> > >> > -Priyanka >> >
Re: JDBC Poller implementation
The concrete implementation is supposed to do mainly formatting of input data and emit in required form. Also it would be tricky to implement some abstract methods like "getTuple", you have to conditionally return the values. -Priyanka On Mon, Aug 8, 2016 at 4:49 PM, Bhupesh Chawda wrote: > I am +1 for having two ports in the same concrete implementation. > One port for POJOs and second one for CSV strings (if this is actually > needed) > > +0 on having multiple concrete operators. > I don't think it is necessary to create multiple concrete implementations > just because we want the same data in different formats. Ports should serve > the purpose. > > ~ Bhupesh > > > On Mon, Aug 8, 2016 at 4:46 PM, Priyanka Gugale > wrote: > > > Hi, > > > > JDBCPollerInputOperator in malhar repository emits comma separated list > of > > values as result of scan. As most of our input operators emit POJOs I am > > planning to add an implementation which emits pojo. > > > > I would like to discuss, if we should have two independent jdbc poll > input > > operators, one emits csv and other which emits pojo or we should have one > > operator having two ports? > > > > I prefer two operators to define clear intent of each operator, but if > > anyone has different opinion please suggest. > > > > -Priyanka > > >
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411685#comment-15411685 ] ASF GitHub Bot commented on APEXMALHAR-2172: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73859472 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -80,201 +91,45 @@ * @tags database, sql, jdbc, partitionable, exactlyOnce */ @Evolving -public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator -implements ActivationListener, IdleTimeHandler, Partitioner +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator implements --- End diff -- Can you mark this class with ```checkpointableWithinAppWindow = false```? Better to have it in the abstract class. > Update JDBC poll input operator to fix issues > - > > Key: APEXMALHAR-2172 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172 > Project: Apache Apex Malhar > Issue Type: Improvement > Reporter: Priyanka Gugale > Assignee: Priyanka Gugale > > Update JDBCPollInputOperator to: > 1. Fix small bugs > 2. Use jooq query dsl library to construct sql queries > 3. Make code more readable > 4. Use row counts rather than key column values to partition reads
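In Apex, flags like checkpointableWithinAppWindow are set via a class-level operator annotation, which is why placing it on the abstract class covers every concrete subclass. The sketch below uses a hypothetical stand-in annotation (OperatorMarker, not the real apex-core OperatorAnnotation) so the inheritance behavior can be shown without the apex-core dependency:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for an operator-level annotation carrying the flag.
// @Inherited makes the marker visible on subclasses via getAnnotation().
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface OperatorMarker {
  boolean checkpointableWithinAppWindow() default true;
}

// Marking the abstract class means every concrete poller inherits the
// constraint, which is what the review comment asks for.
@OperatorMarker(checkpointableWithinAppWindow = false)
abstract class AbstractPollInput {
}

class CsvPollInput extends AbstractPollInput {
}

class CheckpointPolicy {
  // How an engine-side check might read the flag from the class hierarchy.
  static boolean checkpointableWithinAppWindow(Class<?> operatorClass) {
    OperatorMarker marker = operatorClass.getAnnotation(OperatorMarker.class);
    return marker == null || marker.checkpointableWithinAppWindow();
  }
}
```

Unannotated classes default to checkpointable; the subclass picks up false from the abstract class without restating it.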
Re: JDBC Poller implementation
I am +1 for having two ports in the same concrete implementation. One port for POJOs and second one for CSV strings (if this is actually needed) +0 on having multiple concrete operators. I don't think it is necessary to create multiple concrete implementations just because we want the same data in different formats. Ports should serve the purpose. ~ Bhupesh On Mon, Aug 8, 2016 at 4:46 PM, Priyanka Gugale wrote: > Hi, > > JDBCPollerInputOperator in malhar repository emits comma separated list of > values as result of scan. As most of our input operators emit POJOs I am > planning to add an implementation which emits pojo. > > I would like to discuss, if we should have two independent jdbc poll input > operators, one emits csv and other which emits pojo or we should have one > operator having two ports? > > I prefer two operators to define clear intent of each operator, but if > anyone has different opinion please suggest. > > -Priyanka >
[GitHub] apex-malhar pull request #360: APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 r...
Github user chaithu14 closed the pull request at: https://github.com/apache/apex-malhar/pull/360
[GitHub] apex-malhar pull request #360: APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 r...
GitHub user chaithu14 reopened a pull request: https://github.com/apache/apex-malhar/pull/360 APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue You can merge this pull request into a Git repository by running: $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2174-S3-ReaderIssue Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/360.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #360 commit 0c70e92e6f2a1a631569d6b9608ac79de0a50b96 Author: Chaitanya Date: 2016-08-08T06:21:11Z APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue
[jira] [Commented] (APEXMALHAR-2174) S3 File Reader reading more data than expected
[ https://issues.apache.org/jira/browse/APEXMALHAR-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411639#comment-15411639 ] ASF GitHub Bot commented on APEXMALHAR-2174: Github user chaithu14 closed the pull request at: https://github.com/apache/apex-malhar/pull/360 > S3 File Reader reading more data than expected > -- > > Key: APEXMALHAR-2174 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2174 > Project: Apache Apex Malhar > Issue Type: Bug > Reporter: Chaitanya > Assignee: Chaitanya > > This is observed through the AWS billing. > Issue might be the S3InputStream.read() which is used in readEntity(). > Reading the block can be achieved through the AmazonS3 api's. So, I am > proposing the following solution: > ``` > GetObjectRequest rangeObjectRequest = new GetObjectRequest( > bucketName, key); > rangeObjectRequest.setRange(startByte, noOfBytes); > S3Object objectPortion = s3Client.getObject(rangeObjectRequest); > S3ObjectInputStream wrappedStream = objectPortion.getObjectContent(); > byte[] record = ByteStreams.toByteArray(wrappedStream); > ``` > Advantages of this solution: Parallel read will work for all types of s3 file > systems.
[jira] [Commented] (APEXMALHAR-2174) S3 File Reader reading more data than expected
[ https://issues.apache.org/jira/browse/APEXMALHAR-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411640#comment-15411640 ] ASF GitHub Bot commented on APEXMALHAR-2174: GitHub user chaithu14 reopened a pull request: https://github.com/apache/apex-malhar/pull/360 APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue You can merge this pull request into a Git repository by running: $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2174-S3-ReaderIssue Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/360.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #360 commit 0c70e92e6f2a1a631569d6b9608ac79de0a50b96 Author: Chaitanya Date: 2016-08-08T06:21:11Z APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue > S3 File Reader reading more data than expected > -- > > Key: APEXMALHAR-2174 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2174 > Project: Apache Apex Malhar > Issue Type: Bug > Reporter: Chaitanya > Assignee: Chaitanya > > This is observed through the AWS billing. > Issue might be the S3InputStream.read() which is used in readEntity(). > Reading the block can be achieved through the AmazonS3 api's. So, I am > proposing the following solution: > ``` > GetObjectRequest rangeObjectRequest = new GetObjectRequest( > bucketName, key); > rangeObjectRequest.setRange(startByte, noOfBytes); > S3Object objectPortion = s3Client.getObject(rangeObjectRequest); > S3ObjectInputStream wrappedStream = objectPortion.getObjectContent(); > byte[] record = ByteStreams.toByteArray(wrappedStream); > ``` > Advantages of this solution: Parallel read will work for all types of s3 file > systems.
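One detail worth checking in the proposed snippet: in the AWS SDK for Java (v1), GetObjectRequest.setRange(start, end) takes inclusive byte offsets, so if the second argument passed is a byte count rather than an end offset, the request could still fetch the wrong range. A small helper (hypothetical, pure Java so the arithmetic can be verified without an S3 client) makes the conversion explicit:

```java
// Converts (startByte, byte count) into the inclusive (start, end) pair that
// a ranged S3 GET expects: requesting noOfBytes bytes starting at startByte
// means the last byte fetched is at offset startByte + noOfBytes - 1.
class S3Range {
  // Returns {startByte, inclusiveEndByte} for a block of noOfBytes bytes.
  static long[] forBlock(long startByte, long noOfBytes) {
    if (startByte < 0 || noOfBytes <= 0) {
      throw new IllegalArgumentException("startByte must be >= 0 and noOfBytes > 0");
    }
    return new long[] {startByte, startByte + noOfBytes - 1};
  }
}
```

With a real client, the call would then be along the lines of `rangeObjectRequest.setRange(range[0], range[1])`, keeping the block read to exactly noOfBytes bytes.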
[GitHub] apex-malhar pull request #359: APEXMALHAR-2161: Add tests for AbstractThroug...
Github user yogidevendra commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/359#discussion_r73849743 --- Diff: library/src/test/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperatorTest.java --- @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package com.datatorrent.lib.io.fs;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.Partitioner.Partition;
+import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.api.StatsListener.BatchedOperatorStats;
+import com.datatorrent.api.StatsListener.Response;
+import com.datatorrent.lib.counters.BasicCounters;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileCounters;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+public class AbstractThroughputFileInputOperatorTest
+{
+  private AbstractThroughputFileInputOperator underTest;
+  @Mock
+  private Partition mockPartition;
+  @Mock
+  private BatchedOperatorStats mockBatchStats;
+  @Mock
+  private OperatorStats mockOperatorStats;
+  @Mock
+  private BasicCounters fileCountersMock;
+  @Mock
+  private MutableLong fileCounterMock;
+
+  @Before
+  public void setup()
+  {
+    underTest = new ThroughputFileInputOperator();
+    MockitoAnnotations.initMocks(this);
+
+    when(mockPartition.getPartitionedInstance()).thenReturn(underTest);
+  }
+
+  @Test
+  public void testInitialPartitioning()
+  {
+    underTest.setPartitionCount(4);
+    underTest.setPreferredMaxPendingFilesPerOperator(6);
+
+    for (int i = 0; i < 74; i++) {
+      underTest.pendingFiles.add("file-" + i);
+    }
+
+    int partitioncount = underTest.getNewPartitionCount(Collections.singleton(mockPartition), null);
+    Assert.assertEquals(4, partitioncount);
+  }
+
+  @Test
+  public void testProcessStats() throws Exception
+  {
+    underTest.setPartitionCount(4);
+    underTest.setPreferredMaxPendingFilesPerOperator(10);
+
+    for (int i = 0; i < 21; i++) {
+      underTest.pendingFiles.add("file-" + i);
+    }
+
+    mockOperatorStats.counters = fileCountersMock;
+    when(mockPartition.getStats()).thenReturn(mockBatchStats);
+    when(mockBatchStats.getLastWindowedStats()).thenReturn(Collections.singletonList(mockOperatorStats));
+    when(fileCountersMock.getCounter(any(FileCounters.class))).thenReturn(fileCounterMock);
+    when(fileCounterMock.getValue()).thenReturn(20L);
+
+    Response response = underTest.processStats(mockBatchStats);
+
+    Assert.assertTrue(response.repartitionRequired);
+  }
+
+  @Test
+  public void testRepartitioning()
+  {
+    underTest.setPartitionCount(4);
--- End diff --

Please add more scenarios for repartitioning.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
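One way to enumerate the extra repartitioning scenarios requested above: if the operator sizes partitions roughly as ceil(pendingFiles / preferredMaxPendingFilesPerOperator), capped at partitionCount (an assumption consistent with the test expectations in the diff, not necessarily the actual implementation), the boundary cases fall out directly:

```java
public class ThroughputPartitioning {
  // Hypothetical sizing rule matching the diff's expectations
  // (74 pending files / preferred 6 / cap 4 => 4): one partition per
  // preferredMaxPendingFilesPerOperator pending files, capped at partitionCount.
  static int newPartitionCount(int pendingFiles, int preferredMaxPendingFilesPerOperator, int partitionCount) {
    int desired = (pendingFiles + preferredMaxPendingFilesPerOperator - 1) / preferredMaxPendingFilesPerOperator;
    return Math.min(Math.max(desired, 1), partitionCount);
  }

  public static void main(String[] args) {
    System.out.println(newPartitionCount(74, 6, 4)); // prints 4 (demand exceeds the cap)
    System.out.println(newPartitionCount(5, 6, 4));  // prints 1 (no repartition pressure)
    System.out.println(newPartitionCount(0, 6, 4));  // prints 1 (never scales below one partition)
  }
}
```

The second and third cases correspond to the repartitionRequired=false style scenarios asked for in the next comment.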
[jira] [Commented] (APEXMALHAR-2161) Add tests for AbstractThroughputFileInputOperator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411604#comment-15411604 ] ASF GitHub Bot commented on APEXMALHAR-2161: Github user yogidevendra commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/359#discussion_r73849553 --- Diff: library/src/test/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperatorTest.java --- @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package com.datatorrent.lib.io.fs;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.Partitioner.Partition;
+import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.api.StatsListener.BatchedOperatorStats;
+import com.datatorrent.api.StatsListener.Response;
+import com.datatorrent.lib.counters.BasicCounters;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileCounters;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+public class AbstractThroughputFileInputOperatorTest
+{
+  private AbstractThroughputFileInputOperator underTest;
+  @Mock
+  private Partition mockPartition;
+  @Mock
+  private BatchedOperatorStats mockBatchStats;
+  @Mock
+  private OperatorStats mockOperatorStats;
+  @Mock
+  private BasicCounters fileCountersMock;
+  @Mock
+  private MutableLong fileCounterMock;
+
+  @Before
+  public void setup()
+  {
+    underTest = new ThroughputFileInputOperator();
+    MockitoAnnotations.initMocks(this);
+
+    when(mockPartition.getPartitionedInstance()).thenReturn(underTest);
+  }
+
+  @Test
+  public void testInitialPartitioning()
+  {
+    underTest.setPartitionCount(4);
+    underTest.setPreferredMaxPendingFilesPerOperator(6);
+
+    for (int i = 0; i < 74; i++) {
+      underTest.pendingFiles.add("file-" + i);
+    }
+
+    int partitioncount = underTest.getNewPartitionCount(Collections.singleton(mockPartition), null);
+    Assert.assertEquals(4, partitioncount);
+  }
+
+  @Test
+  public void testProcessStats() throws Exception
+  {
+    underTest.setPartitionCount(4);
+    underTest.setPreferredMaxPendingFilesPerOperator(10);
+
+    for (int i = 0; i < 21; i++) {
+      underTest.pendingFiles.add("file-" + i);
+    }
+
+    mockOperatorStats.counters = fileCountersMock;
+    when(mockPartition.getStats()).thenReturn(mockBatchStats);
+    when(mockBatchStats.getLastWindowedStats()).thenReturn(Collections.singletonList(mockOperatorStats));
+    when(fileCountersMock.getCounter(any(FileCounters.class))).thenReturn(fileCounterMock);
+    when(fileCounterMock.getValue()).thenReturn(20L);
+
+    Response response = underTest.processStats(mockBatchStats);
+
+    Assert.assertTrue(response.repartitionRequired);
--- End diff --

Could you please add a test scenario for repartitionRequired=false.

> Add tests for AbstractThroughputFileInputOperator
> -
>
> Key: APEXMALHAR-2161
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2161
> Project: Apache Apex Malhar
> Issue Type: Bug
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
> Priority: Minor
> Original Estimate: 24h
> Time Spent: 8h
> Remaining Estimate: 16h
>
> Add unit tests for AbstractThroughputFileInputOperator
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411600#comment-15411600 ] ASF GitHub Bot commented on APEXMALHAR-2172:

Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73848852

--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
@@ -438,119 +261,110 @@ public void endWindow()
     currentWindowRecoveryState = new MutablePair<>();
--- End diff --

Removed this and a couple of other lines :)

> Update JDBC poll input operator to fix issues
> -
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
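Item 4 above (partitioning reads by row counts) can be sketched independently of jooq: split the total row count into contiguous OFFSET/LIMIT ranges, one per reader, so each partition's query is defined by position rather than by key column values. This is an illustrative sketch of that idea, not the operator's actual partitioning code:

```java
public class RowRangePartitioner {
  // Splits totalRows into contiguous (offset, limit) pairs, one per reader.
  // Remainder rows go to the first partitions so sizes differ by at most one.
  static int[][] ranges(int totalRows, int partitions) {
    int[][] out = new int[partitions][2];
    int base = totalRows / partitions;
    int rem = totalRows % partitions;
    int offset = 0;
    for (int i = 0; i < partitions; i++) {
      int limit = base + (i < rem ? 1 : 0);
      out[i][0] = offset; // OFFSET for this reader's query
      out[i][1] = limit;  // LIMIT for this reader's query
      offset += limit;
    }
    return out;
  }

  public static void main(String[] args) {
    // 10 rows across 3 readers: (0,4), (4,3), (7,3).
    for (int[] r : ranges(10, 3)) {
      System.out.println(r[0] + "," + r[1]);
    }
  }
}
```

Each reader would then issue something like `SELECT ... ORDER BY <stable column> LIMIT <limit> OFFSET <offset>`, which jooq can render portably across dialects.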
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411598#comment-15411598 ] ASF GitHub Bot commented on APEXMALHAR-2172:

Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73848661

--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
@@ -438,119 +261,110 @@ public void endWindow()
     currentWindowRecoveryState = new MutablePair<>();
   }

-  public int getPartitionCount()
-  {
-    return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-    this.partitionCount = partitionCount;
+    scanService.shutdownNow();
+    store.disconnect();
   }

-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID
-        && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) {
-      // If it is a replay state, don't start any threads here
+    if (isPolled) {
       return;
     }
-  }
-
-  @Override
-  public void deactivate()
-  {
     try {
-      if (dbPoller != null && dbPoller.isAlive()) {
-        dbPoller.interrupt();
-        dbPoller.join();
+      ps.setFetchSize(getFetchSize());
+      ResultSet result = ps.executeQuery();
+      if (result.next()) {
+        do {
+          emitQueue.add(getTuple(result));
+        } while (result.next());
       }
-    } catch (InterruptedException ex) {
-      // log and ignore, ending execution anyway
-      LOG.error("exception in poller thread: ", ex);
+      isPolled = true;
+    } catch (SQLException ex) {
+      throw new RuntimeException(String.format("Error while running query"), ex);
+    } finally {
+      store.disconnect();
--- End diff --

Calling disconnect for non-poller threads as they are not going to connect to the DB again; also updated exception handling.
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411578#comment-15411578 ] ASF GitHub Bot commented on APEXMALHAR-2172:

Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73845347

--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
@@ -438,119 +261,110 @@ public void endWindow()
     currentWindowRecoveryState = new MutablePair<>();
   }

-  public int getPartitionCount()
-  {
-    return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-    this.partitionCount = partitionCount;
+    scanService.shutdownNow();
+    store.disconnect();
  }

-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID
-        && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) {
-      // If it is a replay state, don't start any threads here
+    if (isPolled) {
      return;
    }
-  }
-
-  @Override
-  public void deactivate()
-  {
    try {
-      if (dbPoller != null && dbPoller.isAlive()) {
-        dbPoller.interrupt();
-        dbPoller.join();
+      ps.setFetchSize(getFetchSize());
+      ResultSet result = ps.executeQuery();
+      if (result.next()) {
+        do {
+          emitQueue.add(getTuple(result));
+        } while (result.next());
      }
-    } catch (InterruptedException ex) {
-      // log and ignore, ending execution anyway
-      LOG.error("exception in poller thread: ", ex);
+      isPolled = true;
--- End diff --

The thread execution is not bound to a window. Even if the pollInterval is short and there are multiple calls to poll records, since it is a single thread the subsequent calls will block until this one is done, and isPolled will be "true" for subsequent calls from non-poller threads.
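The serialization argument above (a single scanService thread, so later pollRecords calls run only after the first completes and then see isPolled already set) can be demonstrated with a stdlib-only sketch; the class and counter below are illustrative stand-ins, not the operator's code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleThreadPollSketch {
  // Stand-ins for the operator's poll flag and the JDBC fetch work.
  private volatile boolean isPolled = false;
  private final AtomicInteger pollsExecuted = new AtomicInteger();

  void pollRecords() {
    if (isPolled) {
      return; // later calls see the flag set by the first call
    }
    pollsExecuted.incrementAndGet(); // stand-in for the actual query
    isPolled = true;
  }

  // Submits several poll calls to a single-threaded executor; they run
  // strictly one after another, so exactly one does the work.
  static int runDemo() {
    SingleThreadPollSketch op = new SingleThreadPollSketch();
    ExecutorService scanService = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 5; i++) {
      scanService.submit(op::pollRecords);
    }
    scanService.shutdown();
    try {
      scanService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return op.pollsExecuted.get();
  }

  public static void main(String[] args) {
    System.out.println(runDemo()); // prints 1
  }
}
```

Executors.newSingleThreadExecutor guarantees tasks execute sequentially, which is what makes the unsynchronized isPolled check safe here.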
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411558#comment-15411558 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844259 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * An abstract implementation of inner join operator over Managed state which extends from + * AbstractInnerJoinOperator. + * + * Properties: + * noOfBuckets: Number of buckets required for Managed state. + * bucketSpanTime: Indicates the length of the time bucket. 
+ */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public abstract class AbstractManagedStateInnerJoinOperatorextends AbstractInnerJoinOperator implements +Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler +{ + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class); + public static final String stateDir = "managedState"; + public static final String stream1State = "stream1Data"; + public static final String stream2State = "stream2Data"; + private transient Map , Future> waitingEvents = Maps.newLinkedHashMap(); + private int noOfBuckets = 1; + private Long bucketSpanTime; + protected ManagedTimeStateImpl stream1Store; + protected ManagedTimeStateImpl stream2Store; + + /** + * Create Managed states and stores for both the streams. + */ + @Override + public void createStores() + { +stream1Store = new ManagedTimeStateImpl(); +stream2Store = new ManagedTimeStateImpl(); +stream1Store.setNumBuckets(noOfBuckets); +stream2Store.setNumBuckets(noOfBuckets); +if (bucketSpanTime != null) { + stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); + stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); +} + stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + +component = new ManagedSpillableComplexComponent(); +stream1Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store, !isStream1KeyPrimary()); +stream2Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store, !isStream2KeyPrimary()); + } + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert into the store
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411565#comment-15411565 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844320 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. 
+ * + * @displayName POJO Inner Join Operator + * @tags join + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411556#comment-15411556 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844231 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * An abstract implementation of inner join operator over Managed state which extends from + * AbstractInnerJoinOperator. + * + * Properties: + * noOfBuckets: Number of buckets required for Managed state. + * bucketSpanTime: Indicates the length of the time bucket. + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public abstract class AbstractManagedStateInnerJoinOperatorextends AbstractInnerJoinOperator implements +Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler +{ + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class); --- End diff -- Ok > Development of Inner Join Operator using Spillable Datastructures > - > > Key: APEXMALHAR-2100 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Chaitanya >Assignee: Chaitanya > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411569#comment-15411569 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844368 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java --- @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.managed; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.StreamCodec; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.netlet.util.Slice; + +/** + * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator. + * + * Properties: + * isKeyContainsMultiValue: Specifies whether the key has multiple value or not. + * timeBucket: Specifies the lenght of the time bucket. 
+ * + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class ManagedTimeStateMultiValueimplements Spillable.SpillableByteArrayListMultimap +{ + private transient StreamCodec streamCodec = null; + private boolean isKeyContainsMultiValue = false; + private long timeBucket; + @NotNull + private ManagedTimeStateImpl store; + + public ManagedTimeStateMultiValue() + { +if (streamCodec == null) { + streamCodec = new KryoSerializableStreamCodec(); +} + } + + public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isKeyContainsMultiValue) + { +this(); +this.store = Preconditions.checkNotNull(store); +this.isKeyContainsMultiValue = isKeyContainsMultiValue; + } + + /** + * Return the list of values from the store + * @param k given key + * @return list of values + */ + @Override + public List get(@Nullable K k) + { +List value = null; +Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k)); +if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) { + return null; +} +if (isKeyContainsMultiValue) { + return (List)streamCodec.fromByteArray(valueSlice); +} +value = new ArrayList<>(); +value.add((V)streamCodec.fromByteArray(valueSlice)); +return value; + } + + /** + * Returns the Future form the store. + * @param k given key + * @return + */ + public CompositeFuture getAsync(@Nullable K k) + { +return new CompositeFuture(store.getAsync(getBucketId(k), streamCodec.toByteArray(k))); + } + + @Override + public Set keySet() + { +throw new UnsupportedOperationException(); + } + + @Override + public Multiset keys() + { +throw new UnsupportedOperationException(); + } + + @Override + public Collection values() + { +throw new UnsupportedOperationException(); + } + + @Override + public Collection > entries() + { +throw new UnsupportedOperationException(); + } + + @Override + public List
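The get(...) logic in the diff above branches on isKeyContainsMultiValue: the deserialized blob is either already a list, or a single value that gets wrapped so callers always receive a list. A simplified, stdlib-only sketch of that branching (String stands in for the generic value type; this is not the Malhar code itself):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MultiValueGetSketch {
  // Stand-in for streamCodec.fromByteArray: the store hands back an opaque
  // deserialized object that is either a List<String> or a single String.
  static List<String> get(Object deserialized, boolean isKeyContainsMultiValue) {
    if (deserialized == null) {
      return null; // mirrors the null/empty-slice check in the diff
    }
    if (isKeyContainsMultiValue) {
      // Multi-value key: the stored blob already holds the whole list.
      @SuppressWarnings("unchecked")
      List<String> values = (List<String>) deserialized;
      return values;
    }
    // Single-value key: wrap the lone value so callers always see a list.
    List<String> value = new ArrayList<>();
    value.add((String) deserialized);
    return value;
  }

  public static void main(String[] args) {
    System.out.println(get("only", false));                 // prints [only]
    System.out.println(get(Arrays.asList("a", "b"), true)); // prints [a, b]
  }
}
```

Keeping a uniform List return type is what lets the join operator iterate matches the same way whether the key is primary (single value) or not.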
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411560#comment-15411560 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844285 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. 
+ * + * @displayName POJO Inner Join Operator + * @tags join + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411557#comment-15411557 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844252 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * An abstract implementation of inner join operator over Managed state which extends from + * AbstractInnerJoinOperator. + * + * Properties: + * noOfBuckets: Number of buckets required for Managed state. + * bucketSpanTime: Indicates the length of the time bucket. 
+ */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public abstract class AbstractManagedStateInnerJoinOperator extends AbstractInnerJoinOperator implements +Operator.CheckpointNotificationListener, Operator.CheckpointListener, Operator.IdleTimeHandler +{ + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class); + public static final String stateDir = "managedState"; + public static final String stream1State = "stream1Data"; + public static final String stream2State = "stream2Data"; + private transient Map , Future> waitingEvents = Maps.newLinkedHashMap(); + private int noOfBuckets = 1; + private Long bucketSpanTime; + protected ManagedTimeStateImpl stream1Store; + protected ManagedTimeStateImpl stream2Store; + + /** + * Create Managed states and stores for both the streams. + */ + @Override + public void createStores() + { +stream1Store = new ManagedTimeStateImpl(); +stream2Store = new ManagedTimeStateImpl(); +stream1Store.setNumBuckets(noOfBuckets); +stream2Store.setNumBuckets(noOfBuckets); +if (bucketSpanTime != null) { + stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); + stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); +} + stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + +component = new ManagedSpillableComplexComponent(); +stream1Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store, !isStream1KeyPrimary()); +stream2Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store, !isStream2KeyPrimary()); + } + + /** + * Process the tuples received from the input ports with the following steps: + * 1) Extract the key from the given tuple + * 2) Insert into the store
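The createStores() method quoted above wires both ManagedTimeStateImpl stores with a bucket span (bucketSpanTime) and an expire-before duration (getExpiryTime()). As a rough illustration of what that time bucketing buys, here is a plain-Java sketch; it is not the Apex TimeBucketAssigner API, and the names bucketSpanMillis/expireBeforeMillis are illustrative stand-ins for those two properties.

```java
// Plain-Java sketch of time bucketing as configured in createStores().
// Not the Apex TimeBucketAssigner; bucketSpanMillis/expireBeforeMillis are
// illustrative names for bucketSpanTime and getExpiryTime().
class TimeBucketSketch {
  private final long bucketSpanMillis;
  private final long expireBeforeMillis;

  TimeBucketSketch(long bucketSpanMillis, long expireBeforeMillis) {
    this.bucketSpanMillis = bucketSpanMillis;
    this.expireBeforeMillis = expireBeforeMillis;
  }

  /** Bucket index for an event time, or -1 if the event is already expired. */
  long bucketFor(long eventTimeMillis, long nowMillis) {
    if (eventTimeMillis < nowMillis - expireBeforeMillis) {
      return -1; // older than expireBefore: the store would not keep it
    }
    return eventTimeMillis / bucketSpanMillis;
  }
}
```

Because all tuples whose timestamps fall in the same span share a bucket, expiry can drop an entire bucket at once instead of scanning individual entries.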
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411570#comment-15411570 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844379 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java --- @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.managed; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.StreamCodec; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.netlet.util.Slice; + +/** + * Concrete implementation of SpillableByteArrayListMultimap which is needed for the join operator. + * + * Properties: + * isKeyContainsMultiValue: Specifies whether the key has multiple values or not. + * timeBucket: Specifies the length of the time bucket.
+ * + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class ManagedTimeStateMultiValue implements Spillable.SpillableByteArrayListMultimap +{ + private transient StreamCodec streamCodec = null; + private boolean isKeyContainsMultiValue = false; + private long timeBucket; + @NotNull + private ManagedTimeStateImpl store; + + public ManagedTimeStateMultiValue() + { +if (streamCodec == null) { + streamCodec = new KryoSerializableStreamCodec(); +} + } + + public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isKeyContainsMultiValue) + { +this(); +this.store = Preconditions.checkNotNull(store); +this.isKeyContainsMultiValue = isKeyContainsMultiValue; + } + + /** + * Return the list of values from the store + * @param k given key + * @return list of values + */ + @Override + public List get(@Nullable K k) + { +List value = null; +Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k)); +if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) { + return null; +} +if (isKeyContainsMultiValue) { + return (List)streamCodec.fromByteArray(valueSlice); +} +value = new ArrayList<>(); +value.add((V)streamCodec.fromByteArray(valueSlice)); +return value; + } + + /** + * Returns the Future from the store. + * @param k given key + * @return + */ + public CompositeFuture getAsync(@Nullable K k) + { +return new CompositeFuture(store.getAsync(getBucketId(k), streamCodec.toByteArray(k))); + } + + @Override + public Set keySet() + { +throw new UnsupportedOperationException(); + } + + @Override + public Multiset keys() + { +throw new UnsupportedOperationException(); + } + + @Override + public Collection values() + { +throw new UnsupportedOperationException(); + } + + @Override + public Collection > entries() + { +throw new UnsupportedOperationException(); + } + + @Override + public List
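The get() implementation quoted above deserializes the stored Slice differently depending on isKeyContainsMultiValue: the bytes either hold the entire value list or a single value that must be wrapped. A condensed, dependency-free sketch of that branch follows; the Codec interface is a stand-in for StreamCodec.fromByteArray, not the actual Malhar type.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free sketch of the get() branching in ManagedTimeStateMultiValue:
// with multiple values per key the stored bytes decode to the whole List,
// otherwise the single decoded value is wrapped in a new list.
class MultiValueGetSketch {
  interface Codec { Object fromBytes(byte[] bytes); } // stand-in for StreamCodec

  @SuppressWarnings("unchecked")
  static <V> List<V> decode(byte[] stored, boolean isKeyContainsMultiValue, Codec codec) {
    if (stored == null || stored.length == 0) {
      return null; // mirrors the null/empty Slice check in get()
    }
    if (isKeyContainsMultiValue) {
      return (List<V>)codec.fromBytes(stored); // bytes hold the full value list
    }
    List<V> single = new ArrayList<>();
    single.add((V)codec.fromBytes(stored)); // bytes hold exactly one value
    return single;
  }
}
```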
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411567#comment-15411567 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844336 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. 
+ * + * @displayName POJO Inner Join Operator + * @tags join + */ +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411563#comment-15411563 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844300 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. 
+ * + * @displayName POJO Inner Join Operator + * @tags join + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844379 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java --- @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.StreamCodec; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.netlet.util.Slice; + +/** + * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator. 
+ * + * Properties: + * isKeyContainsMultiValue: Specifies whether the key has multiple values or not. + * timeBucket: Specifies the length of the time bucket. + * + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class ManagedTimeStateMultiValue implements Spillable.SpillableByteArrayListMultimap +{ + private transient StreamCodec streamCodec = null; + private boolean isKeyContainsMultiValue = false; + private long timeBucket; + @NotNull + private ManagedTimeStateImpl store; + + public ManagedTimeStateMultiValue() + { +if (streamCodec == null) { + streamCodec = new KryoSerializableStreamCodec(); +} + } + + public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isKeyContainsMultiValue) + { +this(); +this.store = Preconditions.checkNotNull(store); +this.isKeyContainsMultiValue = isKeyContainsMultiValue; + } + + /** + * Return the list of values from the store + * @param k given key + * @return list of values + */ + @Override + public List get(@Nullable K k) + { +List value = null; +Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k)); +if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) { + return null; +} +if (isKeyContainsMultiValue) { + return (List)streamCodec.fromByteArray(valueSlice); +} +value = new ArrayList<>(); +value.add((V)streamCodec.fromByteArray(valueSlice)); +return value; + } + + /** + * Returns the Future from the store.
+ * @param k given key + * @return + */ + public CompositeFuture getAsync(@Nullable K k) + { +return new CompositeFuture(store.getAsync(getBucketId(k), streamCodec.toByteArray(k))); + } + + @Override + public Set keySet() + { +throw new UnsupportedOperationException(); + } + + @Override + public Multiset keys() + { +throw new UnsupportedOperationException(); + } + + @Override + public Collection values() + { +throw new UnsupportedOperationException(); + } + + @Override + public Collection > entries() + { +throw new UnsupportedOperationException(); + } + + @Override + public List removeAll(@Nullable Object o) + { +throw new UnsupportedOperationException(); + } + + @Override + public void clear() + { + + } + + @Override + public int size() + { +throw new
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844212 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of the inner join operator. The operator receives tuples from two streams, + * applies the join operation based on the constraint and emits the joined value. + * Concrete classes should provide implementations of the extractKey, extractTime and mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma-separated fields to be added to the output tuple.
+ * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma-separated key fields for both the streams. Ex: Field1,Field2 + * timeFields: List of comma-separated time fields for both the streams. Ex: Field1,Field2 + * expiryTime: Expiry time for stored tuples + * isStream1KeyPrimary: Specifies whether the stream1 key is primary or not + * isStream2KeyPrimary: Specifies whether the stream2 key is primary or not + * + * Example: + * Left input port receives customer details and right input port receives Order details. + * Schema for the Customer is of the form {ID, Name, CTime} + * Schema for the Order is of the form {OID, CID, OTime} + * Now, join the tuples of the Customer and Order streams where Customer.ID = Order.CID and the constraint is that + * matched tuples must have timestamps within 5 minutes. + * Here, key fields = ID, CID and time fields = CTime, OTime, expiryTime = 5 minutes + * + * @displayName Abstract Inner Join Operator + * @tags join + */ +public abstract class AbstractInnerJoinOperator extends BaseOperator +{ + protected transient String[][] includeFields; + protected transient List keyFields; + protected transient List timeFields; + @AutoMetric + private long tuplesJoinedPerSec; + private double windowTimeSec; + private int tuplesCount; + @NotNull + private String keyFieldsStr; + @NotNull + private String includeFieldStr; + private String timeFieldsStr; + private Long stream1ExpiryTime; + private Long stream2ExpiryTime; + private boolean isStream1KeyPrimary = true; + private boolean isStream2KeyPrimary = true; + protected SpillableComplexComponent component; + protected Spillable.SpillableByteArrayListMultimap stream1Data; + protected Spillable.SpillableByteArrayListMultimap stream2Data; + + /** + * Process the tuples received from the input ports with the following steps: + * 1) Extract the key from the given tuple + * 2) Insert into the store, where store is stream1Data if the tuple + * is received from stream1, or vice versa.
+ * 3) Get the values for the key if it is found in the opposite store + * 4) Merge the given tuple and the values found in step (3) + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + */ + protected void processTuple(T tuple, boolean isStream1Data) + { +Spillable.SpillableByteArrayListMultimap store = isStream1Data ? stream1Data : stream2Data; +K key = extractKey(tuple, isStream1Data); +if (!store.put(key,
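The four processTuple() steps described in the javadoc above can be sketched with plain in-memory maps. This is only an illustration of the control flow: the real operator uses spillable multimaps and a user-supplied mergeTuples(), so the HashMaps and the string concatenation used for merging here are stand-ins.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of the processTuple() steps: 1) extract the key,
// 2) insert into this stream's store, 3) look the key up in the opposite
// store, 4) merge each match into a joined result.
class InnerJoinSketch {
  final Map<String, List<String>> stream1Data = new HashMap<>();
  final Map<String, List<String>> stream2Data = new HashMap<>();

  List<String> processTuple(String key, String tuple, boolean isStream1Data) {
    Map<String, List<String>> own = isStream1Data ? stream1Data : stream2Data;
    Map<String, List<String>> other = isStream1Data ? stream2Data : stream1Data;
    own.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple); // step 2
    List<String> joined = new ArrayList<>();
    for (String match : other.getOrDefault(key, Collections.emptyList())) { // step 3
      joined.add(tuple + "+" + match); // step 4: merge the pair
    }
    return joined;
  }
}
```

With the Customer/Order example from the javadoc, a Customer with ID=1 arriving first joins nothing; an Order with CID=1 arriving later finds the stored customer and emits the joined pair.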
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844311 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. + * + * @displayName POJO Inner Join Operator + * @tags join + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844328 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. + * + * @displayName POJO Inner Join Operator + * @tags join + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844275 --- Diff: library/src/main/java/com/datatorrent/lib/join/InnerJoinStreamCodec.java --- @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Stream codec based on keyExpression for POJO Inner Join Operator. + * + * @Since 3.5.0 --- End diff -- Done
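The InnerJoinStreamCodec discussed above is a stream codec based on keyExpression: its purpose is to make upstream partitioning hash only the extracted join key, so tuples sharing a key reach the same operator partition. The sketch below illustrates that idea in plain Java; it is not the Malhar StreamCodec/KryoSerializableStreamCodec API, and KeyPartitionSketch is a made-up name.

```java
import java.util.function.Function;

// Illustration of key-based partitioning: hashing only the extracted join
// key (rather than the whole tuple) guarantees that matching tuples from
// either stream land on the same partition.
class KeyPartitionSketch {
  static <T, K> int partitionFor(T tuple, Function<T, K> keyExtractor, int numPartitions) {
    // floorMod keeps the partition index non-negative even for negative hashes
    return Math.floorMod(keyExtractor.apply(tuple).hashCode(), numPartitions);
  }
}
```

Two tuples that differ in every field except the key still map to the same partition, which is exactly the property an inner join needs from its input streams.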
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844228 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * An abstract implementation of inner join operator over Managed state which extends from + * AbstractInnerJoinOperator. + * + * Properties: + * noOfBuckets: Number of buckets required for Managed state. + * bucketSpanTime: Indicates the length of the time bucket. + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public abstract class AbstractManagedStateInnerJoinOperator extends AbstractInnerJoinOperator implements +Operator.CheckpointNotificationListener, Operator.CheckpointListener, Operator.IdleTimeHandler --- End diff -- Done
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844285 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. + * + * @displayName POJO Inner Join Operator + * @tags join + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844252 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * An abstract implementation of inner join operator over Managed state which extends from + * AbstractInnerJoinOperator. + * + * Properties: + * noOfBuckets: Number of buckets required for Managed state. + * bucketSpanTime: Indicates the length of the time bucket. 
+ */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public abstract class AbstractManagedStateInnerJoinOperatorextends AbstractInnerJoinOperator implements +Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler +{ + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class); + public static final String stateDir = "managedState"; + public static final String stream1State = "stream1Data"; + public static final String stream2State = "stream2Data"; + private transient Map , Future> waitingEvents = Maps.newLinkedHashMap(); + private int noOfBuckets = 1; + private Long bucketSpanTime; + protected ManagedTimeStateImpl stream1Store; + protected ManagedTimeStateImpl stream2Store; + + /** + * Create Managed states and stores for both the streams. + */ + @Override + public void createStores() + { +stream1Store = new ManagedTimeStateImpl(); +stream2Store = new ManagedTimeStateImpl(); +stream1Store.setNumBuckets(noOfBuckets); +stream2Store.setNumBuckets(noOfBuckets); +if (bucketSpanTime != null) { + stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); + stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); +} + stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + +component = new ManagedSpillableComplexComponent(); +stream1Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store, !isStream1KeyPrimary()); +stream2Data = ((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store, !isStream2KeyPrimary()); + } + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert into the store where store is the stream1Data if the 
tuple + * receives from stream1 or viceversa. + * 3) Get the values of the key in asynchronous if found it in opposite store + * 4) If the future is done then Merge the given tuple and values found
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411555#comment-15411555 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844228 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java --- @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * An abstract implementation of inner join operator over Managed state which extends from + * AbstractInnerJoinOperator. + * + * Properties: + * noOfBuckets: Number of buckets required for Managed state. + * bucketSpanTime: Indicates the length of the time bucket. + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public abstract class AbstractManagedStateInnerJoinOperatorextends AbstractInnerJoinOperator implements +Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler --- End diff -- Done > Development of Inner Join Operator using Spillable Datastructures > - > > Key: APEXMALHAR-2100 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Chaitanya >Assignee: Chaitanya > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411548#comment-15411548 ] ASF GitHub Bot commented on APEXMALHAR-2172: Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73843385 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -80,201 +91,45 @@ * @tags database, sql, jdbc, partitionable,exactlyOnce */ @Evolving -public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator-implements ActivationListener, IdleTimeHandler, Partitioner +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator implements +ActivationListener, Partitioner { - /** - * poll interval in milliseconds - */ - private static int pollInterval = 1; + private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024; --- End diff -- changing size to 4k > Update JDBC poll input operator to fix issues > - > > Key: APEXMALHAR-2172 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Priyanka Gugale >Assignee: Priyanka Gugale > > Update JDBCPollInputOperator to: > 1. Fix small bugs > 2. Use jooq query dsl library to construct sql queries > 3. Make code more readable > 4. Use row counts rather than key column values to partition reads -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...
Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73843385 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -80,201 +91,45 @@ * @tags database, sql, jdbc, partitionable,exactlyOnce */ @Evolving -public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator-implements ActivationListener, IdleTimeHandler, Partitioner +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator implements +ActivationListener, Partitioner { - /** - * poll interval in milliseconds - */ - private static int pollInterval = 1; + private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024; --- End diff -- changing size to 4k --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
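The review comment above ("changing size to 4k") flags that `4 * 1024 * 1024` is a 4M-entry default, three orders of magnitude larger than the 4k the reviewer settles on. A quick check of the arithmetic:

```java
public class QueueCapacitySketch {
    public static void main(String[] args) {
        int current = 4 * 1024 * 1024;  // 4,194,304 entries: "4M", not "4k"
        int proposed = 4 * 1024;        // 4,096 entries, the size agreed in the review
        System.out.println(current + " " + proposed);
    }
}
```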
[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73843244 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -286,149 +141,117 @@ public AbstractJdbcPollInputOperator() public void setup(OperatorContext context) { super.setup(context); -spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); +intializeDSLContext(); +if (scanService == null) { + scanService = Executors.newScheduledThreadPool(1); +} execute = true; -cause = new AtomicReference(); -emitQueue = new ArrayBlockingQueue(queueCapacity); -this.context = context; +emitQueue = new LinkedBlockingDeque<>(queueCapacity); operatorId = context.getId(); +windowManager.setup(context); + } -try { + private void intializeDSLContext() + { +create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl())); + } - //If its a range query pass upper and lower bounds - //If its a polling query pass only the lower bound - if (getRangeQueryPair().getValue() != null) { -ps = store.getConnection() -.prepareStatement( -JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(), -rangeQueryPair.getValue()), -java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); + @Override + public void activate(OperatorContext context) + { +initializePreparedStatement(); +long largestRecoveryWindow = windowManager.getLargestRecoveryWindow(); +if (largestRecoveryWindow == Stateless.WINDOW_ID +|| context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) { + scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS); +} + } + + protected void initializePreparedStatement() + { +try { + // If its a range query pass upper and lower bounds, If its a polling query pass only the lower bound + if (isPollerPartition) { +ps = 
store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), Integer.MAX_VALUE), +TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); } else { ps = store.getConnection().prepareStatement( -JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey()), -java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); -isPollable = true; +buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())), +TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); } - } catch (SQLException e) { LOG.error("Exception in initializing the range query for a given partition", e); throw new RuntimeException(e); } -windowManager.setup(context); -LOG.debug("super setup done..."); } @Override public void beginWindow(long windowId) { currentWindowId = windowId; - -isReplayed = false; - if (currentWindowId <= windowManager.getLargestRecoveryWindow()) { try { replay(currentWindowId); +return; } catch (SQLException e) { LOG.error("Exception in replayed windows", e); throw new RuntimeException(e); } } - -if (isReplayed && currentWindowId == windowManager.getLargestRecoveryWindow()) { - try { -if (!isPollable && rangeQueryPair.getValue() != null) { - - ps = store.getConnection().prepareStatement( - JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), getKey(), previousUpperBound, - rangeQueryPair.getValue()), - java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); -} else { - String bound = null; - if (previousUpperBound == null) { -bound = getRangeQueryPair().getKey(); - } else { -bound = previousUpperBound; - } - ps = store.getConnection().prepareStatement( - JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound), - java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); - isPollable = true; -} -isReplayed = false; -LOG.debug("Prepared statement after re-initialization - {} ", ps.toString()); - } catch (SQLException e) { -
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411544#comment-15411544 ] ASF GitHub Bot commented on APEXMALHAR-2172: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73842885 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); } - public int getPartitionCount() - { -return partitionCount; - } - - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { -this.partitionCount = partitionCount; +scanService.shutdownNow(); +store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { -if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID -&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here +if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { -dbPoller.interrupt(); -dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { +do { + emitQueue.add(getTuple(result)); --- End diff -- Use ```offer``` instead, else we might get an ```IllegalStateException``` > Update JDBC poll input operator to fix issues > - > > Key: APEXMALHAR-2172 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Priyanka Gugale >Assignee: Priyanka Gugale > > Update JDBCPollInputOperator to: > 1. Fix small bugs > 2. Use jooq query dsl library to construct sql queries > 3. Make code more readable > 4. 
Use row counts rather than key column values to partition reads -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73842885 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); } - public int getPartitionCount() - { -return partitionCount; - } - - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { -this.partitionCount = partitionCount; +scanService.shutdownNow(); +store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { -if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID -&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here +if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { -dbPoller.interrupt(); -dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { +do { + emitQueue.add(getTuple(result)); --- End diff -- Use ```offer``` instead, else we might get an ```IllegalStateException``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
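The suggestion above rests on the `BlockingQueue` contract: on a full bounded queue, `add()` throws `IllegalStateException` while `offer()` returns `false`, which lets the poller thread back off instead of crashing. A small sketch of the difference (illustrative only, not the operator's code):

```java
import java.util.concurrent.ArrayBlockingQueue;

public class OfferVsAdd {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> emitQueue = new ArrayBlockingQueue<>(2);
        emitQueue.add(1);
        emitQueue.add(2);
        boolean threw = false;
        try {
            emitQueue.add(3);          // add() throws once the queue is full
        } catch (IllegalStateException e) {
            threw = true;
        }
        boolean accepted = emitQueue.offer(3); // offer() just returns false
        System.out.println(threw + " " + accepted); // true false
    }
}
```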
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411531#comment-15411531 ] ASF GitHub Bot commented on APEXMALHAR-2172: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73841819 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); --- End diff -- can be removed? > Update JDBC poll input operator to fix issues > - > > Key: APEXMALHAR-2172 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Priyanka Gugale >Assignee: Priyanka Gugale > > Update JDBCPollInputOperator to: > 1. Fix small bugs > 2. Use jooq query dsl library to construct sql queries > 3. Make code more readable > 4. Use row counts rather than key column values to partition reads -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73841819 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); --- End diff -- can be removed?
[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...
Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73841763 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); } - public int getPartitionCount() - { -return partitionCount; - } - - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { -this.partitionCount = partitionCount; +scanService.shutdownNow(); +store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { -if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID -&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here +if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { -dbPoller.interrupt(); -dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { +do { + emitQueue.add(getTuple(result)); +} while (result.next()); } -} catch (InterruptedException ex) { - // log and ignore, ending execution anyway - LOG.error("exception in poller thread: ", ex); + isPolled = true; +} catch (SQLException ex) { + throw new RuntimeException(String.format("Error while running query"), ex); +} finally { + store.disconnect(); } - } - @Override - public void handleIdleTime() - { -if (execute) { - try { -Thread.sleep(spinMillis); - } catch (InterruptedException ie) { -throw new RuntimeException(ie); - } -} else { - LOG.error("Exception: ", cause); - DTThrowable.rethrow(cause.get()); -} } + public abstract T getTuple(ResultSet result); + protected void replay(long windowId) throws SQLException { -isReplayed = true; -MutablePairrecoveredData = new MutablePair (); try { - 
recoveredData = (MutablePair )windowManager.load(operatorId, windowId); + MutablePair recoveredData = (MutablePair )windowManager.load(operatorId, + windowId); - if (recoveredData != null) { -//skip the window and return if there was no incoming data in the window -if (recoveredData.left == null || recoveredData.right == null) { - return; -} - -if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(previousUpperBound)) { - LOG.info("Matched so returning"); - return; -} + if (recoveredData != null && shouldReplayWindow(recoveredData)) { +LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left, +recoveredData.right); -JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator(); -jdbcPoller.setStore(store); -jdbcPoller.setKey(getKey()); -jdbcPoller.setPartitionCount(getPartitionCount()); -jdbcPoller.setPollInterval(getPollInterval()); -jdbcPoller.setTableName(getTableName()); -jdbcPoller.setBatchSize(getBatchSize()); -isPollable = false; - -LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + "," + recoveredData.right + "]"); +ps = store.getConnection().prepareStatement( +buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())), +TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); +LOG.info("Query formed to recover data - {}", ps.toString()); -jdbcPoller.setRangeQueryPair(new KeyValPair (recoveredData.left, recoveredData.right)); +emitReplayedTuples(ps); -jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement( -JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(), jdbcPoller.getKey(), -jdbcPoller.getRangeQueryPair().getKey(), jdbcPoller.getRangeQueryPair().getValue()), -java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); -
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411521#comment-15411521 ] ASF GitHub Bot commented on APEXMALHAR-2172: Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73841414 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); } - public int getPartitionCount() - { -return partitionCount; - } - - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { -this.partitionCount = partitionCount; +scanService.shutdownNow(); +store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { -if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID -&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here +if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { -dbPoller.interrupt(); -dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { +do { + emitQueue.add(getTuple(result)); +} while (result.next()); } -} catch (InterruptedException ex) { - // log and ignore, ending execution anyway - LOG.error("exception in poller thread: ", ex); + isPolled = true; +} catch (SQLException ex) { + throw new RuntimeException(String.format("Error while running query"), ex); +} finally { + store.disconnect(); } - } - @Override - public void handleIdleTime() - { -if (execute) { - try { -Thread.sleep(spinMillis); - } catch (InterruptedException ie) { -throw new RuntimeException(ie); - } -} else { - LOG.error("Exception: ", cause); - 
DTThrowable.rethrow(cause.get()); -} } + public abstract T getTuple(ResultSet result); + protected void replay(long windowId) throws SQLException { -isReplayed = true; -MutablePairrecoveredData = new MutablePair (); try { - recoveredData = (MutablePair )windowManager.load(operatorId, windowId); + MutablePair recoveredData = (MutablePair )windowManager.load(operatorId, + windowId); - if (recoveredData != null) { -//skip the window and return if there was no incoming data in the window -if (recoveredData.left == null || recoveredData.right == null) { - return; -} - -if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(previousUpperBound)) { - LOG.info("Matched so returning"); - return; -} + if (recoveredData != null && shouldReplayWindow(recoveredData)) { +LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left, +recoveredData.right); -JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator(); -jdbcPoller.setStore(store); -jdbcPoller.setKey(getKey()); -jdbcPoller.setPartitionCount(getPartitionCount()); -jdbcPoller.setPollInterval(getPollInterval()); -jdbcPoller.setTableName(getTableName()); -jdbcPoller.setBatchSize(getBatchSize()); -isPollable = false; - -LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + "," + recoveredData.right + "]"); +ps = store.getConnection().prepareStatement( +buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())), --- End diff -- right, would update. > Update JDBC poll input operator to fix issues > - > > Key: APEXMALHAR-2172 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Priyanka Gugale >Assignee: Priyanka Gugale > > Update
[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...
Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73841414 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); } - public int getPartitionCount() - { -return partitionCount; - } - - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { -this.partitionCount = partitionCount; +scanService.shutdownNow(); +store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { -if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID -&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here +if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { -dbPoller.interrupt(); -dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { +do { + emitQueue.add(getTuple(result)); +} while (result.next()); } -} catch (InterruptedException ex) { - // log and ignore, ending execution anyway - LOG.error("exception in poller thread: ", ex); + isPolled = true; +} catch (SQLException ex) { + throw new RuntimeException(String.format("Error while running query"), ex); +} finally { + store.disconnect(); } - } - @Override - public void handleIdleTime() - { -if (execute) { - try { -Thread.sleep(spinMillis); - } catch (InterruptedException ie) { -throw new RuntimeException(ie); - } -} else { - LOG.error("Exception: ", cause); - DTThrowable.rethrow(cause.get()); -} } + public abstract T getTuple(ResultSet result); + protected void replay(long windowId) throws SQLException { -isReplayed = true; -MutablePairrecoveredData = new MutablePair (); try { - 
recoveredData = (MutablePair )windowManager.load(operatorId, windowId); + MutablePair recoveredData = (MutablePair )windowManager.load(operatorId, + windowId); - if (recoveredData != null) { -//skip the window and return if there was no incoming data in the window -if (recoveredData.left == null || recoveredData.right == null) { - return; -} - -if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(previousUpperBound)) { - LOG.info("Matched so returning"); - return; -} + if (recoveredData != null && shouldReplayWindow(recoveredData)) { +LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left, +recoveredData.right); -JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator(); -jdbcPoller.setStore(store); -jdbcPoller.setKey(getKey()); -jdbcPoller.setPartitionCount(getPartitionCount()); -jdbcPoller.setPollInterval(getPollInterval()); -jdbcPoller.setTableName(getTableName()); -jdbcPoller.setBatchSize(getBatchSize()); -isPollable = false; - -LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + "," + recoveredData.right + "]"); +ps = store.getConnection().prepareStatement( +buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())), --- End diff -- right, would update. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411515#comment-15411515 ] ASF GitHub Bot commented on APEXMALHAR-2172: Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73841049 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -286,149 +141,117 @@ public AbstractJdbcPollInputOperator() public void setup(OperatorContext context) { super.setup(context); -spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); +intializeDSLContext(); +if (scanService == null) { + scanService = Executors.newScheduledThreadPool(1); +} execute = true; -cause = new AtomicReference(); -emitQueue = new ArrayBlockingQueue(queueCapacity); -this.context = context; +emitQueue = new LinkedBlockingDeque<>(queueCapacity); operatorId = context.getId(); +windowManager.setup(context); + } -try { + private void intializeDSLContext() + { +create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl())); + } - //If its a range query pass upper and lower bounds - //If its a polling query pass only the lower bound - if (getRangeQueryPair().getValue() != null) { -ps = store.getConnection() -.prepareStatement( -JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(), -rangeQueryPair.getValue()), -java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); + @Override + public void activate(OperatorContext context) + { +initializePreparedStatement(); +long largestRecoveryWindow = windowManager.getLargestRecoveryWindow(); +if (largestRecoveryWindow == Stateless.WINDOW_ID +|| context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) { + scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS); +} + } + + protected void initializePreparedStatement() + { +try { + // If its 
a range query pass upper and lower bounds, If its a polling query pass only the lower bound + if (isPollerPartition) { +ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), Integer.MAX_VALUE), +TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); } else { ps = store.getConnection().prepareStatement( -JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey()), -java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); -isPollable = true; +buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())), +TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); } - } catch (SQLException e) { LOG.error("Exception in initializing the range query for a given partition", e); throw new RuntimeException(e); } -windowManager.setup(context); -LOG.debug("super setup done..."); } @Override public void beginWindow(long windowId) { currentWindowId = windowId; - -isReplayed = false; - if (currentWindowId <= windowManager.getLargestRecoveryWindow()) { try { replay(currentWindowId); +return; } catch (SQLException e) { LOG.error("Exception in replayed windows", e); throw new RuntimeException(e); } } - -if (isReplayed && currentWindowId == windowManager.getLargestRecoveryWindow()) { - try { -if (!isPollable && rangeQueryPair.getValue() != null) { - - ps = store.getConnection().prepareStatement( - JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), getKey(), previousUpperBound, - rangeQueryPair.getValue()), - java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); -} else { - String bound = null; - if (previousUpperBound == null) { -bound = getRangeQueryPair().getKey(); - } else { -bound = previousUpperBound; - } - ps = store.getConnection().prepareStatement( - JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound), - java.sql.ResultSet.TYPE_FORWARD_ONLY,
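The `activate()` hunk above starts the scheduled `DBPoller` only when there is nothing left to replay: either the operator has no recovery state at all, or activation is past the largest recovery window. That gating decision can be sketched as follows (a minimal illustrative sketch; the class, method, and sentinel names are assumptions, not the PR's actual code):

```java
// Sketch of the activation-window gating discussed above: start the
// poll thread only when there is nothing left to replay from the
// WindowDataManager. Names and the sentinel value are illustrative.
public class PollActivationGate {
  // Sentinel meaning "no recovery state" (stands in for Stateless.WINDOW_ID).
  static final long STATELESS_WINDOW_ID = -1L;

  /**
   * Returns true when polling should start immediately: either there is
   * no recovery state at all, or the activation window is newer than the
   * last window that must be replayed.
   */
  static boolean shouldStartPolling(long activationWindowId, long largestRecoveryWindow) {
    return largestRecoveryWindow == STATELESS_WINDOW_ID
        || activationWindowId > largestRecoveryWindow;
  }
}
```

When this returns false, `beginWindow()` replays recovered windows first and polling stays off, matching the "If it is a replay state, don't start any threads here" comment in the old code.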
[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73840984 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); } - public int getPartitionCount() - { -return partitionCount; - } - - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { -this.partitionCount = partitionCount; +scanService.shutdownNow(); +store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { -if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID -&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here +if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { -dbPoller.interrupt(); -dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { +do { + emitQueue.add(getTuple(result)); +} while (result.next()); } -} catch (InterruptedException ex) { - // log and ignore, ending execution anyway - LOG.error("exception in poller thread: ", ex); + isPolled = true; +} catch (SQLException ex) { + throw new RuntimeException(String.format("Error while running query"), ex); +} finally { + store.disconnect(); --- End diff -- This will be called even when there is no exception, and that will cause problems. We should let the exception reach the operator thread; otherwise the exception may get lost in the poller thread. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
---
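One way to address the concern above is the pattern the removed code used for its poller thread: capture the failure in an `AtomicReference` on the poller thread and rethrow it from the operator thread. A simplified sketch under that assumption (all class and method names are illustrative, not the PR's actual code):

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the reviewer's suggestion: instead of throwing from the
// scheduled poll task (where the platform cannot see it), record the
// failure and rethrow it on the operator thread. Names are illustrative.
public class PollerErrorPropagation {
  private final AtomicReference<Throwable> cause = new AtomicReference<>();

  // Runs on the poller thread: capture the failure, do not throw.
  void pollOnce(Runnable query) {
    try {
      query.run();
    } catch (RuntimeException ex) {
      cause.compareAndSet(null, ex); // keep only the first failure
    }
  }

  // Runs on the operator thread, e.g. once per streaming window.
  void checkForPollerFailure() {
    Throwable t = cause.get();
    if (t != null) {
      throw new RuntimeException("poller thread failed", t);
    }
  }
}
```

Using `compareAndSet(null, ex)` keeps the first failure rather than the latest, so the root cause is not overwritten by follow-on errors.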
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411512#comment-15411512 ] ASF GitHub Bot commented on APEXMALHAR-2172: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73840984 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); } - public int getPartitionCount() - { -return partitionCount; - } - - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { -this.partitionCount = partitionCount; +scanService.shutdownNow(); +store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { -if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID -&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here +if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { -dbPoller.interrupt(); -dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { +do { + emitQueue.add(getTuple(result)); +} while (result.next()); } -} catch (InterruptedException ex) { - // log and ignore, ending execution anyway - LOG.error("exception in poller thread: ", ex); + isPolled = true; +} catch (SQLException ex) { + throw new RuntimeException(String.format("Error while running query"), ex); +} finally { + store.disconnect(); --- End diff -- This will be called even when there is no exception, and that will cause problems. We should let the exception reach the operator thread; otherwise the exception may get lost in the poller thread. 
> Update JDBC poll input operator to fix issues > - > > Key: APEXMALHAR-2172 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Priyanka Gugale >Assignee: Priyanka Gugale > > Update JDBCPollInputOperator to: > 1. Fix small bugs > 2. Use jooq query dsl library to construct sql queries > 3. Make code more readable > 4. Use row counts rather than key column values to partition reads -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73840151 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); } - public int getPartitionCount() - { -return partitionCount; - } - - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { -this.partitionCount = partitionCount; +scanService.shutdownNow(); +store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { -if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID -&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here +if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { -dbPoller.interrupt(); -dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { +do { + emitQueue.add(getTuple(result)); +} while (result.next()); } -} catch (InterruptedException ex) { - // log and ignore, ending execution anyway - LOG.error("exception in poller thread: ", ex); + isPolled = true; --- End diff -- Does this mean that we want to poll records just once in a window? Is this because the poll interval is sufficiently long so as to prevent multiple calls to ```pollRecords()``` within a single window? Even in that case, if the poll interval is reduced, there could be multiple scheduled calls per window, of which only one would actually run the query. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
---
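The `isPolled` guard being questioned above reduces to a small state machine: however often the scheduler fires, the query runs at most once per window, and the window boundary re-arms it. A hypothetical, simplified sketch of that behavior (names are illustrative; the real operator runs the query and fills `emitQueue` where this sketch only counts invocations):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the once-per-window poll guard: extra scheduler firings
// within one window become no-ops; beginWindow() re-arms the guard.
public class OncePerWindowPoller {
  private volatile boolean isPolled;
  final AtomicInteger queriesRun = new AtomicInteger();

  void beginWindow() {
    isPolled = false; // allow exactly one poll in the new window
  }

  // Invoked by a ScheduledExecutorService at the poll interval.
  void pollRecords() {
    if (isPolled) {
      return; // a shorter poll interval just produces no-op calls
    }
    queriesRun.incrementAndGet(); // stands in for ps.executeQuery()
    isPolled = true;
  }
}
```

This illustrates the reviewer's point: shrinking the poll interval does not poll more often, it only increases the number of no-op invocations per window.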
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411480#comment-15411480 ] ASF GitHub Bot commented on APEXMALHAR-2172: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73838593 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); } - public int getPartitionCount() - { -return partitionCount; - } - - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { -this.partitionCount = partitionCount; +scanService.shutdownNow(); +store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { -if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID -&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here +if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { -dbPoller.interrupt(); -dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { +do { + emitQueue.add(getTuple(result)); +} while (result.next()); } -} catch (InterruptedException ex) { - // log and ignore, ending execution anyway - LOG.error("exception in poller thread: ", ex); + isPolled = true; +} catch (SQLException ex) { + throw new RuntimeException(String.format("Error while running query"), ex); +} finally { + store.disconnect(); } - } - @Override - public void handleIdleTime() - { -if (execute) { - try { -Thread.sleep(spinMillis); - } catch (InterruptedException ie) { -throw new RuntimeException(ie); - } -} else { - LOG.error("Exception: ", cause); - 
DTThrowable.rethrow(cause.get()); -} } + public abstract T getTuple(ResultSet result); + protected void replay(long windowId) throws SQLException { -isReplayed = true; -MutablePairrecoveredData = new MutablePair (); try { - recoveredData = (MutablePair )windowManager.load(operatorId, windowId); + MutablePair recoveredData = (MutablePair )windowManager.load(operatorId, + windowId); - if (recoveredData != null) { -//skip the window and return if there was no incoming data in the window -if (recoveredData.left == null || recoveredData.right == null) { - return; -} - -if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(previousUpperBound)) { - LOG.info("Matched so returning"); - return; -} + if (recoveredData != null && shouldReplayWindow(recoveredData)) { +LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left, +recoveredData.right); -JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator(); -jdbcPoller.setStore(store); -jdbcPoller.setKey(getKey()); -jdbcPoller.setPartitionCount(getPartitionCount()); -jdbcPoller.setPollInterval(getPollInterval()); -jdbcPoller.setTableName(getTableName()); -jdbcPoller.setBatchSize(getBatchSize()); -isPollable = false; - -LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + "," + recoveredData.right + "]"); +ps = store.getConnection().prepareStatement( +buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())), +TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); +LOG.info("Query formed to recover data - {}", ps.toString()); -jdbcPoller.setRangeQueryPair(new KeyValPair (recoveredData.left, recoveredData.right)); +emitReplayedTuples(ps); -jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement( -
[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73838062 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -80,201 +91,45 @@ * @tags database, sql, jdbc, partitionable,exactlyOnce --- End diff -- exactlyonce => idempotent --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411473#comment-15411473 ] ASF GitHub Bot commented on APEXMALHAR-2172: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73838062 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -80,201 +91,45 @@ * @tags database, sql, jdbc, partitionable,exactlyOnce --- End diff -- exactlyonce => idempotent > Update JDBC poll input operator to fix issues > - > > Key: APEXMALHAR-2172 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Priyanka Gugale >Assignee: Priyanka Gugale > > Update JDBCPollInputOperator to: > 1. Fix small bugs > 2. Use jooq query dsl library to construct sql queries > 3. Make code more readable > 4. Use row counts rather than key column values to partition reads -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73830019 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -438,119 +261,110 @@ public void endWindow() currentWindowRecoveryState = new MutablePair<>(); } - public int getPartitionCount() - { -return partitionCount; - } - - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { -this.partitionCount = partitionCount; +scanService.shutdownNow(); +store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { -if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID -&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here +if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { -dbPoller.interrupt(); -dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { +do { + emitQueue.add(getTuple(result)); +} while (result.next()); } -} catch (InterruptedException ex) { - // log and ignore, ending execution anyway - LOG.error("exception in poller thread: ", ex); + isPolled = true; +} catch (SQLException ex) { + throw new RuntimeException(String.format("Error while running query"), ex); +} finally { + store.disconnect(); } - } - @Override - public void handleIdleTime() - { -if (execute) { - try { -Thread.sleep(spinMillis); - } catch (InterruptedException ie) { -throw new RuntimeException(ie); - } -} else { - LOG.error("Exception: ", cause); - DTThrowable.rethrow(cause.get()); -} } + public abstract T getTuple(ResultSet result); + protected void replay(long windowId) throws SQLException { -isReplayed = true; -MutablePairrecoveredData = new MutablePair (); try { - 
recoveredData = (MutablePair )windowManager.load(operatorId, windowId); + MutablePair recoveredData = (MutablePair )windowManager.load(operatorId, + windowId); - if (recoveredData != null) { -//skip the window and return if there was no incoming data in the window -if (recoveredData.left == null || recoveredData.right == null) { - return; -} - -if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(previousUpperBound)) { - LOG.info("Matched so returning"); - return; -} + if (recoveredData != null && shouldReplayWindow(recoveredData)) { +LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left, +recoveredData.right); -JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator(); -jdbcPoller.setStore(store); -jdbcPoller.setKey(getKey()); -jdbcPoller.setPartitionCount(getPartitionCount()); -jdbcPoller.setPollInterval(getPollInterval()); -jdbcPoller.setTableName(getTableName()); -jdbcPoller.setBatchSize(getBatchSize()); -isPollable = false; - -LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + "," + recoveredData.right + "]"); +ps = store.getConnection().prepareStatement( +buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())), --- End diff -- Should this fetch data from ```recoveredData.left``` to ```recoveredData.right```? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
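If replay is indeed meant to fetch from `recoveredData.left` to `recoveredData.right`, the recovery query would be built from the recovered per-window bounds rather than the partition's static range. A hypothetical sketch of such a builder, using the row-count LIMIT/OFFSET style the PR uses for partitioning (the table name, method name, and SQL shape here are assumptions, not the PR's actual `buildRangeQuery`):

```java
// Sketch of the reviewer's suggestion: build the replay query from the
// recovered window bounds (recoveredData.left / recoveredData.right)
// instead of the partition's full static range. Names are illustrative.
public class ReplayQueryBuilder {
  /**
   * offset = recovered lower bound (rows already emitted before the window),
   * limit  = recovered upper bound - lower bound (rows emitted in the window).
   */
  static String buildReplayQuery(String tableName, int recoveredLower, int recoveredUpper) {
    int limit = recoveredUpper - recoveredLower;
    return String.format("SELECT * FROM %s LIMIT %d OFFSET %d",
        tableName, limit, recoveredLower);
  }
}
```

Replaying only the recovered slice keeps the operator idempotent: each recovered window re-emits exactly the rows it emitted originally, no more.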
[jira] [Resolved] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chinmay Kolhatkar resolved APEXMALHAR-1701. --- Resolution: Done Fix Version/s: 3.5.0 PR for deduper with managed state merged. > Deduper : create a deduper backed by Managed State > -- > > Key: APEXMALHAR-1701 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701 > Project: Apache Apex Malhar > Issue Type: Task > Components: algorithms >Reporter: Chandni Singh >Assignee: Bhupesh Chawda > Fix For: 3.5.0 > > > Need a de-duplication operator that is based on Managed State. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411406#comment-15411406 ] ASF GitHub Bot commented on APEXMALHAR-2172: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73828487 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -80,201 +91,45 @@ * @tags database, sql, jdbc, partitionable,exactlyOnce */ @Evolving -public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator-implements ActivationListener, IdleTimeHandler, Partitioner +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator implements +ActivationListener, Partitioner { - /** - * poll interval in milliseconds - */ - private static int pollInterval = 1; + private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024; + private static int DEFAULT_POLL_INTERVAL = 10 * 1000; + private static int DEFAULT_FETCH_SIZE = 2; + private static int DEFAULT_BATCH_SIZE = 2000; + private int pollInterval = DEFAULT_POLL_INTERVAL; //in miliseconds + private int queueCapacity = DEFAULT_QUEUE_CAPACITY; + private int fetchSize = DEFAULT_FETCH_SIZE; @Min(1) private int partitionCount = 1; - protected transient int operatorId; - protected transient boolean isReplayed; - protected transient boolean isPollable; - protected int batchSize; - protected static int fetchSize = 2; - /** - * Map of windowId to of the range key - */ - protected transient MutablePair currentWindowRecoveryState; - - /** - * size of the emit queue used to hold polled records before emit - */ - private static int queueCapacity = 4 * 1024 * 1024; + private int batchSize = DEFAULT_BATCH_SIZE; + + @NotNull + private String tableName; + @NotNull + private String columnsExpression; + @NotNull + private String key; + private String whereCondition = null; + private long currentWindowId; + private 
WindowDataManager windowManager; + + protected KeyValPair rangeQueryPair; + protected Integer lowerBound; + private transient int operatorId; + private transient DSLContext create; private transient volatile boolean execute; - private transient AtomicReference cause; - protected transient int spinMillis; - private transient OperatorContext context; - protected String tableName; - protected String key; - protected long currentWindowId; - protected KeyValPair rangeQueryPair; - protected String lower; - protected String upper; - protected boolean recovered; - protected boolean isPolled; - protected String whereCondition = null; - protected String previousUpperBound; - protected String highestPolled; - private static final String user = ""; - private static final String password = ""; - /** - * thread to poll database - */ - private transient Thread dbPoller; - protected transient ArrayBlockingQueue emitQueue; + private transient ScheduledExecutorService scanService; + protected transient boolean isPolled; + protected transient Integer lastPolledBound; --- End diff -- ```upperBound```? Seems to be more intuitive when looking at the rest of the code. > Update JDBC poll input operator to fix issues > - > > Key: APEXMALHAR-2172 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Priyanka Gugale >Assignee: Priyanka Gugale > > Update JDBCPollInputOperator to: > 1. Fix small bugs > 2. Use jooq query dsl library to construct sql queries > 3. Make code more readable > 4. Use row counts rather than key column values to partition reads -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73828487 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -80,201 +91,45 @@ * @tags database, sql, jdbc, partitionable,exactlyOnce */ @Evolving -public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator-implements ActivationListener, IdleTimeHandler, Partitioner +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator implements +ActivationListener, Partitioner { - /** - * poll interval in milliseconds - */ - private static int pollInterval = 1; + private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024; + private static int DEFAULT_POLL_INTERVAL = 10 * 1000; + private static int DEFAULT_FETCH_SIZE = 2; + private static int DEFAULT_BATCH_SIZE = 2000; + private int pollInterval = DEFAULT_POLL_INTERVAL; //in miliseconds + private int queueCapacity = DEFAULT_QUEUE_CAPACITY; + private int fetchSize = DEFAULT_FETCH_SIZE; @Min(1) private int partitionCount = 1; - protected transient int operatorId; - protected transient boolean isReplayed; - protected transient boolean isPollable; - protected int batchSize; - protected static int fetchSize = 2; - /** - * Map of windowId to of the range key - */ - protected transient MutablePair currentWindowRecoveryState; - - /** - * size of the emit queue used to hold polled records before emit - */ - private static int queueCapacity = 4 * 1024 * 1024; + private int batchSize = DEFAULT_BATCH_SIZE; + + @NotNull + private String tableName; + @NotNull + private String columnsExpression; + @NotNull + private String key; + private String whereCondition = null; + private long currentWindowId; + private WindowDataManager windowManager; + + protected KeyValPair rangeQueryPair; + protected Integer lowerBound; + private transient int operatorId; + private transient DSLContext create; private transient 
volatile boolean execute; - private transient AtomicReference cause; - protected transient int spinMillis; - private transient OperatorContext context; - protected String tableName; - protected String key; - protected long currentWindowId; - protected KeyValPair rangeQueryPair; - protected String lower; - protected String upper; - protected boolean recovered; - protected boolean isPolled; - protected String whereCondition = null; - protected String previousUpperBound; - protected String highestPolled; - private static final String user = ""; - private static final String password = ""; - /** - * thread to poll database - */ - private transient Thread dbPoller; - protected transient ArrayBlockingQueue emitQueue; + private transient ScheduledExecutorService scanService; + protected transient boolean isPolled; + protected transient Integer lastPolledBound; --- End diff -- ```upperBound```? Seems to be more intuitive when looking at the rest of the code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73828318 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. 
+ * + * @displayName POJO Inner Join Operator + * @tags join + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411403#comment-15411403 ]

ASF GitHub Bot commented on APEXMALHAR-2100:

Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/330#discussion_r73828318

(The quoted diff of library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java is identical to the GitHub notification above.)
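The javadoc above says the operator receives objects from both streams and emits inner-join results. As a hedged illustration of that idea (class and method names below are invented for the sketch, not the actual POJOInnerJoinOperator internals, which spill state via managed state rather than in-memory maps): tuples from each input are stored keyed by the join key, and a tuple arriving on one stream is matched against everything stored so far for the same key on the other stream.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal in-memory sketch of a streaming inner join; the real operator
// keeps these stores in spillable managed state and expires them by time.
public class InnerJoinSketch<K, L, R>
{
  private final Map<K, List<L>> leftStore = new HashMap<>();
  private final Map<K, List<R>> rightStore = new HashMap<>();
  private final List<SimpleEntry<L, R>> output = new ArrayList<>();

  // A left tuple joins with every right tuple already seen for the same key.
  public void processLeft(K key, L left)
  {
    leftStore.computeIfAbsent(key, k -> new ArrayList<>()).add(left);
    for (R right : rightStore.getOrDefault(key, List.of())) {
      output.add(new SimpleEntry<>(left, right));
    }
  }

  // Symmetric handling for the right stream.
  public void processRight(K key, R right)
  {
    rightStore.computeIfAbsent(key, k -> new ArrayList<>()).add(right);
    for (L left : leftStore.getOrDefault(key, List.of())) {
      output.add(new SimpleEntry<>(left, right));
    }
  }

  public List<SimpleEntry<L, R>> getOutput()
  {
    return output;
  }
}
```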
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411388#comment-15411388 ]

Siyuan Hua commented on APEXMALHAR-2169:

[~chaithu] Then I think the problem is in the softConstraint and hardConstraint code; they should never return true because the default limit is Long.MAX_VALUE. There is something in the backlog that I didn't track in Jira (my bad). But since you have the issue in hand, can you please do some refactoring here? We actually want to simplify the operator code instead of making it more and more complicated. The Kafka input operator has been around for a while, and I don't see any requirement for dynamic partitioning based on throughput.

Can we take away the hardConstraint and softConstraint condition checks and deprecate the two upper-bound properties? Dynamic partitioning should then happen by default only when the Kafka partitions change, and for the ONE_TO_MANY partition strategy the number of operator partitions should stay unchanged for the whole application at the specified initialPartitionCount.

I think there is still a bug there: if a new Kafka partition is added, we always start a new operator partition, which is not correct. You can create another ticket to move all throughput-based repartitioning into a separate Partitioner, so the operator code stays simple and easy to understand/debug.

> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY
> partition strategy
> ---------------------------------------------------------------------------
>
>                 Key: APEXMALHAR-2169
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Chaitanya
>            Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce:
> (1) Created a topic with 3 partitions
> (2) Created an application as KAFKA -> Console with the below configuration:
>        strategy: one_to_many
>        initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partitioned the topic to 5.
> Observations:
> (1) Operator re-partitioning did not happen.
> (2) The operator is not emitting messages.
> (3) The following messages appear in the log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]:
> Repartition the operator(s) under 9223372036854775807 msgs/s and
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list
> after repartition: OperatorMeta{name=Input,
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
> attributes={Attribute{defaultValue=1024,
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB,
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
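The point about Long.MAX_VALUE can be made concrete with a small sketch (the class and method names below are illustrative, not the actual AbstractKafkaInputOperator code): when the msgs/s and bytes/s upper bounds are left at their Long.MAX_VALUE defaults, as in the log output quoted above, a throughput-based repartition check can never fire, so repartitioning should only ever be driven by changes in the Kafka partition set.

```java
// Hypothetical mirror of the soft/hard constraint check being discussed.
public class ThroughputConstraint
{
  // Defaults mirror the 9223372036854775807 (Long.MAX_VALUE) limits
  // printed in the INFO log line from the bug report.
  private long msgRateUpperBound = Long.MAX_VALUE;
  private long byteRateUpperBound = Long.MAX_VALUE;

  // No observable rate can exceed Long.MAX_VALUE, so with default bounds
  // this always returns false and no throughput-driven repartition happens.
  public boolean exceedsHardConstraint(long observedMsgsPerSec, long observedBytesPerSec)
  {
    return observedMsgsPerSec > msgRateUpperBound
        || observedBytesPerSec > byteRateUpperBound;
  }
}
```

If the check nevertheless triggers a repartition (as the "Empty partition list after repartition" warning suggests), the bug is in how its result is consumed, which is why the comment proposes removing the constraint checks entirely.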
[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411374#comment-15411374 ]

ASF GitHub Bot commented on APEXMALHAR-2172:

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r73826686

--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
@@ -565,67 +379,110 @@ public void emitReplayedTuples(PreparedStatement ps)
    */
   @Override
   public Collection<Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
-      Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions,
-      com.datatorrent.api.Partitioner.PartitioningContext context)
+      Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions, PartitioningContext context)
   {
     List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions =
         new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(getPartitionCount());
-    JdbcStore jdbcStore = new JdbcStore();
-    jdbcStore.setDatabaseDriver(store.getDatabaseDriver());
-    jdbcStore.setDatabaseUrl(store.getDatabaseUrl());
-    jdbcStore.setConnectionProperties(store.getConnectionProperties());
-    jdbcStore.connect();
-
-    HashMap<Integer, KeyValPair<String, String>> partitionToRangeMap = null;
+    HashMap<Integer, KeyValPair<Integer, Integer>> partitionToRangeMap = null;
     try {
-      partitionToRangeMap = JdbcMetaDataUtility.getPartitionedQueryMap(getPartitionCount(),
-          jdbcStore.getDatabaseDriver(), jdbcStore.getDatabaseUrl(), getTableName(), getKey(),
-          store.getConnectionProperties().getProperty(user), store.getConnectionProperties().getProperty(password),
-          whereCondition, emitColumnList);
+      store.connect();
+      intializeDSLContext();
+      partitionToRangeMap = getPartitionedQueryRangeMap(getPartitionCount());
     } catch (SQLException e) {
       LOG.error("Exception in initializing the partition range", e);
+      throw new RuntimeException(e);
+    } finally {
+      store.disconnect();
     }
     KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
+    // The n given partitions are for range queries and n + 1 partition is for polling query
     for (int i = 0; i <= getPartitionCount(); i++) {
-      AbstractJdbcPollInputOperator<T> jdbcPoller = null;
-
-      jdbcPoller = cloneUtils.getClone();
-
-      jdbcPoller.setStore(store);
-      jdbcPoller.setKey(getKey());
-      jdbcPoller.setPartitionCount(getPartitionCount());
-      jdbcPoller.setPollInterval(getPollInterval());
-      jdbcPoller.setTableName(getTableName());
-      jdbcPoller.setBatchSize(getBatchSize());
-      jdbcPoller.setEmitColumnList(getEmitColumnList());
-
-      store.connect();
-      //The n given partitions are for range queries and n + 1 partition is for polling query
-      //The upper bound for the n+1 partition is set to null since its a pollable partition
+      AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
       if (i < getPartitionCount()) {
-        jdbcPoller.setRangeQueryPair(partitionToRangeMap.get(i));
-        isPollable = false;
+        jdbcPoller.rangeQueryPair = partitionToRangeMap.get(i);
+        jdbcPoller.lastEmittedRecord = partitionToRangeMap.get(i).getKey();

--- End diff --

Set ```jdbcPoller.isPollerPartition = false``` here?

> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
>                 Key: APEXMALHAR-2172
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
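The partitioning scheme the diff comment describes (n range partitions plus one poller partition) and the reviewer's suggestion to set the poller flag explicitly can be sketched as follows. This is an illustrative toy, not the Malhar code: the `Poller` class, the fixed `rowsPerPartition`, and the int-array range stand in for the real clone-based setup and KeyValPair ranges.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSketch
{
  static class Poller
  {
    int[] range;              // [lower, upper) row range; null for the poller partition
    boolean isPollerPartition;
  }

  // Clones 0..n-1 each get a static row range to scan once; clone n is the
  // open-ended poller that picks up rows inserted after the scan.
  static List<Poller> definePartitions(int partitionCount)
  {
    List<Poller> partitions = new ArrayList<>();
    int rowsPerPartition = 1000;  // assumed fixed range size for the sketch
    for (int i = 0; i <= partitionCount; i++) {
      Poller p = new Poller();
      if (i < partitionCount) {
        p.range = new int[] {i * rowsPerPartition, (i + 1) * rowsPerPartition};
        p.isPollerPartition = false;  // the explicit reset the reviewer asks for
      } else {
        p.isPollerPartition = true;   // the n + 1st partition polls for new rows
      }
      partitions.add(p);
    }
    return partitions;
  }
}
```

Setting the flag explicitly in the `i < getPartitionCount()` branch matters because the clones are Kryo copies of the master operator, so any flag value set on the master would otherwise be inherited by every clone.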