[jira] [Created] (APEXCORE-502) Unnecessary byte array copy in DefaultKryoStreamCodec.toByteArray

2016-08-08 Thread Vlad Rozov (JIRA)
Vlad Rozov created APEXCORE-502:
---

 Summary: Unnecessary byte array copy in 
DefaultKryoStreamCodec.toByteArray
 Key: APEXCORE-502
 URL: https://issues.apache.org/jira/browse/APEXCORE-502
 Project: Apache Apex Core
  Issue Type: Improvement
Reporter: Vlad Rozov


{noformat}
  slice = new Slice(os.toByteArray(), 0, os.toByteArray().length);
{noformat}
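
Each call to os.toByteArray() copies the stream contents into a fresh array, so the second call allocates a byte array that is immediately discarded. A minimal sketch of the likely fix, copying once and reusing the array:

{noformat}
  // copy the stream contents out once, then build the Slice over that array
  byte[] bytes = os.toByteArray();
  slice = new Slice(bytes, 0, bytes.length);
{noformat}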






[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-08 Thread chandnisingh
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73969296
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
 ---
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+/**
+ * Implementations of this interface are used by Spillable datastructures to spill data to disk.
+ */
+public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
+    Operator.CheckpointNotificationListener, WindowListener
+{
--- End diff --

Can we add ```hasBeenSetup()``` to this?
I see in the tests that the ```setup()``




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-08 Thread chandnisingh
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73966670
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
 ---
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
+ * @param <T> The type of object stored in the {@link SpillableArrayListImpl}.
+ */
+@DefaultSerializer(FieldSerializer.class)
+public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T>, Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+  private long bucketId;
--- End diff --

It seems ```bucketId``` and ```prefix``` are unused. Also, please add a
getter/setter for ```batchSize```.
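
A minimal sketch of the requested accessors, assuming they sit next to the field above in SpillableArrayListImpl:

```java
// simple accessors for the batch size used when spilling list contents
public int getBatchSize()
{
  return batchSize;
}

public void setBatchSize(int batchSize)
{
  this.batchSize = batchSize;
}
```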




Re: JDBC Poller implementation

2016-08-08 Thread Ashwin Chandra Putta
+1 for a common abstract class and use-case-specific implementations.

Regards,
Ashwin.

On Mon, Aug 8, 2016 at 7:55 AM, Devendra Tagare 
wrote:

> + 1 for a common Abstract Class.
>
> 0 - for adding ports.
>
> Use-case for CSV - data migration from columnar stores. Some users may want
> to move TBs of data from a store like Greenplum and then do
> transformations using the CSV parser that is already present.
>
> Thanks,
> Dev
>
> On Mon, Aug 8, 2016 at 7:39 AM, Yogi Devendra 
> wrote:
>
> > +1 for Tushar's idea for common abstract class.
> >
> > Additionally, POJO based output should be considered for active
> > development. CSV output can be deprecated. Since it can be achieved
> easily
> > using POJO + CSV formatter.
> >
> > Operators are meant to be lego blocks for reusable functionality to
> achieve
> > higher level functionality by clubbing multiple operators together.
> >
> >
> > ~ Yogi
> >
> > On 8 August 2016 at 17:45, Tushar Gosavi  wrote:
> >
> > > I would prefer a common abstract class having emitTuple method. And
> > > two different implementation one for emitting comma separated values
> > > and other emitting pojo.
> > >
> > > Regards,
> > > -Tushar.
> > >
> > >
> > > On Mon, Aug 8, 2016 at 5:34 PM, Priyanka Gugale
> > >  wrote:
> > > > The concrete implementation is supposed to do mainly formatting of
> > input
> > > > data and emit in required form. Also it would be tricky to implement
> > some
> > > > abstract methods like "getTuple", you have to conditionally return
> the
> > > > values.
> > > >
> > > > -Priyanka
> > > >
> > > > On Mon, Aug 8, 2016 at 4:49 PM, Bhupesh Chawda 
> > > wrote:
> > > >
> > > >> I am +1 for having two ports in the same concrete implementation.
> > > >> One port for POJOs and second one for CSV strings (if this is
> actually
> > > >> needed)
> > > >>
> > > >> +0 on having multiple concrete operators.
> > > >> I don't think it is necessary to create multiple concrete
> > > implementations
> > > >> just because we want the same data in different formats. Ports
> should
> > > serve
> > > >> the purpose.
> > > >>
> > > >> ~ Bhupesh
> > > >>
> > > >>
> > > >> On Mon, Aug 8, 2016 at 4:46 PM, Priyanka Gugale <
> > > priya...@datatorrent.com>
> > > >> wrote:
> > > >>
> > > >> > Hi,
> > > >> >
> > > >> > JDBCPollerInputOperator in malhar repository emits comma separated
> > > list
> > > >> of
> > > >> > values as result of scan. As most of our input operators emit
> POJOs
> > I
> > > am
> > > >> > planning to add an implementation which emits pojo.
> > > >> >
> > > >> > I would like to discuss, if we should have two independent jdbc
> poll
> > > >> input
> > > >> > operators, one emits csv and other which emits pojo or we should
> > have
> > > one
> > > >> > operator having two ports?
> > > >> >
> > > >> > I prefer two operators to define clear intent of each operator,
> but
> > if
> > > >> > anyone has different opinion please suggest.
> > > >> >
> > > >> > -Priyanka
> > > >> >
> > > >>
> > >
> >
>



-- 

Regards,
Ashwin.


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-08 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73941677
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java
 ---
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
--- End diff --

I think this should be moved to a more generic package, possibly in Apex
Core, but it's okay to leave it where it is for now. Just annotate it with
InterfaceStability.Unstable.
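
For reference, a minimal sketch of what that would look like, assuming the org.apache.hadoop.classification annotations already available to Malhar:

```java
import org.apache.hadoop.classification.InterfaceStability;

// hypothetical: mark the interface unstable until it finds a permanent home,
// possibly in Apex Core
@InterfaceStability.Unstable
public interface WindowListener
{
  // window callbacks as declared in the diff above
}
```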




[GitHub] apex-malhar pull request #362: JMS Input operator changes to support SQS and...

2016-08-08 Thread sanjaypujare
GitHub user sanjaypujare opened a pull request:

https://github.com/apache/apex-malhar/pull/362

JMS Input operator changes to support SQS and ActiveMQ

@PramodSSImmaneni these changes incorporate your comments and don't have
extraneous changes. Please review and merge.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sanjaypujare/apex-malhar 
APEXMALHAR-2156.improvement2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/362.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #362


commit db32f40858ef37356b6fec836df605acdbcbcbfa
Author: Sanjay Pujare 
Date:   2016-08-08T19:40:05Z

JMS Input operator changes to support SQS and ActiveMQ






[jira] [Commented] (APEXCORE-496) Provide Operator name to StatsListener

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15412264#comment-15412264
 ] 

ASF GitHub Bot commented on APEXCORE-496:
-

Github user tushargosavi commented on the issue:

https://github.com/apache/apex-core/pull/361
  
@vrozov @sandeshh I have updated the pull request, please review.


> Provide Operator name to StatsListener
> --
>
> Key: APEXCORE-496
> URL: https://issues.apache.org/jira/browse/APEXCORE-496
> Project: Apache Apex Core
>  Issue Type: Task
>Reporter: Tushar Gosavi
>Assignee: Tushar Gosavi
>
> In case of a shared StatsListener, it becomes difficult to identify which
> operator's stats are being processed. We could provide the operator name in
> BatchedOperatorStats to easily identify the operator.
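
For illustration, a hedged sketch of how a shared listener could consume such a name; the getOperatorName() accessor is the proposed addition, not an existing API:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.StatsListener;

public class SharedStatsListener implements StatsListener
{
  private static final Logger LOG = LoggerFactory.getLogger(SharedStatsListener.class);

  @Override
  public Response processStats(BatchedOperatorStats stats)
  {
    // hypothetical accessor: identify which operator's stats these are
    LOG.debug("processing stats for operator {}", stats.getOperatorName());
    // no repartition action in this sketch
    return null;
  }
}
```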





[GitHub] apex-core issue #361: APEXCORE-496 make operator name available to StatsList...

2016-08-08 Thread tushargosavi
Github user tushargosavi commented on the issue:

https://github.com/apache/apex-core/pull/361
  
@vrozov @sandeshh I have updated the pull request, please review.




Re: [Proposal] Named Checkpoints

2016-08-08 Thread Sandesh Hegde
The idea here was to create a recovery/committed window on demand. But
there is always one (except before the first) recovery window for the DAG.
Instead of using/modifying the Checkpoint tuple, I am planning to reuse
the existing recovery window state, which simplifies the implementation.

Proposed API:

ApexCli> savepoint  
ApexCli> launch -savepoint 

first prototype:
https://github.com/sandeshh/apex-core/commit/8ec7e837318c2b33289251cda78ece0024a3f895

Thanks

On Thu, Aug 4, 2016 at 11:54 AM Amol Kekre  wrote:

> hmm! actually it may be a good debugging tool too. Keep the named
> checkpoints around. The feature is to keep checkpoints around, which can be
> done by giving a feature to not delete checkpoints, but naming them
> makes it more operational. Send a command from the CLI -> get checkpoint -> know
> it is the one you need because the file name has the string you sent with the
> command -> debug. This is different than querying a state, as this gives the
> entire app checkpoint to debug with.
>
> Thks
> Amol
>
>
> On Thu, Aug 4, 2016 at 11:41 AM, Venkatesh Kottapalli <
> venkat...@datatorrent.com> wrote:
>
> > + 1 for the idea.
> >
> > It might be helpful to developers as well when dealing with a variety of
> > data in large volumes, if this can help them run from the checkpointed state
> > rather than rerunning the application altogether in case of issues.
> >
> > I have seen cases where the application runs for more than 10 hours and
> > some partitions fail because of the variety of data that it is dealing
> > with. In such cases, the application has to be restarted and it will be
> > helpful to developers with a feature of this kind.
> >
> >  The ease of enabling/disabling this feature to run the app will also be
> > important.
> >
> > -Venkatesh.
> >
> >
> > > On Aug 4, 2016, at 10:29 AM, Amol Kekre  wrote:
> > >
> > > We had a user who wanted roll-back and restart for audit purposes. That
> > > time we did not have timed-windows. Named checkpoints would have helped a
> > > little bit.
> > >
> > > Problem statement: Auditors ask for rerun of yesterday's computations
> for
> > > verification. Assume that these computations depend on previous state
> > (i.e
> > > data from day before yesterday).
> > >
> > > Solution
> > > 1. Have named checkpoints at 12 in the night (an input adapter triggers
> > it)
> > > every day
> > > 2. The app spools raw logs into hdfs along with window ids and event
> > times
> > > 3. The re-run is a separate app that starts off on a named checkpoint
> (12
> > > night yesterday)
> > >
> > > Technically the solution will not be as simple, and the "new audit app" will
> > > need a lot of other checks (dedups, drop events not in yesterday's window,
> > > wait for late arrivals, ...), but named checkpoints help.
> > >
> > > I do agree with Pramod that replay within the same running app is not
> > > viable within a data-in-motion architecture. But it helps somewhat in a new
> > > audit app. Named checkpoints help data-in-motion architectures handle batch
> > > apps better. In the above case, #2 (spooling done with event timestamp +
> > > state) suffices. The state part comes from named checkpoints.
> > >
> > > Thks,
> > > Amol
> > >
> > >
> > >
> > >
> > > On Thu, Aug 4, 2016 at 10:12 AM, Sanjay Pujare  >
> > > wrote:
> > >
> > >> I agree. A specific use-case will be useful to support this feature.
> > Also
> > >> the ability to replay from the named checkpoint will be limited
> because
> > of
> > >> various factors, isn’t it?
> > >>
> > >> On 8/4/16, 9:00 AM, "Pramod Immaneni"  wrote:
> > >>
> > >>There is a problem here, keeping old checkpoints and recovering
> from
> > >> them
> > >>means preserving the old input data along with the state. This is
> > more
> > >> than
> > >>the mechanism of actually creating named checkpoints, it means
> having
> > >> the
> > >>ability for operators to move forward (a.k.a committed and dropping
> > >>committed states and buffer data) while still having the ability to
> > >> replay
> > >>from that point from the input source and providing a way for
> > >> operators (at
> > >>first look input operators) to distinguish that. Why would someone
> > need
> > >>this with idempotent processing? Is there a specific use case you
> are
> > >>looking at? Suppose we go do this, for the mechanism, I would be in
> > >> favor
> > >>of reusing existing tuple.
> > >>
> > >>On Thu, Aug 4, 2016 at 8:44 AM, Vlad Rozov <
> v.ro...@datatorrent.com>
> > >> wrote:
> > >>
> > >>> +1 for the feature. At first look I am more in favor of reusing
> > >> existing
> > >>> control tuple.
> > >>>
> > >>> Thank you,
> > >>>
> > >>> Vlad
> > >>>
> > >>>
> > >>> On 8/4/16 08:17, Sandesh Hegde wrote:
> > >>>
> >  @Chinmay
> >  We can enhance the existing checkpoint tuple but that one is more
> >  frequently used than this feature, 

Re: JDBC Poller implementation

2016-08-08 Thread Devendra Tagare
+ 1 for a common Abstract Class.

0 - for adding ports.

Use-case for CSV - data migration from columnar stores. Some users may want
to move TBs of data from a store like Greenplum and then do
transformations using the CSV parser that is already present.

Thanks,
Dev

On Mon, Aug 8, 2016 at 7:39 AM, Yogi Devendra 
wrote:

> +1 for Tushar's idea for common abstract class.
>
> Additionally, POJO based output should be considered for active
> development. CSV output can be deprecated. Since it can be achieved easily
> using POJO + CSV formatter.
>
> Operators are meant to be lego blocks for reusable functionality to achieve
> higher level functionality by clubbing multiple operators together.
>
>
> ~ Yogi
>
> On 8 August 2016 at 17:45, Tushar Gosavi  wrote:
>
> > I would prefer a common abstract class having emitTuple method. And
> > two different implementation one for emitting comma separated values
> > and other emitting pojo.
> >
> > Regards,
> > -Tushar.
> >
> >
> > On Mon, Aug 8, 2016 at 5:34 PM, Priyanka Gugale
> >  wrote:
> > > The concrete implementation is supposed to do mainly formatting of
> input
> > > data and emit in required form. Also it would be tricky to implement
> some
> > > abstract methods like "getTuple", you have to conditionally return the
> > > values.
> > >
> > > -Priyanka
> > >
> > > On Mon, Aug 8, 2016 at 4:49 PM, Bhupesh Chawda 
> > wrote:
> > >
> > >> I am +1 for having two ports in the same concrete implementation.
> > >> One port for POJOs and second one for CSV strings (if this is actually
> > >> needed)
> > >>
> > >> +0 on having multiple concrete operators.
> > >> I don't think it is necessary to create multiple concrete
> > implementations
> > >> just because we want the same data in different formats. Ports should
> > serve
> > >> the purpose.
> > >>
> > >> ~ Bhupesh
> > >>
> > >>
> > >> On Mon, Aug 8, 2016 at 4:46 PM, Priyanka Gugale <
> > priya...@datatorrent.com>
> > >> wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> > JDBCPollerInputOperator in malhar repository emits comma separated
> > list
> > >> of
> > >> > values as result of scan. As most of our input operators emit POJOs
> I
> > am
> > >> > planning to add an implementation which emits pojo.
> > >> >
> > >> > I would like to discuss, if we should have two independent jdbc poll
> > >> input
> > >> > operators, one emits csv and other which emits pojo or we should
> have
> > one
> > >> > operator having two ports?
> > >> >
> > >> > I prefer two operators to define clear intent of each operator, but
> if
> > >> > anyone has different opinion please suggest.
> > >> >
> > >> > -Priyanka
> > >> >
> > >>
> >
>


Re: JDBC Poller implementation

2016-08-08 Thread Yogi Devendra
+1 for Tushar's idea for a common abstract class.

Additionally, POJO-based output should be considered for active
development. CSV output can be deprecated, since it can be achieved easily
using a POJO + CSV formatter.

Operators are meant to be lego blocks for reusable functionality to achieve
higher level functionality by clubbing multiple operators together.


~ Yogi

On 8 August 2016 at 17:45, Tushar Gosavi  wrote:

> I would prefer a common abstract class having emitTuple method. And
> two different implementation one for emitting comma separated values
> and other emitting pojo.
>
> Regards,
> -Tushar.
>
>
> On Mon, Aug 8, 2016 at 5:34 PM, Priyanka Gugale
>  wrote:
> > The concrete implementation is supposed to do mainly formatting of input
> > data and emit in required form. Also it would be tricky to implement some
> > abstract methods like "getTuple", you have to conditionally return the
> > values.
> >
> > -Priyanka
> >
> > On Mon, Aug 8, 2016 at 4:49 PM, Bhupesh Chawda 
> wrote:
> >
> >> I am +1 for having two ports in the same concrete implementation.
> >> One port for POJOs and second one for CSV strings (if this is actually
> >> needed)
> >>
> >> +0 on having multiple concrete operators.
> >> I don't think it is necessary to create multiple concrete
> implementations
> >> just because we want the same data in different formats. Ports should
> serve
> >> the purpose.
> >>
> >> ~ Bhupesh
> >>
> >>
> >> On Mon, Aug 8, 2016 at 4:46 PM, Priyanka Gugale <
> priya...@datatorrent.com>
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > JDBCPollerInputOperator in malhar repository emits comma separated
> list
> >> of
> >> > values as result of scan. As most of our input operators emit POJOs I
> am
> >> > planning to add an implementation which emits pojo.
> >> >
> >> > I would like to discuss, if we should have two independent jdbc poll
> >> input
> >> > operators, one emits csv and other which emits pojo or we should have
> one
> >> > operator having two ports?
> >> >
> >> > I prefer two operators to define clear intent of each operator, but if
> >> > anyone has different opinion please suggest.
> >> >
> >> > -Priyanka
> >> >
> >>
>
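
As a rough illustration of the POJO + formatter composition Yogi describes above, the wiring inside populateDAG might look like this (the POJO-emitting poller is the proposed operator, and the port names are assumptions):

```java
// hypothetical: a POJO-emitting JDBC poller feeding Malhar's CsvFormatter
JdbcPojoPollInputOperator poller = dag.addOperator("JdbcPoller", new JdbcPojoPollInputOperator());
CsvFormatter formatter = dag.addOperator("Formatter", new CsvFormatter());
dag.addStream("pojos", poller.outputPort, formatter.in);
```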


Re: JDBC Poller implementation

2016-08-08 Thread Tushar Gosavi
I would prefer a common abstract class having an emitTuple method, and
two different implementations: one emitting comma-separated values
and the other emitting POJOs.

Regards,
-Tushar.


On Mon, Aug 8, 2016 at 5:34 PM, Priyanka Gugale
 wrote:
> The concrete implementation is supposed to do mainly formatting of input
> data and emit in required form. Also it would be tricky to implement some
> abstract methods like "getTuple", you have to conditionally return the
> values.
>
> -Priyanka
>
> On Mon, Aug 8, 2016 at 4:49 PM, Bhupesh Chawda  wrote:
>
>> I am +1 for having two ports in the same concrete implementation.
>> One port for POJOs and second one for CSV strings (if this is actually
>> needed)
>>
>> +0 on having multiple concrete operators.
>> I don't think it is necessary to create multiple concrete implementations
>> just because we want the same data in different formats. Ports should serve
>> the purpose.
>>
>> ~ Bhupesh
>>
>>
>> On Mon, Aug 8, 2016 at 4:46 PM, Priyanka Gugale 
>> wrote:
>>
>> > Hi,
>> >
>> > JDBCPollerInputOperator in malhar repository emits comma separated list
>> of
>> > values as result of scan. As most of our input operators emit POJOs I am
>> > planning to add an implementation which emits pojo.
>> >
>> > I would like to discuss, if we should have two independent jdbc poll
>> input
>> > operators, one emits csv and other which emits pojo or we should have one
>> > operator having two ports?
>> >
>> > I prefer two operators to define clear intent of each operator, but if
>> > anyone has different opinion please suggest.
>> >
>> > -Priyanka
>> >
>>


Re: JDBC Poller implementation

2016-08-08 Thread Priyanka Gugale
The concrete implementation is supposed to mainly format the input
data and emit it in the required form. Also, it would be tricky to implement some
abstract methods like "getTuple"; you would have to conditionally return the
values.

-Priyanka

On Mon, Aug 8, 2016 at 4:49 PM, Bhupesh Chawda  wrote:

> I am +1 for having two ports in the same concrete implementation.
> One port for POJOs and second one for CSV strings (if this is actually
> needed)
>
> +0 on having multiple concrete operators.
> I don't think it is necessary to create multiple concrete implementations
> just because we want the same data in different formats. Ports should serve
> the purpose.
>
> ~ Bhupesh
>
>
> On Mon, Aug 8, 2016 at 4:46 PM, Priyanka Gugale 
> wrote:
>
> > Hi,
> >
> > JDBCPollerInputOperator in malhar repository emits comma separated list
> of
> > values as result of scan. As most of our input operators emit POJOs I am
> > planning to add an implementation which emits pojo.
> >
> > I would like to discuss, if we should have two independent jdbc poll
> input
> > operators, one emits csv and other which emits pojo or we should have one
> > operator having two ports?
> >
> > I prefer two operators to define clear intent of each operator, but if
> > anyone has different opinion please suggest.
> >
> > -Priyanka
> >
>


[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411685#comment-15411685
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73859472
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +91,45 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
--- End diff --

Can you mark this class with ```checkpointableWithinAppWindow = false```?
Better to have it in the abstract class.
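
For reference, a minimal sketch of that annotation on the class (OperatorAnnotation comes from com.datatorrent.api.annotation; the real class extends AbstractStoreInputOperator as in the diff, elided here):

```java
import com.datatorrent.api.annotation.OperatorAnnotation;

// prevents checkpointing from happening within an application window
@OperatorAnnotation(checkpointableWithinAppWindow = false)
public abstract class AbstractJdbcPollInputOperator
{
}
```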


> Update JDBC poll input operator to fix issues
> -
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
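
Point 2 above refers to jOOQ's query DSL; a minimal sketch of rendering a paged scan with it, using placeholder table/column names and the row-count ranges from point 4:

```java
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;

import org.jooq.SQLDialect;
import org.jooq.impl.DSL;

// hypothetical: each partition scans its own row range via LIMIT/OFFSET
int batchSize = 1000;
int lowerBound = 0;
String sql = DSL.using(SQLDialect.DEFAULT)
    .select()
    .from(table("testTable"))
    .orderBy(field("id"))
    .limit(batchSize)
    .offset(lowerBound)
    .getSQL();
```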





Re: JDBC Poller implementation

2016-08-08 Thread Bhupesh Chawda
I am +1 for having two ports in the same concrete implementation:
one port for POJOs and a second one for CSV strings (if that is actually
needed).

+0 on having multiple concrete operators.
I don't think it is necessary to create multiple concrete implementations
just because we want the same data in different formats. Ports should serve
the purpose.

~ Bhupesh


On Mon, Aug 8, 2016 at 4:46 PM, Priyanka Gugale 
wrote:

> Hi,
>
> JDBCPollerInputOperator in malhar repository emits comma separated list of
> values as result of scan. As most of our input operators emit POJOs I am
> planning to add an implementation which emits pojo.
>
> I would like to discuss, if we should have two independent jdbc poll input
> operators, one emits csv and other which emits pojo or we should have one
> operator having two ports?
>
> I prefer two operators to define clear intent of each operator, but if
> anyone has different opinion please suggest.
>
> -Priyanka
>


[GitHub] apex-malhar pull request #360: APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 r...

2016-08-08 Thread chaithu14
Github user chaithu14 closed the pull request at:

https://github.com/apache/apex-malhar/pull/360




[GitHub] apex-malhar pull request #360: APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 r...

2016-08-08 Thread chaithu14
GitHub user chaithu14 reopened a pull request:

https://github.com/apache/apex-malhar/pull/360

APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar 
APEXMALHAR-2174-S3-ReaderIssue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/360.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #360


commit 0c70e92e6f2a1a631569d6b9608ac79de0a50b96
Author: Chaitanya 
Date:   2016-08-08T06:21:11Z

APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue






[jira] [Commented] (APEXMALHAR-2174) S3 File Reader reading more data than expected

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411639#comment-15411639
 ] 

ASF GitHub Bot commented on APEXMALHAR-2174:


Github user chaithu14 closed the pull request at:

https://github.com/apache/apex-malhar/pull/360


> S3 File Reader reading more data than expected
> --
>
> Key: APEXMALHAR-2174
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2174
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> This is observed through the AWS billing.
> The issue might be S3InputStream.read(), which is used in readEntity().
> Reading the block can be achieved through the AmazonS3 APIs, so I am
> proposing the following solution:
> ```
>   GetObjectRequest rangeObjectRequest = new GetObjectRequest(bucketName, key);
>   rangeObjectRequest.setRange(startByte, noOfBytes);
>   S3Object objectPortion = s3Client.getObject(rangeObjectRequest);
>   S3ObjectInputStream wrappedStream = objectPortion.getObjectContent();
>   byte[] record = ByteStreams.toByteArray(wrappedStream);
> ```
> Advantages of this solution: parallel read will work for all types of S3 file
> systems.





[jira] [Commented] (APEXMALHAR-2174) S3 File Reader reading more data than expected

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411640#comment-15411640
 ] 

ASF GitHub Bot commented on APEXMALHAR-2174:


GitHub user chaithu14 reopened a pull request:

https://github.com/apache/apex-malhar/pull/360

APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chaithu14/incubator-apex-malhar 
APEXMALHAR-2174-S3-ReaderIssue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/360.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #360


commit 0c70e92e6f2a1a631569d6b9608ac79de0a50b96
Author: Chaitanya 
Date:   2016-08-08T06:21:11Z

APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue




> S3 File Reader reading more data than expected
> --
>
> Key: APEXMALHAR-2174
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2174
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> This is observed through the AWS billing.
> The issue might be S3InputStream.read(), which is used in readEntity().
> Reading the block can be achieved through the AmazonS3 APIs, so I am
> proposing the following solution:
> ```
>   GetObjectRequest rangeObjectRequest = new GetObjectRequest(bucketName, key);
>   rangeObjectRequest.setRange(startByte, noOfBytes);
>   S3Object objectPortion = s3Client.getObject(rangeObjectRequest);
>   S3ObjectInputStream wrappedStream = objectPortion.getObjectContent();
>   byte[] record = ByteStreams.toByteArray(wrappedStream);
> ```
> Advantages of this solution: parallel read will work for all types of S3 file
> systems.





[GitHub] apex-malhar pull request #359: APEXMALHAR-2161: Add tests for AbstractThroug...

2016-08-08 Thread yogidevendra
Github user yogidevendra commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/359#discussion_r73849743
  
--- Diff: 
library/src/test/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperatorTest.java
 ---
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.io.fs;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.Partitioner.Partition;
+import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.api.StatsListener.BatchedOperatorStats;
+import com.datatorrent.api.StatsListener.Response;
+import com.datatorrent.lib.counters.BasicCounters;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileCounters;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+public class AbstractThroughputFileInputOperatorTest
+{
+  private AbstractThroughputFileInputOperator underTest;
+  @Mock
+  private Partition mockPartition;
+  @Mock
+  private BatchedOperatorStats mockBatchStats;
+  @Mock
+  private OperatorStats mockOperatorStats;
+  @Mock
+  private BasicCounters fileCountersMock;
+  @Mock
+  private MutableLong fileCounterMock;
+
+  @Before
+  public void setup()
+  {
+underTest = new ThroughputFileInputOperator();
+MockitoAnnotations.initMocks(this);
+
+when(mockPartition.getPartitionedInstance()).thenReturn(underTest);
+  }
+
+  @Test
+  public void testInitialPartitioning()
+  {
+underTest.setPartitionCount(4);
+underTest.setPreferredMaxPendingFilesPerOperator(6);
+
+for (int i = 0; i < 74; i++) {
+  underTest.pendingFiles.add("file-" + i);
+}
+
+int partitioncount = 
underTest.getNewPartitionCount(Collections.singleton(mockPartition), null);
+Assert.assertEquals(4, partitioncount);
+  }
+
+  @Test
+  public void testProcessStats() throws Exception
+  {
+underTest.setPartitionCount(4);
+underTest.setPreferredMaxPendingFilesPerOperator(10);
+
+for (int i = 0; i < 21; i++) {
+  underTest.pendingFiles.add("file-" + i);
+}
+
+mockOperatorStats.counters = fileCountersMock;
+when(mockPartition.getStats()).thenReturn(mockBatchStats);
+
when(mockBatchStats.getLastWindowedStats()).thenReturn(Collections.singletonList(mockOperatorStats));
+
when(fileCountersMock.getCounter(any(FileCounters.class))).thenReturn(fileCounterMock);
+when(fileCounterMock.getValue()).thenReturn(20L);
+
+Response response = underTest.processStats(mockBatchStats);
+
+Assert.assertTrue(response.repartitionRequired);
+  }
+
+  @Test
+  public void testRepartitioning()
+  {
+underTest.setPartitionCount(4);
--- End diff --

Please add more scenarios for repartitioning.




[jira] [Commented] (APEXMALHAR-2161) Add tests for AbstractThroughputFileInputOperator

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411604#comment-15411604
 ] 

ASF GitHub Bot commented on APEXMALHAR-2161:


Github user yogidevendra commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/359#discussion_r73849553
  
--- Diff: 
library/src/test/java/com/datatorrent/lib/io/fs/AbstractThroughputFileInputOperatorTest.java
 ---
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.io.fs;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.Partitioner.Partition;
+import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.api.StatsListener.BatchedOperatorStats;
+import com.datatorrent.api.StatsListener.Response;
+import com.datatorrent.lib.counters.BasicCounters;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileCounters;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+public class AbstractThroughputFileInputOperatorTest
+{
+  private AbstractThroughputFileInputOperator underTest;
+  @Mock
+  private Partition mockPartition;
+  @Mock
+  private BatchedOperatorStats mockBatchStats;
+  @Mock
+  private OperatorStats mockOperatorStats;
+  @Mock
+  private BasicCounters fileCountersMock;
+  @Mock
+  private MutableLong fileCounterMock;
+
+  @Before
+  public void setup()
+  {
+underTest = new ThroughputFileInputOperator();
+MockitoAnnotations.initMocks(this);
+
+when(mockPartition.getPartitionedInstance()).thenReturn(underTest);
+  }
+
+  @Test
+  public void testInitialPartitioning()
+  {
+underTest.setPartitionCount(4);
+underTest.setPreferredMaxPendingFilesPerOperator(6);
+
+for (int i = 0; i < 74; i++) {
+  underTest.pendingFiles.add("file-" + i);
+}
+
+int partitioncount = 
underTest.getNewPartitionCount(Collections.singleton(mockPartition), null);
+Assert.assertEquals(4, partitioncount);
+  }
+
+  @Test
+  public void testProcessStats() throws Exception
+  {
+underTest.setPartitionCount(4);
+underTest.setPreferredMaxPendingFilesPerOperator(10);
+
+for (int i = 0; i < 21; i++) {
+  underTest.pendingFiles.add("file-" + i);
+}
+
+mockOperatorStats.counters = fileCountersMock;
+when(mockPartition.getStats()).thenReturn(mockBatchStats);
+
when(mockBatchStats.getLastWindowedStats()).thenReturn(Collections.singletonList(mockOperatorStats));
+
when(fileCountersMock.getCounter(any(FileCounters.class))).thenReturn(fileCounterMock);
+when(fileCounterMock.getValue()).thenReturn(20L);
+
+Response response = underTest.processStats(mockBatchStats);
+
+Assert.assertTrue(response.repartitionRequired);
--- End diff --

Could you please add a test scenario for repartitionRequired=false?
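
A hedged sketch of that scenario, mirroring testProcessStats above but keeping the pending-file count and counter value under the preferred maximum (the threshold behavior is an assumption):

```java
@Test
public void testProcessStatsNoRepartition() throws Exception
{
  underTest.setPartitionCount(4);
  underTest.setPreferredMaxPendingFilesPerOperator(10);

  // stay below the per-operator threshold so no repartition should be requested
  underTest.pendingFiles.add("file-0");

  mockOperatorStats.counters = fileCountersMock;
  when(mockPartition.getStats()).thenReturn(mockBatchStats);
  when(mockBatchStats.getLastWindowedStats()).thenReturn(Collections.singletonList(mockOperatorStats));
  when(fileCountersMock.getCounter(any(FileCounters.class))).thenReturn(fileCounterMock);
  when(fileCounterMock.getValue()).thenReturn(1L);

  Response response = underTest.processStats(mockBatchStats);

  Assert.assertFalse(response.repartitionRequired);
}
```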


> Add tests for AbstractThroughputFileInputOperator
> -
>
> Key: APEXMALHAR-2161
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2161
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>Priority: Minor
>   Original Estimate: 24h
>  Time Spent: 8h
>  Remaining Estimate: 16h
>
> Add unit tests for AbstractThroughputFileInputOperator





[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411600#comment-15411600
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73848852
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
--- End diff --

removed this and a couple of other lines :)


> Update JDBC poll input operator to fix issues
> -
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads





[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411598#comment-15411598
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73848661
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
--- End diff --

Calling disconnect for non-poller threads, as they are not going to connect
to the DB again; also updated exception handling.


> Update JDBC poll input operator to fix issues
> -
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads





[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73848661
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
--- End diff --

Calling disconnect for non-poller threads, as they are not going to connect
to the DB again; also updated exception handling.




[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411578#comment-15411578
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73845347
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
--- End diff --

The thread execution is not bound to a window. And even if the pollInterval
is short and there are multiple calls to poll records, since it's a single thread,
the subsequent calls will block till this one is done, and isPolled will be
"true" for the subsequent calls from non-poller threads.

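For context, a sketch of the single-threaded scheduling being described; the scanService and pollInterval names are taken from the diff, the rest is assumed:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// hypothetical: a single scheduler thread serializes pollRecords() runs, so a
// subsequent run on a non-poller partition sees isPolled == true and returns
ScheduledExecutorService scanService = Executors.newSingleThreadScheduledExecutor();
scanService.scheduleAtFixedRate(() -> pollRecords(), 0, pollInterval, TimeUnit.MILLISECONDS);
```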

> Update JDBC poll input operator to fix issues
> -
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads





[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411558#comment-15411558
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844259
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java
 ---
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An abstract implementation of inner join operator over Managed state 
which extends from
+ * AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state. 
+ * bucketSpanTime: Indicates the length of the time bucket. 
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractManagedStateInnerJoinOperator extends 
AbstractInnerJoinOperator implements
+Operator.CheckpointNotificationListener, 
Operator.CheckpointListener,Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
+  public static final String stateDir = "managedState";
+  public static final String stream1State = "stream1Data";
+  public static final String stream2State = "stream2Data";
+  private transient Map, Future> waitingEvents = 
Maps.newLinkedHashMap();
+  private int noOfBuckets = 1;
+  private Long bucketSpanTime;
+  protected ManagedTimeStateImpl stream1Store;
+  protected ManagedTimeStateImpl stream2Store;
+
+  /**
+   * Create Managed states and stores for both the streams.
+   */
+  @Override
+  public void createStores()
+  {
+stream1Store = new ManagedTimeStateImpl();
+stream2Store = new ManagedTimeStateImpl();
+stream1Store.setNumBuckets(noOfBuckets);
+stream2Store.setNumBuckets(noOfBuckets);
+if (bucketSpanTime != null) {
+  
stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+  
stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+}
+
stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+
stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+
+component = new ManagedSpillableComplexComponent();
+stream1Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store,
 !isStream1KeyPrimary());
+stream2Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store,
 !isStream2KeyPrimary());
+  }
+
+  /**
+   * Process the tuple which are received from input ports with the 
following steps:
+   * 1) Extract key from the given tuple
+   * 2) Insert  into the store 

[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411565#comment-15411565
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844320
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = 
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected transient Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new 
DefaultOutputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = 
context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
+}
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+ 
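The timeIncrement computed in setup() above is the length of one application window in milliseconds, used to advance the operator's internal clock when no time fields are configured. With illustrative attribute values:

{noformat}
// Sketch: the arithmetic from setup() above, with assumed attribute values.
int applicationWindowCount = 2;      // Context.OperatorContext.APPLICATION_WINDOW_COUNT
long streamingWindowMillis = 500L;   // Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS
long timeIncrement = applicationWindowCount * streamingWindowMillis;  // 1000 ms per application window
{noformat}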

[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411556#comment-15411556
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844231
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java
 ---
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An abstract implementation of inner join operator over Managed state 
which extends from
+ * AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state. 
+ * bucketSpanTime: Indicates the length of the time bucket. 
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractManagedStateInnerJoinOperator extends 
AbstractInnerJoinOperator implements
+Operator.CheckpointNotificationListener, 
Operator.CheckpointListener,Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
--- End diff --

Ok


> Development of Inner Join Operator using Spillable Datastructures
> -
>
> Key: APEXMALHAR-2100
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Chaitanya
>Assignee: Chaitanya
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411569#comment-15411569
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844368
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
 ---
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Concrete implementation of SpillableByteArrayListMultimap which is needed for the join operator.
+ *
+ * Properties:
+ * isKeyContainsMultiValue: Specifies whether the key has multiple values or not.
+ * timeBucket: Specifies the length of the time bucket.
+ *
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V>
+{
+  private transient StreamCodec streamCodec = null;
+  private boolean isKeyContainsMultiValue = false;
+  private long timeBucket;
+  @NotNull
+  private ManagedTimeStateImpl store;
+
+  public ManagedTimeStateMultiValue()
+  {
+if (streamCodec == null) {
+  streamCodec = new KryoSerializableStreamCodec();
+}
+  }
+
+  public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, 
boolean isKeyContainsMultiValue)
+  {
+this();
+this.store = Preconditions.checkNotNull(store);
+this.isKeyContainsMultiValue = isKeyContainsMultiValue;
+  }
+
+  /**
+   * Return the list of values from the store
+   * @param k given key
+   * @return list of values
+   */
+  @Override
+  public List<V> get(@Nullable K k)
+  {
+    List<V> value = null;
+    Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k));
+    if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) {
+      return null;
+    }
+    if (isKeyContainsMultiValue) {
+      return (List<V>)streamCodec.fromByteArray(valueSlice);
+    }
+    value = new ArrayList<>();
+    value.add((V)streamCodec.fromByteArray(valueSlice));
+    return value;
+  }
+
+  /**
+   * Returns the Future from the store.
+   * @param k given key
+   * @return
+   */
+  public CompositeFuture getAsync(@Nullable K k)
+  {
+return new CompositeFuture(store.getAsync(getBucketId(k), 
streamCodec.toByteArray(k)));
+  }
+
+  @Override
+  public Set keySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset keys()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection> entries()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List 
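As context for the get()/getAsync() pair above, a hedged sketch of the two read paths a ManagedTimeStateImpl offers (bucketId and keySlice are illustrative stand-ins for getBucketId(k) and streamCodec.toByteArray(k)):

{noformat}
// Sketch: synchronous vs asynchronous reads against a ManagedTimeStateImpl.
long bucketId = 0L;                                        // illustrative bucket id
Slice keySlice = streamCodec.toByteArray(key);             // serialized key
Slice sync = store.getSync(bucketId, keySlice);            // blocks until the value is read
Future<Slice> async = store.getAsync(bucketId, keySlice);  // returns immediately with a Future
{noformat}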

[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411560#comment-15411560
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844285
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
--- End diff --

Done


> Development of Inner Join Operator using Spillable Datastructures
> -
>
> Key: APEXMALHAR-2100
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Chaitanya
>Assignee: Chaitanya
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411557#comment-15411557
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844252
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java
 ---
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An abstract implementation of inner join operator over Managed state 
which extends from
+ * AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state. 
+ * bucketSpanTime: Indicates the length of the time bucket. 
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractManagedStateInnerJoinOperator extends 
AbstractInnerJoinOperator implements
+Operator.CheckpointNotificationListener, 
Operator.CheckpointListener,Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
+  public static final String stateDir = "managedState";
+  public static final String stream1State = "stream1Data";
+  public static final String stream2State = "stream2Data";
+  private transient Map, Future> waitingEvents = 
Maps.newLinkedHashMap();
+  private int noOfBuckets = 1;
+  private Long bucketSpanTime;
+  protected ManagedTimeStateImpl stream1Store;
+  protected ManagedTimeStateImpl stream2Store;
+
+  /**
+   * Create Managed states and stores for both the streams.
+   */
+  @Override
+  public void createStores()
+  {
+stream1Store = new ManagedTimeStateImpl();
+stream2Store = new ManagedTimeStateImpl();
+stream1Store.setNumBuckets(noOfBuckets);
+stream2Store.setNumBuckets(noOfBuckets);
+if (bucketSpanTime != null) {
+  
stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+  
stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+}
+
stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+
stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+
+component = new ManagedSpillableComplexComponent();
+stream1Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store,
 !isStream1KeyPrimary());
+stream2Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store,
 !isStream2KeyPrimary());
+  }
+
+  /**
+   * Process the tuples received from the input ports with the following steps:
+   * 1) Extract the key from the given tuple
+   * 2) Insert the (key, tuple) pair into the store 
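Filling in the flow the truncated javadoc above describes, a hedged sketch of the insert-then-probe logic (stream1Data, stream2Data, extractKey and mergeTuples are named in the diffs under review; the exact mergeTuples signature and the emit helper are assumptions, and K/T are the operator's type parameters):

{noformat}
// Sketch: the processTuple(tuple, isStream1Data) flow.
K key = extractKey(tuple, isStream1Data);
Spillable.SpillableByteArrayListMultimap<K, T> own =
    isStream1Data ? stream1Data : stream2Data;
own.put(key, tuple);                                      // 2) insert into own-side store
List<T> matches = (isStream1Data ? stream2Data : stream1Data).get(key);  // 3) probe the other side
if (matches != null) {
  for (T match : matches) {
    // 4) merge the incoming tuple with each match and emit downstream
    emitTuple(mergeTuples(tuple, match, isStream1Data));  // hypothetical emit helper
  }
}
{noformat}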

[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411570#comment-15411570
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844379
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
 ---
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Concrete implementation of SpillableByteArrayListMultimap which is needed for the join operator.
+ *
+ * Properties:
+ * isKeyContainsMultiValue: Specifies whether the key has multiple values or not.
+ * timeBucket: Specifies the length of the time bucket.
+ *
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V>
+{
+  private transient StreamCodec streamCodec = null;
+  private boolean isKeyContainsMultiValue = false;
+  private long timeBucket;
+  @NotNull
+  private ManagedTimeStateImpl store;
+
+  public ManagedTimeStateMultiValue()
+  {
+if (streamCodec == null) {
+  streamCodec = new KryoSerializableStreamCodec();
+}
+  }
+
+  public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, 
boolean isKeyContainsMultiValue)
+  {
+this();
+this.store = Preconditions.checkNotNull(store);
+this.isKeyContainsMultiValue = isKeyContainsMultiValue;
+  }
+
+  /**
+   * Return the list of values from the store
+   * @param k given key
+   * @return list of values
+   */
+  @Override
+  public List<V> get(@Nullable K k)
+  {
+    List<V> value = null;
+    Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k));
+    if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) {
+      return null;
+    }
+    if (isKeyContainsMultiValue) {
+      return (List<V>)streamCodec.fromByteArray(valueSlice);
+    }
+    value = new ArrayList<>();
+    value.add((V)streamCodec.fromByteArray(valueSlice));
+    return value;
+  }
+
+  /**
+   * Returns the Future from the store.
+   * @param k given key
+   * @return
+   */
+  public CompositeFuture getAsync(@Nullable K k)
+  {
+return new CompositeFuture(store.getAsync(getBucketId(k), 
streamCodec.toByteArray(k)));
+  }
+
+  @Override
+  public Set keySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset keys()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection> entries()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List 

[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411567#comment-15411567
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844336
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = 
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new 
DefaultOutputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = 
context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
+}
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to 

[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411563#comment-15411563
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844300
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = 
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected transient Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new 
DefaultOutputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = 
context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
+}
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+ 
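Because all three ports above are declared with schemaRequired = true, the application has to supply the POJO classes through the TUPLE_CLASS port attribute. A hedged sketch of the DAG-side wiring (Customer, Order and CustOrder are hypothetical POJOs; join stands for an instance of the operator):

{noformat}
// Sketch: satisfying schemaRequired = true via the TUPLE_CLASS attribute.
dag.setInputPortAttribute(join.input1, Context.PortContext.TUPLE_CLASS, Customer.class);
dag.setInputPortAttribute(join.input2, Context.PortContext.TUPLE_CLASS, Order.class);
dag.setOutputPortAttribute(join.outputPort, Context.PortContext.TUPLE_CLASS, CustOrder.class);
{noformat}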

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844379
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
 ---
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Concrete implementation of SpillableByteArrayListMultimap which is needed for the join operator.
+ *
+ * Properties:
+ * isKeyContainsMultiValue: Specifies whether the key has multiple values or not.
+ * timeBucket: Specifies the length of the time bucket.
+ *
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V>
+{
+  private transient StreamCodec streamCodec = null;
+  private boolean isKeyContainsMultiValue = false;
+  private long timeBucket;
+  @NotNull
+  private ManagedTimeStateImpl store;
+
+  public ManagedTimeStateMultiValue()
+  {
+if (streamCodec == null) {
+  streamCodec = new KryoSerializableStreamCodec();
+}
+  }
+
+  public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, 
boolean isKeyContainsMultiValue)
+  {
+this();
+this.store = Preconditions.checkNotNull(store);
+this.isKeyContainsMultiValue = isKeyContainsMultiValue;
+  }
+
+  /**
+   * Return the list of values from the store
+   * @param k given key
+   * @return list of values
+   */
+  @Override
+  public List<V> get(@Nullable K k)
+  {
+    List<V> value = null;
+    Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k));
+    if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) {
+      return null;
+    }
+    if (isKeyContainsMultiValue) {
+      return (List<V>)streamCodec.fromByteArray(valueSlice);
+    }
+    value = new ArrayList<>();
+    value.add((V)streamCodec.fromByteArray(valueSlice));
+    return value;
+  }
+
+  /**
+   * Returns the Future from the store.
+   * @param k given key
+   * @return
+   */
+  public CompositeFuture getAsync(@Nullable K k)
+  {
+return new CompositeFuture(store.getAsync(getBucketId(k), 
streamCodec.toByteArray(k)));
+  }
+
+  @Override
+  public Set keySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset keys()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection> entries()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List removeAll(@Nullable Object o)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+
+  }
+
+  @Override
+  public int size()
+  {
+throw new 
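A hedged sketch of the asynchronous lookup pattern getAsync() above enables on the operator side (waitingEvents is the map from the abstract operator's diff; draining from handleIdleTime() is an assumption consistent with the operator implementing Operator.IdleTimeHandler):

{noformat}
// Sketch: issue an async read and park the Future if the value is not in memory.
Future<Slice> future = store.getAsync(getBucketId(key), streamCodec.toByteArray(key));
if (future.isDone()) {
  Slice valueSlice = future.get();  // immediate; get() still declares checked exceptions
  // decode valueSlice and join right away
} else {
  waitingEvents.put(key, future);   // drained later, e.g. from handleIdleTime()
}
{noformat}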

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844212
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of the inner join operator. The operator receives tuples from two streams,
+ * applies the join operation based on the join constraint and emits the joined value.
+ * Concrete classes should provide implementations for the extractKey, extractTime and mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key fields for both the streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time fields for both the streams. Ex: Field1,Field2
+ * expiryTime: Expiry time for stored tuples
+ * isStream1KeyPrimary: Specifies whether the stream1 key is primary or not
+ * isStream2KeyPrimary: Specifies whether the stream2 key is primary or not
+ *
+ *  Example:  
+ *  The left input port receives customer details and the right input port receives order details.
+ *  The schema for Customer is of the form {ID, Name, CTime}.
+ *  The schema for Order is of the form {OID, CID, OTime}.
+ *  Join the tuples of the Customer and Order streams where Customer.ID = Order.CID, with the constraint that
+ *  matched tuples must have timestamps within 5 minutes of each other.
+ *  Here, key fields = ID, CID; time fields = CTime, OTime; expiryTime = 5 minutes.
+ *
+ *  @displayName Abstract Inner Join Operator
+ *  @tags join
+ */
+public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
+{
+  protected transient String[][] includeFields;
+  protected transient List keyFields;
+  protected transient List timeFields;
+  @AutoMetric
+  private long tuplesJoinedPerSec;
+  private double windowTimeSec;
+  private int tuplesCount;
+  @NotNull
+  private String keyFieldsStr;
+  @NotNull
+  private String includeFieldStr;
+  private String timeFieldsStr;
+  private Long stream1ExpiryTime;
+  private Long stream2ExpiryTime;
+  private boolean isStream1KeyPrimary = true;
+  private boolean isStream2KeyPrimary = true;
+  protected SpillableComplexComponent component;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data;
+  protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data;
+
+  /**
+   * Process the tuples received from the input ports with the following steps:
+   * 1) Extract the key from the given tuple
+   * 2) Insert the (key, tuple) pair into the store, where the store is stream1Data if the tuple
+   * was received from stream1, or stream2Data otherwise.
+   * 3) Get the values for the key, if found, from the opposite store
+   * 4) Merge the given tuple with each value found in step (3)
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   */
+  protected void processTuple(T tuple, boolean isStream1Data)
+  {
+    Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
+    K key = extractKey(tuple, isStream1Data);
+    if (!store.put(key, 
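To make the Customer/Order example from the class javadoc concrete, a hedged sketch of configuring the operator (assuming conventional bean setters for the properties listed above; expiry in milliseconds matches the Duration.millis() usage in the managed-state subclass):

{noformat}
// Sketch: property wiring for the Customer/Order join described above.
POJOInnerJoinOperator join = dag.addOperator("Join", new POJOInnerJoinOperator());
join.setKeyFieldsStr("ID,CID");               // Customer.ID = Order.CID
join.setTimeFieldsStr("CTime,OTime");         // per-stream time fields
join.setIncludeFieldStr("ID,Name;OID,OTime"); // stream1 fields ; stream2 fields
join.setExpiryTime(5 * 60 * 1000L);           // 5 minutes
{noformat}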

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844311
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = 
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected transient Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new 
DefaultOutputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = 
context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
+}
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to 
stream1 or not.
+   * @return
+   */
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+return timeFields == null ? time : 

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844328
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = 
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected transient Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new 
DefaultOutputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = 
context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
+}
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to 
stream1 or not.
+   * @return
+   */
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+return timeFields == null ? time : 

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844275
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/InnerJoinStreamCodec.java ---
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Stream codec based on keyExpression for POJO Inner Join Operator.
+ *
+ * @Since 3.5.0
--- End diff --

Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
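Since InnerJoinStreamCodec appears only in fragments above, a hedged reconstruction of the idea: partitioning on the join key is what guarantees that matching tuples from both streams reach the same partition. The class name, constructor argument and the PojoUtils getter shown here are assumptions for illustration, not the reviewed code:

{noformat}
import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
import com.datatorrent.lib.util.PojoUtils;

// Sketch: a key-based stream codec in the spirit of InnerJoinStreamCodec above.
public class KeyBasedStreamCodec extends KryoSerializableStreamCodec<Object>
{
  private final String keyExpression;
  private transient PojoUtils.Getter<Object, Object> getter;

  public KeyBasedStreamCodec(String keyExpression)
  {
    this.keyExpression = keyExpression;
  }

  @Override
  public int getPartition(Object tuple)
  {
    if (getter == null) {
      // build a reflective getter for the configured key field
      getter = PojoUtils.createGetter(tuple.getClass(), keyExpression, Object.class);
    }
    return getter.get(tuple).hashCode();  // same key -> same partition on both ports
  }
}
{noformat}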


[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844228
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java
 ---
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An abstract implementation of inner join operator over Managed state 
which extends from
+ * AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state. 
+ * bucketSpanTime: Indicates the length of the time bucket. 
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractManagedStateInnerJoinOperator extends 
AbstractInnerJoinOperator implements
+Operator.CheckpointNotificationListener, 
Operator.CheckpointListener,Operator.IdleTimeHandler
--- End diff --

Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844300
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = 
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected transient Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new 
DefaultOutputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = 
context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
+}
+  }
+
+  /**
+   * Extracts the time value from the given tuple.
+   * @param tuple given tuple
+   * @param isStream1Data specifies whether the given tuple belongs to stream1
+   * @return the time value associated with the tuple
+   */
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+return timeFields == null ? time : 
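
The quoted method is cut off by the digest. Purely as a reading aid, here is a hedged sketch of the fallback pattern the fragment suggests, given that setup() above derives ```timeIncrement``` from the application window size; ```stream1TimeGetter``` and ```stream2TimeGetter``` are hypothetical names, not the PR's code:

```java
// Hedged sketch (getter names are assumptions): use the configured time
// field when present, else a processing-time clock advanced per window.
@Override
public long extractTime(Object tuple, boolean isStream1Data)
{
  if (timeFields == null) {
    return time; // processing-time fallback
  }
  PojoUtils.GetterLong timeGetter = isStream1Data ? stream1TimeGetter : stream2TimeGetter;
  return timeGetter.get(tuple);
}

@Override
public void endWindow()
{
  super.endWindow();
  time += timeIncrement; // advance the fallback clock once per window
}
```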

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844285
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that
+ * receives POJOs from both input streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
--- End diff --

Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844252
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java
 ---
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An abstract implementation of the inner join operator over Managed state,
+ * which extends AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state. 
+ * bucketSpanTime: Indicates the length of the time bucket. 
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractManagedStateInnerJoinOperator extends 
AbstractInnerJoinOperator implements
+Operator.CheckpointNotificationListener, 
Operator.CheckpointListener,Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
+  public static final String stateDir = "managedState";
+  public static final String stream1State = "stream1Data";
+  public static final String stream2State = "stream2Data";
+  private transient Map, Future> waitingEvents = 
Maps.newLinkedHashMap();
+  private int noOfBuckets = 1;
+  private Long bucketSpanTime;
+  protected ManagedTimeStateImpl stream1Store;
+  protected ManagedTimeStateImpl stream2Store;
+
+  /**
+   * Create Managed states and stores for both the streams.
+   */
+  @Override
+  public void createStores()
+  {
+stream1Store = new ManagedTimeStateImpl();
+stream2Store = new ManagedTimeStateImpl();
+stream1Store.setNumBuckets(noOfBuckets);
+stream2Store.setNumBuckets(noOfBuckets);
+if (bucketSpanTime != null) {
+  
stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+  
stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime));
+}
+
stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+
stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime()));
+
+component = new ManagedSpillableComplexComponent();
+stream1Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream1Store,
 !isStream1KeyPrimary());
+stream2Data = 
((ManagedSpillableComplexComponent)component).newSpillableByteArrayListMultimap(stream2Store,
 !isStream2KeyPrimary());
+  }
+
+  /**
+   * Process each tuple received on the input ports with the following steps:
+   * 1) Extract the key from the given tuple
+   * 2) Insert (key, tuple) into the store, where the store is stream1Data
+   * if the tuple arrives on stream1, or vice versa.
+   * 3) Asynchronously get the values of the key from the opposite store
+   * 4) If the future is done then Merge the given tuple and values found 
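
The comment is truncated here. As a reading aid, a hedged sketch of steps 3 and 4; ```getAsync```, ```joinStream``` and ```JoinEvent``` echo names visible in the diffs, but their signatures are assumptions, not the PR's actual code:

```java
// Hedged sketch of steps 3-4 (names and signatures are assumptions):
// look up the key asynchronously in the opposite store; merge now if the
// Future is already complete, otherwise park it for later draining.
Future<List> future = oppositeStore.getAsync(key);
if (future.isDone()) {
  try {
    joinStream(tuple, isStream1Data, future.get()); // merge and emit immediately
  } catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
  }
} else {
  // drained later, e.g. in handleIdleTime() or before checkpointing
  waitingEvents.put(new JoinEvent(key, tuple, isStream1Data), future);
}
```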

[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411555#comment-15411555
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73844228
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java
 ---
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import 
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An abstract implementation of the inner join operator over Managed state,
+ * which extends AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state. 
+ * bucketSpanTime: Indicates the length of the time bucket. 
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractManagedStateInnerJoinOperator extends 
AbstractInnerJoinOperator implements
+Operator.CheckpointNotificationListener, 
Operator.CheckpointListener,Operator.IdleTimeHandler
--- End diff --

Done


> Development of Inner Join Operator using Spillable Datastructures
> -
>
> Key: APEXMALHAR-2100
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Chaitanya
>Assignee: Chaitanya
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411548#comment-15411548
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73843385
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +91,45 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, 
Partitioner
 {
-  /**
-   * poll interval in milliseconds
-   */
-  private static int pollInterval = 1;
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
--- End diff --

changing size to 4k
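
For scale, a one-line sketch of the agreed change; 4 * 1024 * 1024 entries is far more than one poll cycle needs:

```java
// was: 4 * 1024 * 1024 (over four million queued tuples)
private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024;
```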


> Update JDBC poll input operator to fix issues
> -
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
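
Items 2 and 4 pair naturally: with jOOQ, a partition's slice can be expressed as a row-count window instead of key bounds. A hedged, self-contained sketch; the table name and connection URL are illustrative, not from the PR:

```java
import java.sql.Connection;
import java.sql.DriverManager;

import org.jooq.DSLContext;
import org.jooq.impl.DSL;

import static org.jooq.impl.DSL.table;

public class RangeQuerySketch
{
  public static void main(String[] args) throws Exception
  {
    // illustrative in-memory database; the operator uses its JdbcStore connection
    Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo");
    DSLContext create = DSL.using(conn);
    int lowerBound = 0;     // first row offset for this partition
    int rowsToRead = 1000;  // a row count, not a key range
    String sql = create.select()
        .from(table("test_table"))
        .limit(rowsToRead)
        .offset(lowerBound)
        .getSQL();
    System.out.println(sql);
  }
}
```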



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73843385
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +91,45 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator
-implements ActivationListener, IdleTimeHandler, 
Partitioner
+public abstract class AbstractJdbcPollInputOperator extends 
AbstractStoreInputOperator implements
+ActivationListener, 
Partitioner
 {
-  /**
-   * poll interval in milliseconds
-   */
-  private static int pollInterval = 1;
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
--- End diff --

changing size to 4k


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73843244
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -286,149 +141,117 @@ public AbstractJdbcPollInputOperator()
   public void setup(OperatorContext context)
   {
 super.setup(context);
-spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+initializeDSLContext();
+if (scanService == null) {
+  scanService = Executors.newScheduledThreadPool(1);
+}
 execute = true;
-cause = new AtomicReference();
-emitQueue = new ArrayBlockingQueue(queueCapacity);
-this.context = context;
+emitQueue = new LinkedBlockingDeque<>(queueCapacity);
 operatorId = context.getId();
+windowManager.setup(context);
+  }
 
-try {
+  private void initializeDSLContext()
+  {
+create = DSL.using(store.getConnection(), 
JDBCUtils.dialect(store.getDatabaseUrl()));
+  }
 
-  //If its a range query pass upper and lower bounds
-  //If its a polling query pass only the lower bound
-  if (getRangeQueryPair().getValue() != null) {
-ps = store.getConnection()
-.prepareStatement(
-JdbcMetaDataUtility.buildRangeQuery(getTableName(), 
getKey(), rangeQueryPair.getKey(),
-rangeQueryPair.getValue()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
+  @Override
+  public void activate(OperatorContext context)
+  {
+initializePreparedStatement();
+long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
+if (largestRecoveryWindow == Stateless.WINDOW_ID
+|| context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) 
> largestRecoveryWindow) {
+  scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, 
TimeUnit.MILLISECONDS);
+}
+  }
+
+  protected void initializePreparedStatement()
+  {
+try {
+  // If its a range query pass upper and lower bounds, If its a 
polling query pass only the lower bound
+  if (isPollerPartition) {
+ps = 
store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), 
Integer.MAX_VALUE),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
   } else {
 ps = store.getConnection().prepareStatement(
-JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), rangeQueryPair.getKey()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-isPollable = true;
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
   }
-
 } catch (SQLException e) {
   LOG.error("Exception in initializing the range query for a given 
partition", e);
   throw new RuntimeException(e);
 }
 
-windowManager.setup(context);
-LOG.debug("super setup done...");
   }
 
   @Override
   public void beginWindow(long windowId)
   {
 currentWindowId = windowId;
-
-isReplayed = false;
-
 if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
   try {
 replay(currentWindowId);
+return;
   } catch (SQLException e) {
 LOG.error("Exception in replayed windows", e);
 throw new RuntimeException(e);
   }
 }
-
-if (isReplayed && currentWindowId == 
windowManager.getLargestRecoveryWindow()) {
-  try {
-if (!isPollable && rangeQueryPair.getValue() != null) {
-
-  ps = store.getConnection().prepareStatement(
-  JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), 
getKey(), previousUpperBound,
-  rangeQueryPair.getValue()),
-  java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-} else {
-  String bound = null;
-  if (previousUpperBound == null) {
-bound = getRangeQueryPair().getKey();
-  } else {
-bound = previousUpperBound;
-  }
-  ps = store.getConnection().prepareStatement(
-  JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), bound),
-  java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-  isPollable = true;
-}
-isReplayed = false;
-LOG.debug("Prepared statement after re-initialization - {} ", 
ps.toString());
-  } catch (SQLException e) {
- 

[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411544#comment-15411544
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73842885
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
--- End diff --

Use ```offer``` instead, else we might get an ```IllegalStateException```
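
For reference, a minimal runnable illustration of the difference (not the operator code): ```add``` throws when a bounded queue is full, while ```offer``` reports it via the return value so the caller can back off.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OfferVsAdd
{
  public static void main(String[] args)
  {
    BlockingQueue<String> emitQueue = new ArrayBlockingQueue<>(1);
    emitQueue.add("first");                       // fits, returns true
    // emitQueue.add("second");                   // would throw IllegalStateException: Queue full
    boolean accepted = emitQueue.offer("second"); // returns false instead of throwing
    System.out.println("second accepted: " + accepted);
  }
}
```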


> Update JDBC poll input operator to fix issues
> -
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73842885
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
--- End diff --

Use ```offer``` instead, else we might get an ```IllegalStateException```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411531#comment-15411531
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73841819
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
--- End diff --

can be removed?


> Update JDBC poll input operator to fix issues
> -
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73841819
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
--- End diff --

can be removed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73841763
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
 }
-  }
 
-  @Override
-  public void handleIdleTime()
-  {
-if (execute) {
-  try {
-Thread.sleep(spinMillis);
-  } catch (InterruptedException ie) {
-throw new RuntimeException(ie);
-  }
-} else {
-  LOG.error("Exception: ", cause);
-  DTThrowable.rethrow(cause.get());
-}
   }
 
+  public abstract T getTuple(ResultSet result);
+
   protected void replay(long windowId) throws SQLException
   {
-isReplayed = true;
 
-MutablePair recoveredData = new MutablePair();
 try {
-  recoveredData = (MutablePair)windowManager.load(operatorId, windowId);
+  MutablePair recoveredData = (MutablePair)windowManager.load(operatorId,
+  windowId);
 
-  if (recoveredData != null) {
-//skip the window and return if there was no incoming data in the 
window
-if (recoveredData.left == null || recoveredData.right == null) {
-  return;
-}
-
-if (recoveredData.right.equals(rangeQueryPair.getValue()) || 
recoveredData.right.equals(previousUpperBound)) {
-  LOG.info("Matched so returning");
-  return;
-}
+  if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", 
windowId, recoveredData.left,
+recoveredData.right);
 
-JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
-jdbcPoller.setStore(store);
-jdbcPoller.setKey(getKey());
-jdbcPoller.setPartitionCount(getPartitionCount());
-jdbcPoller.setPollInterval(getPollInterval());
-jdbcPoller.setTableName(getTableName());
-jdbcPoller.setBatchSize(getBatchSize());
-isPollable = false;
-
-LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + 
"," + recoveredData.right + "]");
+ps = store.getConnection().prepareStatement(
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
+LOG.info("Query formed to recover data - {}", ps.toString());
 
-jdbcPoller.setRangeQueryPair(new KeyValPair(recoveredData.left, recoveredData.right));
+emitReplayedTuples(ps);
 
-jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement(
-JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(), 
jdbcPoller.getKey(),
-jdbcPoller.getRangeQueryPair().getKey(), 
jdbcPoller.getRangeQueryPair().getValue()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-
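
The quote is truncated by the digest. For context, the replay path shown above pairs with persisting each window's poll range; a hedged sketch of that half of the contract, using field names from the diff and assuming the usual imports (java.io.IOException, MutablePair):

```java
// Hedged sketch: persist this window's record range so replay() can
// re-read exactly those rows after recovery.
@Override
public void endWindow()
{
  try {
    windowManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
  } catch (IOException e) {
    throw new RuntimeException("saving recovery state", e);
  }
  currentWindowRecoveryState = new MutablePair<>();
}
```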

[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411521#comment-15411521
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73841414
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
 }
-  }
 
-  @Override
-  public void handleIdleTime()
-  {
-if (execute) {
-  try {
-Thread.sleep(spinMillis);
-  } catch (InterruptedException ie) {
-throw new RuntimeException(ie);
-  }
-} else {
-  LOG.error("Exception: ", cause);
-  DTThrowable.rethrow(cause.get());
-}
   }
 
+  public abstract T getTuple(ResultSet result);
+
   protected void replay(long windowId) throws SQLException
   {
-isReplayed = true;
 
-MutablePair recoveredData = new MutablePair();
 try {
-  recoveredData = (MutablePair)windowManager.load(operatorId, windowId);
+  MutablePair recoveredData = (MutablePair)windowManager.load(operatorId,
+  windowId);
 
-  if (recoveredData != null) {
-//skip the window and return if there was no incoming data in the 
window
-if (recoveredData.left == null || recoveredData.right == null) {
-  return;
-}
-
-if (recoveredData.right.equals(rangeQueryPair.getValue()) || 
recoveredData.right.equals(previousUpperBound)) {
-  LOG.info("Matched so returning");
-  return;
-}
+  if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", 
windowId, recoveredData.left,
+recoveredData.right);
 
-JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
-jdbcPoller.setStore(store);
-jdbcPoller.setKey(getKey());
-jdbcPoller.setPartitionCount(getPartitionCount());
-jdbcPoller.setPollInterval(getPollInterval());
-jdbcPoller.setTableName(getTableName());
-jdbcPoller.setBatchSize(getBatchSize());
-isPollable = false;
-
-LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + 
"," + recoveredData.right + "]");
+ps = store.getConnection().prepareStatement(
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
--- End diff --

right, would update.


> Update JDBC poll input operator to fix issues
> -
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
> Update 

[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread DT-Priyanka
Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73841414
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
 }
-  }
 
-  @Override
-  public void handleIdleTime()
-  {
-if (execute) {
-  try {
-Thread.sleep(spinMillis);
-  } catch (InterruptedException ie) {
-throw new RuntimeException(ie);
-  }
-} else {
-  LOG.error("Exception: ", cause);
-  DTThrowable.rethrow(cause.get());
-}
   }
 
+  public abstract T getTuple(ResultSet result);
+
   protected void replay(long windowId) throws SQLException
   {
-isReplayed = true;
 
-MutablePair recoveredData = new MutablePair();
 try {
-  recoveredData = (MutablePair)windowManager.load(operatorId, windowId);
+  MutablePair recoveredData = (MutablePair)windowManager.load(operatorId,
+  windowId);
 
-  if (recoveredData != null) {
-//skip the window and return if there was no incoming data in the 
window
-if (recoveredData.left == null || recoveredData.right == null) {
-  return;
-}
-
-if (recoveredData.right.equals(rangeQueryPair.getValue()) || 
recoveredData.right.equals(previousUpperBound)) {
-  LOG.info("Matched so returning");
-  return;
-}
+  if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", 
windowId, recoveredData.left,
+recoveredData.right);
 
-JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
-jdbcPoller.setStore(store);
-jdbcPoller.setKey(getKey());
-jdbcPoller.setPartitionCount(getPartitionCount());
-jdbcPoller.setPollInterval(getPollInterval());
-jdbcPoller.setTableName(getTableName());
-jdbcPoller.setBatchSize(getBatchSize());
-isPollable = false;
-
-LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + 
"," + recoveredData.right + "]");
+ps = store.getConnection().prepareStatement(
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
--- End diff --

right, would update.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411515#comment-15411515
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user DT-Priyanka commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73841049
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -286,149 +141,117 @@ public AbstractJdbcPollInputOperator()
   public void setup(OperatorContext context)
   {
 super.setup(context);
-spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+initializeDSLContext();
+if (scanService == null) {
+  scanService = Executors.newScheduledThreadPool(1);
+}
 execute = true;
-cause = new AtomicReference();
-emitQueue = new ArrayBlockingQueue(queueCapacity);
-this.context = context;
+emitQueue = new LinkedBlockingDeque<>(queueCapacity);
 operatorId = context.getId();
+windowManager.setup(context);
+  }
 
-try {
+  private void initializeDSLContext()
+  {
+create = DSL.using(store.getConnection(), 
JDBCUtils.dialect(store.getDatabaseUrl()));
+  }
 
-  //If its a range query pass upper and lower bounds
-  //If its a polling query pass only the lower bound
-  if (getRangeQueryPair().getValue() != null) {
-ps = store.getConnection()
-.prepareStatement(
-JdbcMetaDataUtility.buildRangeQuery(getTableName(), 
getKey(), rangeQueryPair.getKey(),
-rangeQueryPair.getValue()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
+  @Override
+  public void activate(OperatorContext context)
+  {
+initializePreparedStatement();
+long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
+if (largestRecoveryWindow == Stateless.WINDOW_ID
+|| context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) 
> largestRecoveryWindow) {
+  scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, 
TimeUnit.MILLISECONDS);
+}
+  }
+
+  protected void initializePreparedStatement()
+  {
+try {
+  // If its a range query pass upper and lower bounds, If its a 
polling query pass only the lower bound
+  if (isPollerPartition) {
+ps = 
store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), 
Integer.MAX_VALUE),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
   } else {
 ps = store.getConnection().prepareStatement(
-JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), rangeQueryPair.getKey()),
-java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-isPollable = true;
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
   }
-
 } catch (SQLException e) {
   LOG.error("Exception in initializing the range query for a given 
partition", e);
   throw new RuntimeException(e);
 }
 
-windowManager.setup(context);
-LOG.debug("super setup done...");
   }
 
   @Override
   public void beginWindow(long windowId)
   {
 currentWindowId = windowId;
-
-isReplayed = false;
-
 if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
   try {
 replay(currentWindowId);
+return;
   } catch (SQLException e) {
 LOG.error("Exception in replayed windows", e);
 throw new RuntimeException(e);
   }
 }
-
-if (isReplayed && currentWindowId == 
windowManager.getLargestRecoveryWindow()) {
-  try {
-if (!isPollable && rangeQueryPair.getValue() != null) {
-
-  ps = store.getConnection().prepareStatement(
-  JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), 
getKey(), previousUpperBound,
-  rangeQueryPair.getValue()),
-  java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
-} else {
-  String bound = null;
-  if (previousUpperBound == null) {
-bound = getRangeQueryPair().getKey();
-  } else {
-bound = previousUpperBound;
-  }
-  ps = store.getConnection().prepareStatement(
-  JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), bound),
-  java.sql.ResultSet.TYPE_FORWARD_ONLY, 
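
The quote breaks off here. The activate() above schedules ```new DBPoller()``` at a fixed rate; a hedged sketch of what that runnable plausibly looks like:

```java
// Hedged sketch of the scheduled poller wired up in activate(): a small
// Runnable so the ScheduledExecutorService can drive pollRecords() off
// the operator thread while emitTuples() drains emitQueue on it.
protected class DBPoller implements Runnable
{
  @Override
  public void run()
  {
    pollRecords();
  }
}
```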

[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73840984
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
--- End diff --

This will be called even when there is no exception, which can create problems.
We should let the exception reach the operator thread; otherwise it might get
lost in the scheduled thread and never surface.
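
One hedged way to do that, echoing the ```AtomicReference``` pattern the removed code used: record the failure in the scheduled thread and rethrow it on the operator thread (the usual java.util.concurrent.atomic and DTThrowable imports are assumed).

```java
// Hedged sketch, not the PR's code: capture the poller thread's failure
// and surface it on the operator thread instead of losing it.
private final AtomicReference<Throwable> cause = new AtomicReference<>();

protected void pollRecords()
{
  try {
    // ... execute the prepared statement and fill emitQueue ...
  } catch (SQLException ex) {
    cause.set(ex); // remember it; the scheduled thread has no one to report to
  }
}

@Override
public void emitTuples()
{
  Throwable t = cause.get();
  if (t != null) {
    DTThrowable.rethrow(t); // operator thread: the container sees the failure
  }
  // ... drain emitQueue ...
}
```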


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411512#comment-15411512
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73840984
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
--- End diff --

This will be called even when there is no exception, which can create problems.
We should let the exception reach the operator thread; otherwise it might get
lost in the scheduled thread and never surface.


> Update JDBC poll input operator to fix issues
> -
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73840151
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
--- End diff --

Does this mean that we want to poll records just once in a window?
Is this because the poll interval is long enough to prevent multiple calls to
```pollRecords()``` within a single window?
Even so, if the poll interval is reduced there could be multiple scheduled
calls, of which only the first would actually poll.
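
If once per window really is the intent, one hedged way to make it explicit is to reset the flag at the window boundary instead of relying on the poll interval:

```java
// Hedged sketch (not the PR's code): tie the at-most-one-poll guarantee
// to the window boundary rather than to pollInterval.
@Override
public void beginWindow(long windowId)
{
  currentWindowId = windowId;
  isPolled = false; // allow one poll per window, independent of pollInterval
  // ... replay handling as in the diff ...
}
```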


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411480#comment-15411480
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73838593
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
 }
-  }
 
-  @Override
-  public void handleIdleTime()
-  {
-if (execute) {
-  try {
-Thread.sleep(spinMillis);
-  } catch (InterruptedException ie) {
-throw new RuntimeException(ie);
-  }
-} else {
-  LOG.error("Exception: ", cause);
-  DTThrowable.rethrow(cause.get());
-}
   }
 
+  public abstract T getTuple(ResultSet result);
+
   protected void replay(long windowId) throws SQLException
   {
-isReplayed = true;
 
-MutablePair recoveredData = new MutablePair();
 try {
-  recoveredData = (MutablePair)windowManager.load(operatorId, windowId);
+  MutablePair recoveredData = (MutablePair)windowManager.load(operatorId,
+  windowId);
 
-  if (recoveredData != null) {
-//skip the window and return if there was no incoming data in the 
window
-if (recoveredData.left == null || recoveredData.right == null) {
-  return;
-}
-
-if (recoveredData.right.equals(rangeQueryPair.getValue()) || 
recoveredData.right.equals(previousUpperBound)) {
-  LOG.info("Matched so returning");
-  return;
-}
+  if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", 
windowId, recoveredData.left,
+recoveredData.right);
 
-JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
-jdbcPoller.setStore(store);
-jdbcPoller.setKey(getKey());
-jdbcPoller.setPartitionCount(getPartitionCount());
-jdbcPoller.setPollInterval(getPollInterval());
-jdbcPoller.setTableName(getTableName());
-jdbcPoller.setBatchSize(getBatchSize());
-isPollable = false;
-
-LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + 
"," + recoveredData.right + "]");
+ps = store.getConnection().prepareStatement(
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
+LOG.info("Query formed to recover data - {}", ps.toString());
 
-jdbcPoller.setRangeQueryPair(new KeyValPair(recoveredData.left, recoveredData.right));
+emitReplayedTuples(ps);
 
-jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement(
-
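
The quote is truncated here. As a reading aid, a hedged reconstruction of ```shouldReplayWindow()``` from the inline checks the removed code performed:

```java
// Hedged sketch assembled from the removed inline checks: skip windows
// that emitted nothing and ranges that were already fully read.
private boolean shouldReplayWindow(MutablePair recoveredData)
{
  if (recoveredData.left == null || recoveredData.right == null) {
    return false; // no incoming data in that window
  }
  if (recoveredData.right.equals(rangeQueryPair.getValue())
      || recoveredData.right.equals(previousUpperBound)) {
    return false; // upper bound already reached
  }
  return true;
}
```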

[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73838062
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +91,45 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
--- End diff --

exactlyonce => idempotent


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411473#comment-15411473
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73838062
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +91,45 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
--- End diff --

exactlyonce => idempotent


> Update JDBC poll input operator to fix issues
> -
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...

2016-08-08 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73830019
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -438,119 +261,110 @@ public void endWindow()
 currentWindowRecoveryState = new MutablePair<>();
   }
 
-  public int getPartitionCount()
-  {
-return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
+  @Override
+  public void deactivate()
   {
-this.partitionCount = partitionCount;
+scanService.shutdownNow();
+store.disconnect();
   }
 
-  @Override
-  public void activate(Context cntxt)
+  protected void pollRecords()
   {
-if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
-&& context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
-  // If it is a replay state, don't start any threads here
+if (isPolled) {
   return;
 }
-  }
-
-  @Override
-  public void deactivate()
-  {
 try {
-  if (dbPoller != null && dbPoller.isAlive()) {
-dbPoller.interrupt();
-dbPoller.join();
+  ps.setFetchSize(getFetchSize());
+  ResultSet result = ps.executeQuery();
+  if (result.next()) {
+do {
+  emitQueue.add(getTuple(result));
+} while (result.next());
   }
-} catch (InterruptedException ex) {
-  // log and ignore, ending execution anyway
-  LOG.error("exception in poller thread: ", ex);
+  isPolled = true;
+} catch (SQLException ex) {
+  throw new RuntimeException(String.format("Error while running 
query"), ex);
+} finally {
+  store.disconnect();
 }
-  }
 
-  @Override
-  public void handleIdleTime()
-  {
-if (execute) {
-  try {
-Thread.sleep(spinMillis);
-  } catch (InterruptedException ie) {
-throw new RuntimeException(ie);
-  }
-} else {
-  LOG.error("Exception: ", cause);
-  DTThrowable.rethrow(cause.get());
-}
   }
 
+  public abstract T getTuple(ResultSet result);
+
   protected void replay(long windowId) throws SQLException
   {
-isReplayed = true;
 
-MutablePair recoveredData = new MutablePair();
 try {
-  recoveredData = (MutablePair)windowManager.load(operatorId, windowId);
+  MutablePair recoveredData = (MutablePair)windowManager.load(operatorId,
+  windowId);
 
-  if (recoveredData != null) {
-//skip the window and return if there was no incoming data in the 
window
-if (recoveredData.left == null || recoveredData.right == null) {
-  return;
-}
-
-if (recoveredData.right.equals(rangeQueryPair.getValue()) || 
recoveredData.right.equals(previousUpperBound)) {
-  LOG.info("Matched so returning");
-  return;
-}
+  if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", 
windowId, recoveredData.left,
+recoveredData.right);
 
-JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
-jdbcPoller.setStore(store);
-jdbcPoller.setKey(getKey());
-jdbcPoller.setPartitionCount(getPartitionCount());
-jdbcPoller.setPollInterval(getPollInterval());
-jdbcPoller.setTableName(getTableName());
-jdbcPoller.setBatchSize(getBatchSize());
-isPollable = false;
-
-LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + 
"," + recoveredData.right + "]");
+ps = store.getConnection().prepareStatement(
+buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
--- End diff --

Should this fetch data from ```recoveredData.left``` to 
```recoveredData.right```?
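A minimal sketch of the change the comment suggests: deriving the replay statement from the recovered window bounds instead of the partition's full range pair. It reuses ```buildRangeQuery```, ```ps``` and ```store``` from the diff and assumes (not confirmed by the PR) that the recovered pair holds the integer record offsets used elsewhere in this class:

```java
// Hedged sketch, not the PR's actual code: replay exactly the slice
// recorded for this window, i.e. [recoveredData.left, recoveredData.right).
int replayLower = (Integer)recoveredData.left;   // bound saved at endWindow
int replayUpper = (Integer)recoveredData.right;  // bound saved at endWindow
ps = store.getConnection().prepareStatement(
    buildRangeQuery(replayLower, replayUpper - replayLower));
```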


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-08 Thread Chinmay Kolhatkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chinmay Kolhatkar resolved APEXMALHAR-1701.
---
   Resolution: Done
Fix Version/s: 3.5.0

PR for deduper with managed state merged.

> Deduper : create a deduper backed by Managed State
> --
>
> Key: APEXMALHAR-1701
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701
> Project: Apache Apex Malhar
>  Issue Type: Task
>  Components: algorithms
>Reporter: Chandni Singh
>Assignee: Bhupesh Chawda
> Fix For: 3.5.0
>
>
> Need a de-duplication operator that is based on Managed State.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411406#comment-15411406
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73828487
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -80,201 +91,45 @@
  * @tags database, sql, jdbc, partitionable,exactlyOnce
  */
 @Evolving
-public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore>
-    implements ActivationListener<Context>, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator<T>>
+public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> implements
+    ActivationListener<Context>, Partitioner<AbstractJdbcPollInputOperator<T>>
 {
-  /**
-   * poll interval in milliseconds
-   */
-  private static int pollInterval = 1;
+  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
+  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
+  private static int DEFAULT_FETCH_SIZE = 2;
+  private static int DEFAULT_BATCH_SIZE = 2000;
+  private int pollInterval = DEFAULT_POLL_INTERVAL; //in milliseconds
+  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
+  private int fetchSize = DEFAULT_FETCH_SIZE;
 
   @Min(1)
   private int partitionCount = 1;
-  protected transient int operatorId;
-  protected transient boolean isReplayed;
-  protected transient boolean isPollable;
-  protected int batchSize;
-  protected static int fetchSize = 2;
-  /**
-   * Map of windowId to  of the range key
-   */
-  protected transient MutablePair 
currentWindowRecoveryState;
-
-  /**
-   * size of the emit queue used to hold polled records before emit
-   */
-  private static int queueCapacity = 4 * 1024 * 1024;
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  @NotNull
+  private String tableName;
+  @NotNull
+  private String columnsExpression;
+  @NotNull
+  private String key;
+  private String whereCondition = null;
+  private long currentWindowId;
+  private WindowDataManager windowManager;
+
+  protected KeyValPair rangeQueryPair;
+  protected Integer lowerBound;
+  private transient int operatorId;
+  private transient DSLContext create;
   private transient volatile boolean execute;
-  private transient AtomicReference cause;
-  protected transient int spinMillis;
-  private transient OperatorContext context;
-  protected String tableName;
-  protected String key;
-  protected long currentWindowId;
-  protected KeyValPair rangeQueryPair;
-  protected String lower;
-  protected String upper;
-  protected boolean recovered;
-  protected boolean isPolled;
-  protected String whereCondition = null;
-  protected String previousUpperBound;
-  protected String highestPolled;
-  private static final String user = "";
-  private static final String password = "";
-  /**
-   * thread to poll database
-   */
-  private transient Thread dbPoller;
-  protected transient ArrayBlockingQueue emitQueue;
+  private transient ScheduledExecutorService scanService;
+  protected transient boolean isPolled;
+  protected transient Integer lastPolledBound;
--- End diff --

```upperBound```? Seems to be more intuitive when looking at the rest of 
the code.


> Update JDBC poll input operator to fix issues
> -
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-08 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73828318
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator that
receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class POJOInnerJoinOperator extends 
AbstractManagedStateInnerJoinOperator implements 
Operator.ActivationListener
+{
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = 
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected transient Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new 
DefaultOutputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new 
DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = 
context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = 
context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
+}
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to 
stream1 or not.
+   * @return
+   */
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+return timeFields == null ? 


[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy

2016-08-08 Thread Siyuan Hua (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411388#comment-15411388
 ] 

Siyuan Hua commented on APEXMALHAR-2169:


[~chaithu]
Then I think the problem is in the softConstraint and hardConstraint code; it
should never return true, because the default limit is Long.MAX_VALUE.

There is something in the backlog that I didn't track in JIRA (my bad). But since
you have hit this issue, can you please do some refactoring here?
We want to simplify the operator code instead of making it more and more
complicated, and the Kafka input operator has been around for a while without any
requirement asking for dynamic partitioning based on throughput.
Can we remove the hardConstraint and softConstraint checks and deprecate the two
upper-bound properties? Dynamic partitioning should then happen by default only
when the Kafka partitions change.
For the ONE_TO_MANY partition strategy, the number of operator partitions should
stay unchanged for the whole application at the specified initialPartitionCount.
I think there is still a bug there: if a new Kafka partition is added, we always
start a new operator partition, which is not correct.

You can also create another ticket to move all throughput-based repartitioning to
a separate Partitioner, so the operator code stays simple and easy to understand
and debug.
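A minimal sketch of the policy proposed above. The helper below is hypothetical (it is not part of the operator's API) and only captures the decision rule:

```java
// Hedged sketch: drop the throughput-based soft/hard constraints and
// let only a Kafka partition-count change drive the partition layout.
static int targetOperatorPartitions(int kafkaPartitions,
    int initialPartitionCount, boolean oneToManyStrategy)
{
  if (oneToManyStrategy) {
    // ONE_TO_MANY: the operator partition count stays fixed at the
    // configured initialPartitionCount for the whole application; new
    // Kafka partitions are redistributed over the existing operators.
    return initialPartitionCount;
  }
  // ONE_TO_ONE: one operator partition per Kafka partition.
  return kafkaPartitions;
}
```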



> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> --
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as KAFKA -> Console with the below configuration:
>    strategy: one_to_many
>    initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partitioned the topic to 5 partitions.
> Observations:
> (1) Operator re-partitioning did not happen.
> (2) The operator stopped emitting messages.
> (3) The following warning messages appear in the log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411374#comment-15411374
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/358#discussion_r73826686
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
@@ -565,67 +379,110 @@ public void emitReplayedTuples(PreparedStatement ps)
*/
   @Override
  public Collection<Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
-      Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions,
-      com.datatorrent.api.Partitioner.PartitioningContext context)
+      Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions, PartitioningContext context)
  {
    List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(
        getPartitionCount());
-JdbcStore jdbcStore = new JdbcStore();
-jdbcStore.setDatabaseDriver(store.getDatabaseDriver());
-jdbcStore.setDatabaseUrl(store.getDatabaseUrl());
-jdbcStore.setConnectionProperties(store.getConnectionProperties());
 
-jdbcStore.connect();
-
-HashMap> partitionToRangeMap = 
null;
+HashMap> partitionToRangeMap = 
null;
 try {
-  partitionToRangeMap = 
JdbcMetaDataUtility.getPartitionedQueryMap(getPartitionCount(),
-  jdbcStore.getDatabaseDriver(), jdbcStore.getDatabaseUrl(), 
getTableName(), getKey(),
-  store.getConnectionProperties().getProperty(user), 
store.getConnectionProperties().getProperty(password),
-  whereCondition, emitColumnList);
+  store.connect();
+  intializeDSLContext();
+  partitionToRangeMap = 
getPartitionedQueryRangeMap(getPartitionCount());
 } catch (SQLException e) {
   LOG.error("Exception in initializing the partition range", e);
+  throw new RuntimeException(e);
+} finally {
+  store.disconnect();
 }
 
 KryoCloneUtils cloneUtils = 
KryoCloneUtils.createCloneUtils(this);
 
+// The n given partitions are for range queries and n + 1 partition is 
for polling query
 for (int i = 0; i <= getPartitionCount(); i++) {
-  AbstractJdbcPollInputOperator jdbcPoller = null;
-
-  jdbcPoller = cloneUtils.getClone();
-
-  jdbcPoller.setStore(store);
-  jdbcPoller.setKey(getKey());
-  jdbcPoller.setPartitionCount(getPartitionCount());
-  jdbcPoller.setPollInterval(getPollInterval());
-  jdbcPoller.setTableName(getTableName());
-  jdbcPoller.setBatchSize(getBatchSize());
-  jdbcPoller.setEmitColumnList(getEmitColumnList());
-
-  store.connect();
-  //The n given partitions are for range queries and n + 1 partition 
is for polling query
-  //The upper bound for the n+1 partition is set to null since its a 
pollable partition
+  AbstractJdbcPollInputOperator jdbcPoller = cloneUtils.getClone();
   if (i < getPartitionCount()) {
-jdbcPoller.setRangeQueryPair(partitionToRangeMap.get(i));
-isPollable = false;
+jdbcPoller.rangeQueryPair = partitionToRangeMap.get(i);
+jdbcPoller.lastEmittedRecord = partitionToRangeMap.get(i).getKey();
--- End diff --

Set ```jdbcPoller.isPollerPartition = false``` here?
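A hedged sketch of how this loop could set the flag explicitly. The ```else``` branch and the ```DefaultPartition``` wrapping are assumptions based on the comment above the loop, not lines visible in this diff:

```java
// Sketch only: n range partitions plus one pollable (n+1)th partition.
for (int i = 0; i <= getPartitionCount(); i++) {
  AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
  if (i < getPartitionCount()) {
    // Range partition: emits a fixed slice of records, then stops.
    jdbcPoller.rangeQueryPair = partitionToRangeMap.get(i);
    jdbcPoller.lastEmittedRecord = partitionToRangeMap.get(i).getKey();
    jdbcPoller.isPollerPartition = false;
  } else {
    // Poller partition: keeps polling for records beyond the last bound.
    jdbcPoller.isPollerPartition = true;
  }
  newPartitions.add(new DefaultPartition<>(jdbcPoller));
}
```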


> Update JDBC poll input operator to fix issues
> -
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)