[GitHub] flink pull request #2495: FLINK-3322 - Make sorters to reuse the memory page...

2016-09-12 Thread ramkrish86
GitHub user ramkrish86 opened a pull request:

https://github.com/apache/flink/pull/2495

FLINK-3322 - Make sorters to reuse the memory pages allocated for iterative 
tasks

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


This is part 1 of FLINK-3322, where only the sorters are made to reuse the 
memory pages. As @ggevay pointed out, we also have to handle the iterators 
where memory pages are allocated. I have a separate PR for that because it 
involves touching a lot of places, but I am open to feedback here. Combining 
both is fine with me as well, but it made the changes much bigger. 
I would like to get feedback here on this approach. 
A SorterMemoryAllocator is now passed to the UnilateralSortMergers. It 
allocates the required memory pages, covering the required read, write and 
large buffers. As per the existing logic the buffers are released, but if the 
task is an iterative task we defer the release until a close or termination 
call happens for the iterative task. Pages that were grabbed in between for 
key sort or record sort are put back into the respective pools, so that we 
keep the required number of pages throughout the life cycle of the iterative 
task.

As said, this is only part 1; we still need to address the iterators, which 
in my view touches more places. I have those changes done, but they are not 
yet in shape to be pushed as a PR. I am open to feedback here as well. Thanks 
all. 
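
To make the intent concrete, here is a minimal, hypothetical sketch of the 
allocator's life cycle (not the PR's actual code; plain byte[] pages stand in 
for Flink's MemorySegments):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch only: hands pages to a sorter and, for iterative tasks, keeps them
// pooled across supersteps instead of releasing them after every superstep.
public class SorterMemoryAllocatorSketch {

    private final Deque<byte[]> pool = new ArrayDeque<>();
    private final boolean iterativeTask;
    private final int pageSize;

    public SorterMemoryAllocatorSketch(int numPages, int pageSize, boolean iterativeTask) {
        this.pageSize = pageSize;
        this.iterativeTask = iterativeTask;
        for (int i = 0; i < numPages; i++) {
            pool.push(new byte[pageSize]); // allocated once, up front
        }
    }

    /** Hands out pages for the sorter's read, write and large buffers. */
    public List<byte[]> allocatePages(int numPages) {
        List<byte[]> pages = new ArrayList<>(numPages);
        for (int i = 0; i < numPages; i++) {
            pages.add(pool.isEmpty() ? new byte[pageSize] : pool.pop());
        }
        return pages;
    }

    /** Called when the sorter finishes, e.g. at the end of a superstep. */
    public void releasePages(List<byte[]> pages) {
        if (iterativeTask) {
            pages.forEach(pool::push); // keep the pages for the next superstep
        }
        // non-iterative tasks: drop the references so memory is reclaimed
    }

    /** Called on close or termination of the iterative task. */
    public void close() {
        pool.clear();
    }
}
```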

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-3322_part1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2495.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2495


commit 705ee5294bc5263971c2924a55c9230d72806527
Author: Ramkrishna 
Date:   2016-09-13T06:33:59Z

FLINK-3322 - Make sorters to reuse the memory pages allocated for
iterative tasks






[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15486449#comment-15486449
 ] 

ASF GitHub Bot commented on FLINK-3322:
---





> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookup tables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)
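
For context, both settings mentioned above live in flink-conf.yaml; with 
preallocation enabled, released segments go back into the pool instead of 
being left to the GC:

{code}
taskmanager.memory.preallocate: true
taskmanager.memory.fraction: 0.7
{code}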





[jira] [Commented] (FLINK-4513) Kafka connector documentation refers to Flink 1.1-SNAPSHOT

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15486214#comment-15486214
 ] 

ASF GitHub Bot commented on FLINK-4513:
---



> Kafka connector documentation refers to Flink 1.1-SNAPSHOT
> --
>
> Key: FLINK-4513
> URL: https://issues.apache.org/jira/browse/FLINK-4513
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.1
>Reporter: Fabian Hueske
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
> Fix For: 1.1.3
>
>
> The Kafka connector documentation: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kafka.html
>  of Flink 1.1 refers to a Flink 1.1-SNAPSHOT Maven version. 





[GitHub] flink issue #2493: [FLINK-4513] [Docs]: Kafka connector documentation refers...

2016-09-12 Thread nssalian
Github user nssalian commented on the issue:

https://github.com/apache/flink/pull/2493
  
@uce, @zentol  do you have some time to help review?




[GitHub] flink issue #2494: [FLINK-4614][Docs]: Changed the version from 1.2-SNAPSHOT...

2016-09-12 Thread nssalian
Github user nssalian commented on the issue:

https://github.com/apache/flink/pull/2494
  
@uce, @zentol  do you have some time to help review?




[jira] [Commented] (FLINK-4614) Kafka connector documentation refers to Flink 1.2-SNAPSHOT

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15486213#comment-15486213
 ] 

ASF GitHub Bot commented on FLINK-4614:
---



> Kafka connector documentation refers to Flink 1.2-SNAPSHOT
> --
>
> Key: FLINK-4614
> URL: https://issues.apache.org/jira/browse/FLINK-4614
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Neelesh Srinivas Salian
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
> Fix For: 1.2.0
>
>
> The Kafka connector documentation: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kafka.html
>  of Flink 1.1 refers to a Flink 1.1-SNAPSHOT Maven version. 





[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15486023#comment-15486023
 ] 

ASF GitHub Bot commented on FLINK-4520:
---



> Integrate Siddhi as a lightweight CEP Library
> -
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library, patch-available
> Fix For: 1.2.0
>
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for Flink users (especially streaming application 
> developers) to provide a library to run Siddhi CEP queries directly in a 
> Flink streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
> * Connect with single or multiple Flink DataStreams with Siddhi CEP 
> Execution Plan
> * Return output stream as DataStream with type intelligently inferred 
> from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See 
> `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See 
> `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
> https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
>  StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", 
> "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", 
> "price","timestamp");
>  DataStream> output = cep
>   .from("inputStream1").union("inputStream2")
>   .sql(
> "from every s1 = inputStream1[id == 2] "
>  + " -> s2 = inputStream2[id == 3] "
>  + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as 
> name_2, custom:plus(s1.price,s2.price) as price "
>  + "insert into outputStream"
>   )
>   .returns("outputStream");
>  env.execute();
> {code}





[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

2016-09-12 Thread haoch
Github user haoch commented on the issue:

https://github.com/apache/flink/pull/2487
  
@mushketyk thanks very much for reviewing, will fix as required soon.




[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485757#comment-15485757
 ] 

ASF GitHub Bot commented on FLINK-2055:
---



> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html





[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-12 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r78478766
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+* Given an input value return the HBase row key. Row key cannot be 
null.
+*
+* @param value
+* @return row key
+*/
+   byte[] rowKey(IN value);
--- End diff --

I am planning to update this PR based on your comment and then ping @rmetzger 
for further review.
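
For readers of the diff above: with the generic parameter restored, a mapper 
implementation would look roughly like this (hypothetical Event type; 
illustrative only, not part of the PR):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical POJO, used only for illustration.
class Event {
    final String id;
    Event(String id) { this.id = id; }
}

// Keys HBase rows by the event id; the sink calls rowKey(...) per record.
class EventRowKeyMapper implements HBaseMapper<Event> {
    @Override
    public byte[] rowKey(Event value) {
        return value.id.getBytes(StandardCharsets.UTF_8); // row key must not be null
    }
}
```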




[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485747#comment-15485747
 ] 

ASF GitHub Bot commented on FLINK-2055:
---



> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html





[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-12 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r78478455
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+* Given an input value return the HBase row key. Row key cannot be 
null.
+*
+* @param value
+* @return row key
+*/
+   byte[] rowKey(IN value);
--- End diff --

Hi @ramkrish86, I was trying to add a connect() method as you suggested, and 
to explicitly test whether the table exists, as follows:

    public void connect(String tableName) throws IOException {
        connection = ConnectionFactory.createConnection(hbConfig);
        TableName name = TableName.valueOf(tableName);
        try (Admin admin = connection.getAdmin()) {
            if (!admin.tableExists(name)) {
                throw new RuntimeException("Table " + tableName + " doesn't exist!");
            }
        }
        table = connection.getTable(name);
    }

But once the admin.tableExists(name) call is added, running my example 
throws: 
org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException: Column family 
table does not exist in region hbase:meta,,1.1588230740 in table 'hbase:meta'
I've never run into this exception in my project that uses the hbase client. Do 
you know the reason for this exception?




[jira] [Commented] (FLINK-4614) Kafka connector documentation refers to Flink 1.2-SNAPSHOT

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485552#comment-15485552
 ] 

ASF GitHub Bot commented on FLINK-4614:
---





> Kafka connector documentation refers to Flink 1.2-SNAPSHOT
> --
>
> Key: FLINK-4614
> URL: https://issues.apache.org/jira/browse/FLINK-4614
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Neelesh Srinivas Salian
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
> Fix For: 1.2.0
>
>
> The Kafka connector documentation: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kafka.html
>  of Flink 1.1 refers to a Flink 1.1-SNAPSHOT Maven version. 





[GitHub] flink pull request #2494: [FLINK-4614][Docs]: Changed the version from 1.2-S...

2016-09-12 Thread nssalian
GitHub user nssalian opened a pull request:

https://github.com/apache/flink/pull/2494

[FLINK-4614][Docs]: Changed the version from 1.2-SNAPSHOT to 1.2 in 
docs/_config.yml

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.


- [ ] Documentation
  - Modified the _config.yml file to point to 1.2 instead of 1.2-SNAPSHOT

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

…ect docs in 1.2.0

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nssalian/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2494.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2494


commit 65b3ff78f100ed1b13ec2fcc727f4869823b5918
Author: Neelesh Srinivas Salian 
Date:   2016-09-12T22:38:06Z

FLINK-4614: Changed the version from 1.2-SNAPSHOT to 1.2 to help correct 
docs in 1.2.0






[jira] [Updated] (FLINK-4614) Kafka connector documentation refers to Flink 1.2-SNAPSHOT

2016-09-12 Thread Neelesh Srinivas Salian (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neelesh Srinivas Salian updated FLINK-4614:
---
Fix Version/s: (was: 1.1.3)
   1.2.0

> Kafka connector documentation refers to Flink 1.2-SNAPSHOT
> --
>
> Key: FLINK-4614
> URL: https://issues.apache.org/jira/browse/FLINK-4614
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Neelesh Srinivas Salian
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
> Fix For: 1.2.0
>
>
> The Kafka connector documentation: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kafka.html
>  of Flink 1.1 refers to a Flink 1.1-SNAPSHOT Maven version. 





[jira] [Updated] (FLINK-4614) Kafka connector documentation refers to Flink 1.2-SNAPSHOT

2016-09-12 Thread Neelesh Srinivas Salian (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neelesh Srinivas Salian updated FLINK-4614:
---
Summary: Kafka connector documentation refers to Flink 1.2-SNAPSHOT  (was: 
CLONE - Kafka connector documentation refers to Flink 1.2-SNAPSHOT)

> Kafka connector documentation refers to Flink 1.2-SNAPSHOT
> --
>
> Key: FLINK-4614
> URL: https://issues.apache.org/jira/browse/FLINK-4614
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Neelesh Srinivas Salian
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
> Fix For: 1.1.3
>
>
> The Kafka connector documentation: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kafka.html
>  of Flink 1.1 refers to a Flink 1.1-SNAPSHOT Maven version. 





[jira] [Updated] (FLINK-4614) CLONE - Kafka connector documentation refers to Flink 1.2-SNAPSHOT

2016-09-12 Thread Neelesh Srinivas Salian (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neelesh Srinivas Salian updated FLINK-4614:
---
Affects Version/s: (was: 1.1.1)
   1.2.0

> CLONE - Kafka connector documentation refers to Flink 1.2-SNAPSHOT
> --
>
> Key: FLINK-4614
> URL: https://issues.apache.org/jira/browse/FLINK-4614
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Neelesh Srinivas Salian
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
> Fix For: 1.1.3
>
>
> The Kafka connector documentation: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kafka.html
>  of Flink 1.1 refers to a Flink 1.1-SNAPSHOT Maven version. 





[jira] [Updated] (FLINK-2765) Upgrade hbase version for hadoop-2 to 1.2 release

2016-09-12 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-2765:
--
Description: 
Currently 0.98.11 is used:
{code}
0.98.11-hadoop2
{code}

Stable release for hadoop-2 is 1.1.x line
We should upgrade to 1.2.1

  was:
Currently 0.98.11 is used:
{code}
0.98.11-hadoop2
{code}
Stable release for hadoop-2 is 1.1.x line
We should upgrade to 1.2.1


> Upgrade hbase version for hadoop-2 to 1.2 release
> -
>
> Key: FLINK-2765
> URL: https://issues.apache.org/jira/browse/FLINK-2765
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Minor
>
> Currently 0.98.11 is used:
> {code}
> 0.98.11-hadoop2
> {code}
> Stable release for hadoop-2 is 1.1.x line
> We should upgrade to 1.2.1





[jira] [Updated] (FLINK-4533) Unprotected access to meters in StatsDReporter#report()

2016-09-12 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4533:
--
Description: 
Access to meters in AbstractReporter is protected by AbstractReporter.this.

Access to meters in StatsDReporter#report() should do the same.

  was:
Access to meters in AbstractReporter is protected by AbstractReporter.this.


Access to meters in StatsDReporter#report() should do the same.


> Unprotected access to meters in StatsDReporter#report()
> ---
>
> Key: FLINK-4533
> URL: https://issues.apache.org/jira/browse/FLINK-4533
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> Access to meters in AbstractReporter is protected by AbstractReporter.this.
> Access to meters in StatsDReporter#report() should do the same.
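A sketch of the suggested guard; the meters map and the per-meter report 
helper are assumed to match the surrounding report() code:

{code}
@Override
public void report() {
	// take the same lock that AbstractReporter uses for its metric maps
	synchronized (this) {
		for (Map.Entry<Meter, String> entry : meters.entrySet()) {
			reportMeter(entry.getValue(), entry.getKey()); // assumed helper
		}
		// gauges, counters and histograms guarded the same way
	}
}
{code}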





[jira] [Updated] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2016-09-12 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4534:
--
Description: 
Iteration over state.bucketStates is protected by synchronization in other 
methods, except for the following in restoreState():
{code}
for (BucketState bucketState : state.bucketStates.values()) {
{code}
and following in close():
{code}
for (Map.Entry> entry : 
state.bucketStates.entrySet()) {
  closeCurrentPartFile(entry.getValue());
{code}
w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue starting 
line 752:
{code}
  Set pastCheckpointIds = 
bucketState.pendingFilesPerCheckpoint.keySet();
  LOG.debug("Moving pending files to final location on restore.");
  for (Long pastCheckpointId : pastCheckpointIds) {
{code}

  was:
Iteration over state.bucketStates is protected by synchronization in other 
methods, except for the following in restoreState():
{code}
for (BucketState bucketState : state.bucketStates.values()) {
{code}
and following in close():
{code}
for (Map.Entry> entry : 
state.bucketStates.entrySet()) {
  closeCurrentPartFile(entry.getValue());
{code}

w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue starting 
line 752:
{code}
  Set pastCheckpointIds = 
bucketState.pendingFilesPerCheckpoint.keySet();
  LOG.debug("Moving pending files to final location on restore.");
  for (Long pastCheckpointId : pastCheckpointIds) {
{code}


> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState bucketState : state.bucketStates.values()) {
> {code}
> and following in close():
> {code}
> for (Map.Entry> entry : 
> state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue 
> starting line 752:
> {code}
>   Set pastCheckpointIds = 
> bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}
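A sketch of the corresponding fix, assuming the same monitor the other 
methods use:

{code}
synchronized (state.bucketStates) {
	for (BucketState bucketState : state.bucketStates.values()) {
		restoreBucketState(bucketState); // hypothetical per-bucket restore step
	}
}
{code}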





[jira] [Updated] (FLINK-3801) Upgrade Joda-Time library to 2.9.3

2016-09-12 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3801:
--
Description: 
Currently joda-time 2.5 is used, which is very old.


We should upgrade to 2.9.3

  was:
Currently joda-time 2.5 is used, which is very old.

We should upgrade to 2.9.3


> Upgrade Joda-Time library to 2.9.3
> --
>
> Key: FLINK-3801
> URL: https://issues.apache.org/jira/browse/FLINK-3801
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Minor
>
> Currently joda-time 2.5 is used, which is very old.
> We should upgrade to 2.9.3





[jira] [Updated] (FLINK-3222) Incorrect shift amount in OperatorCheckpointStats#hashCode()

2016-09-12 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3222:
--
Description: 
Here is related code:
{code}
result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
>>> 32));
{code}
subTaskStats.length is an int.

The shift amount is greater than 31 bits.

  was:
Here is related code:
{code}
result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
>>> 32));
{code}
subTaskStats.length is an int.


The shift amount is greater than 31 bits.


> Incorrect shift amount in OperatorCheckpointStats#hashCode()
> 
>
> Key: FLINK-3222
> URL: https://issues.apache.org/jira/browse/FLINK-3222
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> Here is related code:
> {code}
> result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
> >>> 32));
> {code}
> subTaskStats.length is an int.
> The shift amount is greater than 31 bits.
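Because subTaskStats.length is an int, the shift distance is reduced modulo 
32, so the expression computes length ^ (length >>> 0), which is 0; the array 
length never influences the hash at all. A minimal correction (one option, 
not necessarily the committed fix):

{code}
// the "x ^ (x >>> 32)" folding idiom is only needed for long values;
// an int-valued component can be mixed in directly:
result = 31 * result + subTaskStats.length;
{code}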





[jira] [Updated] (FLINK-4573) Potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler

2016-09-12 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4573:
--
Description: 
{code}
try {
	raf = new RandomAccessFile(file, "r");
} catch (FileNotFoundException e) {
	display(ctx, request, "Displaying TaskManager log failed.");
	LOG.error("Displaying TaskManager log failed.", e);
	return;
}
long fileLength = raf.length();
final FileChannel fc = raf.getChannel();
{code}

If length() throws IOException, raf would be left unclosed.

  was:
{code}
try {
	raf = new RandomAccessFile(file, "r");
} catch (FileNotFoundException e) {
	display(ctx, request, "Displaying TaskManager log failed.");
	LOG.error("Displaying TaskManager log failed.", e);
	return;
}
long fileLength = raf.length();
final FileChannel fc = raf.getChannel();
{code}
If length() throws IOException, raf would be left unclosed.


> Potential resource leak due to unclosed RandomAccessFile in 
> TaskManagerLogHandler
> -
>
> Key: FLINK-4573
> URL: https://issues.apache.org/jira/browse/FLINK-4573
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> try {
> 	raf = new RandomAccessFile(file, "r");
> } catch (FileNotFoundException e) {
> 	display(ctx, request, "Displaying TaskManager log failed.");
> 	LOG.error("Displaying TaskManager log failed.", e);
> 	return;
> }
> long fileLength = raf.length();
> final FileChannel fc = raf.getChannel();
> {code}
> If length() throws IOException, raf would be left unclosed.
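A sketch of a fix, reusing the identifiers from the snippet above:

{code}
long fileLength;
try {
	fileLength = raf.length();
} catch (IOException e) {
	raf.close(); // avoid the leak when length() fails
	display(ctx, request, "Displaying TaskManager log failed.");
	LOG.error("Displaying TaskManager log failed.", e);
	return;
}
final FileChannel fc = raf.getChannel();
{code}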





[jira] [Updated] (FLINK-3734) Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState()

2016-09-12 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3734:
--
Description: 
{code}
DataInputView in = inputState.getState(getUserCodeClassloader());

final long nextEvaluationTime = in.readLong();
final long nextSlideTime = in.readLong();

AbstractKeyedTimePanes panes = 
createPanes(keySelector, function);
panes.readFromInput(in, keySerializer, stateTypeSerializer);

restoredState = new RestoredState<>(panes, nextEvaluationTime, 
nextSlideTime);
  }
{code}

DataInputView in is not closed upon return.

  was:
{code}
DataInputView in = inputState.getState(getUserCodeClassloader());

final long nextEvaluationTime = in.readLong();
final long nextSlideTime = in.readLong();

AbstractKeyedTimePanes panes = 
createPanes(keySelector, function);
panes.readFromInput(in, keySerializer, stateTypeSerializer);

restoredState = new RestoredState<>(panes, nextEvaluationTime, 
nextSlideTime);
  }
{code}
DataInputView in is not closed upon return.


> Unclosed DataInputView in 
> AbstractAlignedProcessingTimeWindowOperator#restoreState()
> 
>
> Key: FLINK-3734
> URL: https://issues.apache.org/jira/browse/FLINK-3734
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> DataInputView in = inputState.getState(getUserCodeClassloader());
> final long nextEvaluationTime = in.readLong();
> final long nextSlideTime = in.readLong();
> AbstractKeyedTimePanes panes = 
> createPanes(keySelector, function);
> panes.readFromInput(in, keySerializer, stateTypeSerializer);
> restoredState = new RestoredState<>(panes, nextEvaluationTime, 
> nextSlideTime);
>   }
> {code}
> DataInputView in is not closed upon return.
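A sketch of a fix, assuming the concrete view obtained here also implements 
Closeable (e.g. a view wrapping an input stream):

{code}
DataInputView in = inputState.getState(getUserCodeClassloader());
try {
	final long nextEvaluationTime = in.readLong();
	final long nextSlideTime = in.readLong();
	AbstractKeyedTimePanes panes = createPanes(keySelector, function);
	panes.readFromInput(in, keySerializer, stateTypeSerializer);
	restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime);
} finally {
	if (in instanceof java.io.Closeable) {
		((java.io.Closeable) in).close(); // close the view on every exit path
	}
}
{code}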





[jira] [Commented] (FLINK-4614) CLONE - Kafka connector documentation refers to Flink 1.2-SNAPSHOT

2016-09-12 Thread Neelesh Srinivas Salian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485380#comment-15485380
 ] 

Neelesh Srinivas Salian commented on FLINK-4614:


This is present in the latest master branch as well:
https://github.com/apache/flink/blob/master/docs/_config.yml

> CLONE - Kafka connector documentation refers to Flink 1.2-SNAPSHOT
> --
>
> Key: FLINK-4614
> URL: https://issues.apache.org/jira/browse/FLINK-4614
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.1
>Reporter: Neelesh Srinivas Salian
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
> Fix For: 1.1.3
>
>
> The Kafka connector documentation: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kafka.html
>  of Flink 1.1 refers to a Flink 1.1-SNAPSHOT Maven version. 





[jira] [Created] (FLINK-4614) CLONE - Kafka connector documentation refers to Flink 1.2-SNAPSHOT

2016-09-12 Thread Neelesh Srinivas Salian (JIRA)
Neelesh Srinivas Salian created FLINK-4614:
--

 Summary: CLONE - Kafka connector documentation refers to Flink 
1.2-SNAPSHOT
 Key: FLINK-4614
 URL: https://issues.apache.org/jira/browse/FLINK-4614
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.1.1
Reporter: Neelesh Srinivas Salian
Assignee: Neelesh Srinivas Salian
Priority: Trivial
 Fix For: 1.1.3


The Kafka connector documentation: 
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kafka.html
 of Flink 1.1 refers to a Flink 1.1-SNAPSHOT Maven version. 





[GitHub] flink pull request #2493: [FLINK-4513] [Docs]: Kafka connector documentation...

2016-09-12 Thread nssalian
GitHub user nssalian opened a pull request:

https://github.com/apache/flink/pull/2493

[FLINK-4513] [Docs]: Kafka connector documentation refers to Flink 
1.1-SNAPSHOT

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.



- [ ] Documentation
  - Modified the _config.yml

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

… of 1.1-SNAPSHOT

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nssalian/flink release-1.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2493.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2493


commit 1c22a2d1de16eea08c3cbc2d4390ee27abf234ae
Author: Neelesh Srinivas Salian 
Date:   2016-09-12T21:35:00Z

FLINK-4513: Changing the docs/_config.yml to the 1.1 release instead of 
1.1-SNAPSHOT






[jira] [Commented] (FLINK-4513) Kafka connector documentation refers to Flink 1.1-SNAPSHOT

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485360#comment-15485360
 ] 

ASF GitHub Bot commented on FLINK-4513:
---





> Kafka connector documentation refers to Flink 1.1-SNAPSHOT
> --
>
> Key: FLINK-4513
> URL: https://issues.apache.org/jira/browse/FLINK-4513
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.1
>Reporter: Fabian Hueske
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
> Fix For: 1.1.3
>
>
> The Kafka connector documentation: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kafka.html
>  of Flink 1.1 refers to a Flink 1.1-SNAPSHOT Maven version. 





[jira] [Commented] (FLINK-3370) Add an aligned version of the window operator

2016-09-12 Thread Adam J Heller (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485274#comment-15485274
 ] 

Adam J Heller commented on FLINK-3370:
--

Hi Stephan. Could you explain how adding an aligned version of the window 
operator enables us to create sliding event time windows that do not replicate 
data into the different overlapping windows? That's a very exciting 
proposition, but I can't see how it would work. Thanks.

> Add an aligned version of the window operator
> -
>
> Key: FLINK-3370
> URL: https://issues.apache.org/jira/browse/FLINK-3370
> Project: Flink
>  Issue Type: Improvement
>  Components: Windowing Operators
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> The windowing operators currently follow a generic implementation for support 
> of unaligned windows.
> We can gain efficiency by creating a variant that is optimized for aligned 
> windows:
>   - Aligned windows can use aligned triggers, which keep no per-key state
>   - Less trigger state means less checkpointing data
>   - Based on the aligned windows, we can create sliding event time windows 
> that do not replicate data into the different overlapping windows
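Background on the last bullet: an aligned sliding window of size s with slide 
d can be decomposed into s/d non-overlapping panes. Each element is added to 
exactly one pane, and a firing window combines the panes it covers, so no 
element is replicated into overlapping windows. A hypothetical sketch (not 
Flink's implementation):

{code}
import java.util.HashMap;
import java.util.Map;

// Counts per aligned pane; a 5s window sliding by 1s spans 5 panes.
class PaneSlidingCount {
	static final long SLIDE_MS = 1_000L; // slide d
	static final long SIZE_MS = 5_000L;  // window size s
	final Map<Long, Long> paneCounts = new HashMap<>();

	void onElement(long timestampMs) {
		long paneStart = timestampMs - (timestampMs % SLIDE_MS); // aligned pane
		paneCounts.merge(paneStart, 1L, Long::sum);              // stored once
	}

	long windowCount(long windowEndMs) {
		long count = 0;
		for (long p = windowEndMs - SIZE_MS; p < windowEndMs; p += SLIDE_MS) {
			count += paneCounts.getOrDefault(p, 0L);             // combine panes
		}
		return count;
	}
}
{code}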





[jira] [Comment Edited] (FLINK-2014) Add LASSO regression

2016-09-12 Thread Victor Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15484612#comment-15484612
 ] 

Victor Chen edited comment on FLINK-2014 at 9/12/16 8:55 PM:
-

To do LASSO, I'm thinking of implementing Greedy Parallel Coordinate Descent 
(http://www.caam.rice.edu/~optimization/disparse/parallel_and_distributed_sparse_optimization.pdf).

Similar to PR#1102 (https://github.com/apache/flink/pull/1102), I'm interested 
in performing training with SSP. Would it be too risky to base my algorithm on 
(since the PR hasn't been merged yet)? Are there any "primitives" in Flink I 
could use to do non-SSP training (e.g. sync/async training)? I'm planning to 
get a minimum viable "product" by the end of this week.

Also, I know [~tvas] is working on FLINK-2013 (Create GLM framework). Should I 
extend the GLM interface from your branch? Since I plan to finish by the end of 
this week and I don't know how much time it'll take, I propose to code a 
LASSO-specific algorithm, then within 2 weeks or so, generalize to the GLM 
framework.


was (Author: vlchen91):
To do LASSO, I'm thinking of implementing Greedy Parallel Coordinate Descent 
(http://www.caam.rice.edu/~optimization/disparse/parallel_and_distributed_sparse_optimization.pdf).

Similar to PR#1102 (https://github.com/apache/flink/pull/1102), I'm interested 
in performing training with SSP. Would it be too risky to base my algorithm on 
(since the PR hasn't been merged yet)? Are there any "primitives" in Flink I 
could use to do non-SSP training (e.g. sync/async training)? I'm planning to 
get a minimum viable "product" by the end of this week.

Also, I know Theodore is working on FLINK-2013 (Create GLM framework). Should I 
extend the GLM interface from your branch? Since I plan to finish by the end of 
this week and I don't know how much time it'll take, I propose to code a 
LASSO-specific algorithm, then within 2 weeks or so, generalize to the GLM 
framework.

> Add LASSO regression
> 
>
> Key: FLINK-2014
> URL: https://issues.apache.org/jira/browse/FLINK-2014
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Theodore Vasiloudis
>Priority: Minor
>
> LASSO is a linear model that uses L1 regularization
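For reference on the coordinate descent building block the comment above 
starts from, a minimal sequential sketch (dense data; not the greedy/parallel 
or SSP variant under discussion):

{code}
// Cyclic coordinate descent for min_w 0.5*||y - Xw||^2 + lambda*||w||_1.
class LassoCD {
	static double[] fit(double[][] X, double[] y, double lambda, int sweeps) {
		int n = X.length, d = X[0].length;
		double[] w = new double[d];
		double[] r = y.clone(); // residual y - Xw, with w = 0 initially
		for (int s = 0; s < sweeps; s++) {
			for (int j = 0; j < d; j++) {
				double rho = 0, norm2 = 0;
				for (int i = 0; i < n; i++) {
					rho += X[i][j] * (r[i] + X[i][j] * w[j]); // partial residual
					norm2 += X[i][j] * X[i][j];
				}
				double wNew = softThreshold(rho, lambda) / norm2;
				for (int i = 0; i < n; i++) {
					r[i] += X[i][j] * (w[j] - wNew); // keep residual consistent
				}
				w[j] = wNew;
			}
		}
		return w;
	}

	static double softThreshold(double z, double t) {
		return Math.signum(z) * Math.max(Math.abs(z) - t, 0.0);
	}
}
{code}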





[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485230#comment-15485230
 ] 

ASF GitHub Bot commented on FLINK-4520:
---



> Integrate Siddhi as a lightweight CEP Library
> -
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library, patch-available
> Fix For: 1.2.0
>
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for Flink users (especially streaming application 
> developers) to provide a library to run Siddhi CEP queries directly in a 
> Flink streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
> * Connect with single or multiple Flink DataStreams with Siddhi CEP 
> Execution Plan
> * Return output stream as DataStream with type intelligently inferred 
> from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See 
> `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See 
> `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
> https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
>  StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", 
> "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", 
> "price","timestamp");
>  DataStream> output = cep
>   .from("inputStream1").union("inputStream2")
>   .sql(
> "from every s1 = inputStream1[id == 2] "
>  + " -> s2 = inputStream2[id == 3] "
>  + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as 
> name_2 , custom:plus(s1.price,s2.price) as price"
>  + "insert into outputStream"
>   )
>   .returns("outputStream");
>  env.execute();
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485210#comment-15485210
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78449235
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/operator/SiddhiOperatorContext.java
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.operator;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.contrib.siddhi.schema.StreamSchema;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.util.Preconditions;
+import org.wso2.siddhi.core.SiddhiManager;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SiddhiCEP Operator Execution Context
+ */
+public class SiddhiOperatorContext implements Serializable {
--- End diff --

I think all public methods would benefit from JavaDocs
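
Purely as an illustration of the kind of method-level JavaDoc being asked for (the class and field below are stand-ins; only the method name echoes the PR):

{code}
import java.util.HashMap;
import java.util.Map;

/** Stand-in class illustrating the requested method-level JavaDoc. */
public class SchemaRegistry {
	private final Map<String, String> schemas = new HashMap<>();

	/**
	 * Looks up the schema registered for the given input stream.
	 *
	 * @param streamId id of a previously registered input stream
	 * @return the schema for {@code streamId}, or {@code null} if none is registered
	 */
	public String getInputStreamSchema(String streamId) {
		return schemas.get(streamId);
	}
}
{code}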


> Integrate Siddhi as a lightweight CEP Library
> ---------------------------------------------
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library, patch-available
> Fix For: 1.2.0
>
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for Flink users (especially streaming application 
> developers) to provide a library to run Siddhi CEP queries directly in Flink 
> streaming applications.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
> * Connect with single or multiple Flink DataStreams with Siddhi CEP 
> Execution Plan
> * Return output stream as DataStream with type intelligently inferred 
> from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See 
> `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See 
> `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
> https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
>  StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", 
> "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", 
>


[jira] [Commented] (FLINK-4612) Close FileWriter using try with resources

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485185#comment-15485185
 ] 

ASF GitHub Bot commented on FLINK-4612:
---

Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2492#discussion_r78447379
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java ---
@@ -55,11 +55,8 @@ public static String createTempFileInDirectory(String 
dir, String contents) thro
f.createNewFile();
f.deleteOnExit();

-   BufferedWriter out = new BufferedWriter(new FileWriter(f));
-   try {
+   try(BufferedWriter out = new BufferedWriter(new FileWriter(f))) {
--- End diff --

Thank you. Just fixed that.


> Close FileWriter using try with resources
> -----------------------------------------
>
> Key: FLINK-4612
> URL: https://issues.apache.org/jira/browse/FLINK-4612
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> FileWriter is not closed properly in many places in the project modules
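
The pattern being rolled out across the codebase, as a minimal self-contained sketch (the file name is hypothetical):

{code}
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

public class TryWithResourcesExample {
	public static void main(String[] args) throws IOException {
		// The writer is closed automatically when the try block exits,
		// whether it completes normally or throws.
		try (BufferedWriter out = new BufferedWriter(new FileWriter("example.txt"))) {
			out.write("some contents");
		}
	}
}
{code}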



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

2016-09-12 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78447224
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.operator;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.StreamSchema;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
+   private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
+   private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+
+   private final SiddhiOperatorContext siddhiPlan;
+   private final String executionExpression;
+   private final boolean isProcessingTime;
+   private final Map<String, MultiplexingStreamRecordSerializer<IN>> streamRecordSerializers;
+
+   private transient SiddhiManager siddhiManager;
+   private transient ExecutionPlanRuntime siddhiRuntime;
+   private transient Map<String, InputHandler> inputStreamHandlers;
+
+   // queue to buffer out of order stream records
+   private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
+
+   /**
+    * @param siddhiPlan Siddhi CEP Execution Plan
+    */
+   public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
+   validate(siddhiPlan);
+   this.executionExpression = siddhiPlan.getFinalExecutionPlan();
+   this.siddhiPlan = siddhiPlan;
+   this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+   this.streamRecordSerializers = new HashMap<>();
+
+   for (String streamId : this.siddhiPlan.getInputStreams()) {
+   streamRecordSerializers.put(streamId, createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), this.siddhiPlan.getExecutionConfig()));
+   }
+   }
+
+   protected abstract MultiplexingStreamRecordSerializer<IN> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig);
+
+   protected MultiplexingStreamRecordSerializer<IN> getStreamRecordSerializer(String streamId) {
+   if (streamRecordSerializers.containsKey(streamId)) {
+   return streamRecordSerializers.get(streamId);
+   } else {
+   
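
The diff above (truncated by the digest) buffers out-of-order stream records in a PriorityQueue ordered by timestamp. A minimal sketch of that buffering pattern, independent of Flink's internal types (all names here are hypothetical, not the PR's exact code):

{code}
import java.util.PriorityQueue;

public class OutOfOrderBuffer {
	/** A record with an event timestamp (stand-in for Flink's StreamRecord). */
	static class Record {
		final long timestamp;
		final String value;
		Record(long timestamp, String value) { this.timestamp = timestamp; this.value = value; }
	}

	// Min-heap ordered by timestamp; initial capacity 11 mirrors the constant in the diff.
	private final PriorityQueue<Record> queue =
		new PriorityQueue<>(11, (a, b) -> Long.compare(a.timestamp, b.timestamp));

	void buffer(Record r) {
		queue.add(r);
	}

	/** On a watermark, emit all buffered records at or below it, in timestamp order. */
	void onWatermark(long watermark) {
		while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
			Record r = queue.poll();
			System.out.println(r.timestamp + " -> " + r.value);
		}
	}
}
{code}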

[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485180#comment-15485180
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78447005
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi.operator;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.StreamSchema;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
+   private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
+   private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+
+   private final SiddhiOperatorContext siddhiPlan;
+   private final String executionExpression;
+   private final boolean isProcessingTime;
+   private final Map<String, MultiplexingStreamRecordSerializer<IN>> streamRecordSerializers;
+
+   private transient SiddhiManager siddhiManager;
+   private transient ExecutionPlanRuntime siddhiRuntime;
+   private transient Map<String, InputHandler> inputStreamHandlers;
+
+   // queue to buffer out of order stream records
+   private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
+
+   /**
+    * @param siddhiPlan Siddhi CEP Execution Plan
+    */
+   public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
+   validate(siddhiPlan);
+   this.executionExpression = siddhiPlan.getFinalExecutionPlan();
+   this.siddhiPlan = siddhiPlan;
+   this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+   this.streamRecordSerializers = new HashMap<>();
+
+   for (String streamId : this.siddhiPlan.getInputStreams()) {
--- End diff --

This can be moved into a separate method.
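
A sketch of the suggested extraction (the method name is hypothetical; the calls are the ones already present in the diff):

{code}
public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
	// ... field assignments as in the diff above ...
	registerStreamRecordSerializers();
}

/** Hypothetical name; builds one serializer per registered input stream. */
private void registerStreamRecordSerializers() {
	for (String streamId : this.siddhiPlan.getInputStreams()) {
		streamRecordSerializers.put(streamId,
			createStreamRecordSerializer(
				this.siddhiPlan.getInputStreamSchema(streamId),
				this.siddhiPlan.getExecutionConfig()));
	}
}
{code}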


> Integrate Siddhi as a lightweight CEP Library
> ---------------------------------------------
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>


[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485176#comment-15485176
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78446790
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiStream.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.contrib.siddhi.operator.SiddhiOperatorContext;
+import org.apache.flink.contrib.siddhi.utils.SiddhiStreamFactory;
+import org.apache.flink.contrib.siddhi.utils.SiddhiTypeFactory;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Siddhi CEP API Interface
+ */
+@PublicEvolving
+public abstract class SiddhiStream {
+   private final SiddhiCEP environment;
+
+   public SiddhiStream(SiddhiCEP environment) {
--- End diff --

Could we name this `cepEnvironment`?


> Integrate Siddhi as a lightweight CEP Library
> ---------------------------------------------
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library, patch-available
> Fix For: 1.2.0
>
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for Flink users (especially streaming application 
> developers) to provide a library to run Siddhi CEP queries directly in Flink 
> streaming applications.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
> * Connect with single or multiple Flink DataStreams with Siddhi CEP 
> Execution Plan
> * Return output stream as DataStream with type intelligently inferred 
> from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See 
> `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See 
> `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
> https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCas


[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485174#comment-15485174
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78446628
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
--- End diff --

I think this class would benefit from more JavaDocs.
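
For instance, a class-level JavaDoc with a short usage pointer (the wording is hypothetical; the method names are the ones from the example in the issue description):

{code}
/**
 * Siddhi CEP execution environment.
 *
 * <p>Keeps track of registered input streams, their schemas, and
 * user-defined Siddhi extensions, and is the entry point for defining
 * Siddhi queries over Flink DataStreams, e.g. via
 * {@code SiddhiCEP.getSiddhiEnvironment(env)} followed by
 * {@code registerStream(...)} and {@code from(...)}.
 */
@PublicEvolving
public class SiddhiCEP {
	// ...
}
{code}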


> Integrate Siddhi as a lightweight CEP Library
> ---------------------------------------------
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library, patch-available
> Fix For: 1.2.0
>
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for Flink users (especially streaming application 
> developers) to provide a library to run Siddhi CEP queries directly in Flink 
> streaming applications.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
> * Connect with single or multiple Flink DataStreams with Siddhi CEP 
> Execution Plan
> * Return output stream as DataStream with type intelligently inferred 
> from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See 
> `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See 
> `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
> https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
>  StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", 
> "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", 
> "price","timestamp");
>  DataStream> output = cep
>   .from("inputStream1").union("inputStream2")
>   .sql(
> "from every 

[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485172#comment-15485172
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78446582
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
+   private final StreamExecutionEnvironment executionEnvironment;
+   private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
+   private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
+   private final Map<String, Class<?>> extensions = new HashMap<>();
+
+   public Map<String, DataStream<?>> getDataStreams() {
+   return this.dataStreams;
+   }
+
+   public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
+   return this.dataStreamSchemas;
+   }
+
+   public boolean isStreamDefined(String streamId) {
+   return dataStreams.containsKey(streamId);
+   }
+
+   public Map<String, Class<?>> getExtensions() {
+   return this.extensions;
+   }
+
+   public void checkStreamDefined(String streamId) throws UndefinedStreamException {
+   if (!isStreamDefined(streamId)) {
+   throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
+   }
+   }
+
+   public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
+   this.executionEnvironment = streamExecutionEnvironment;
--- End diff --

The same comment applies to the other public methods of this class.


> Integrate Siddhi as a lightweight CEP Library
> ---------------------------------------------
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library, patch-available
> Fix For: 1.2.0
>
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for Flink users (especially streaming application 
> developers) to provide a library to run Siddhi CEP queries directly in Flink 
> streaming applications.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, 


[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485169#comment-15485169
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78446460
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
+   private final StreamExecutionEnvironment executionEnvironment;
+   private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
+   private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
+   private final Map<String, Class<?>> extensions = new HashMap<>();
+
+   public Map<String, DataStream<?>> getDataStreams() {
+   return this.dataStreams;
+   }
+
+   public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
+   return this.dataStreamSchemas;
+   }
+
+   public boolean isStreamDefined(String streamId) {
+   return dataStreams.containsKey(streamId);
+   }
+
+   public Map<String, Class<?>> getExtensions() {
+   return this.extensions;
+   }
+
+   public void checkStreamDefined(String streamId) throws UndefinedStreamException {
+   if (!isStreamDefined(streamId)) {
+   throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
+   }
+   }
+
+   public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
+   this.executionEnvironment = streamExecutionEnvironment;
--- End diff --

I would suggest using the `Preconditions` class to check the input.
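
A sketch of the suggested check using Flink's org.apache.flink.util.Preconditions (the message text is hypothetical, not the PR's code):

{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;

public class SiddhiCEP {
	private final StreamExecutionEnvironment executionEnvironment;

	public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
		// Fails fast with a clear message instead of a delayed NullPointerException.
		this.executionEnvironment = Preconditions.checkNotNull(
			streamExecutionEnvironment, "streamExecutionEnvironment must not be null");
	}
}
{code}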


> Integrate Siddhi as a lightweight CEP Library
> ---------------------------------------------
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library, patch-available
> Fix For: 1.2.0
>
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for Flink users (especially streaming application 
> developers) to provide a library to run Siddhi CEP queries directly in Flink 
> streaming applications.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO,Tuple, Primitive T

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

2016-09-12 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78446582
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
+   private final StreamExecutionEnvironment executionEnvironment;
+   private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
+   private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
+   private final Map<String, Class<?>> extensions = new HashMap<>();
+
+   public Map<String, DataStream<?>> getDataStreams() {
+   return this.dataStreams;
+   }
+
+   public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
+   return this.dataStreamSchemas;
+   }
+
+   public boolean isStreamDefined(String streamId) {
+   return dataStreams.containsKey(streamId);
+   }
+
+   public Map<String, Class<?>> getExtensions() {
+   return this.extensions;
+   }
+
+   public void checkStreamDefined(String streamId) throws 
UndefinedStreamException {
+   if (!isStreamDefined(streamId)) {
+   throw new UndefinedStreamException("Stream (streamId: " 
+ streamId + ") not defined");
+   }
+   }
+
+   public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) 
{
+   this.executionEnvironment = streamExecutionEnvironment;
--- End diff --

The same comment applies to the other public methods of this class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485167#comment-15485167
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78446343
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
+   private final StreamExecutionEnvironment executionEnvironment;
+   private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
+   private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
+   private final Map<String, Class<?>> extensions = new HashMap<>();
+
+   public Map<String, DataStream<?>> getDataStreams() {
+   return this.dataStreams;
+   }
+
+   public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
+   return this.dataStreamSchemas;
+   }
+
+   public boolean isStreamDefined(String streamId) {
+   return dataStreams.containsKey(streamId);
+   }
+
+   public Map<String, Class<?>> getExtensions() {
+   return this.extensions;
+   }
+
+   public void checkStreamDefined(String streamId) throws 
UndefinedStreamException {
+   if (!isStreamDefined(streamId)) {
+   throw new UndefinedStreamException("Stream (streamId: " 
+ streamId + ") not defined");
+   }
+   }
+
+   public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) 
{
--- End diff --

Could you put the constructor after the field definitions?


> Integrate Siddhi as a lightweight CEP Library
> -
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library, patch-available
> Fix For: 1.2.0
>
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for Flink users (especially streaming application 
> developers) to provide a library to run Siddhi CEP queries directly in a Flink 
> streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO, Tuple, Primitive Type, etc.
> * Connect with single or multiple Flink DataStreams with Siddhi C


[jira] [Commented] (FLINK-4612) Close FileWriter using try with resources

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15484935#comment-15484935
 ] 

ASF GitHub Bot commented on FLINK-4612:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2492#discussion_r78430845
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java ---
@@ -55,11 +55,8 @@ public static String createTempFileInDirectory(String 
dir, String contents) thro
f.createNewFile();
f.deleteOnExit();

-   BufferedWriter out = new BufferedWriter(new FileWriter(f));
-   try { 
+   try(BufferedWriter out = new BufferedWriter(new FileWriter(f))) 
{
--- End diff --

There should be a space after `try`; the same issue occurs in other places as 
well.
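
For reference, a small sketch of the corrected formatting, reusing the `f` and 
`contents` names from the diff above; note the space between `try` and `(`:

{code}
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

class TryFormatExample {
	// try-with-resources closes the writer even if write() throws
	static void writeContents(File f, String contents) throws IOException {
		try (BufferedWriter out = new BufferedWriter(new FileWriter(f))) {
			out.write(contents);
		}
	}
}
{code}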


> Close FileWriter using try with resources
> -
>
> Key: FLINK-4612
> URL: https://issues.apache.org/jira/browse/FLINK-4612
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> FileWriter is not closed properly in many places in the project modules



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15484801#comment-15484801
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r78422242
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+* Given an input value return the HBase row key. Row key cannot be 
null.
+*
+* @param value
+* @return row key
+*/
+   byte[] rowKey(IN value);
--- End diff --

> If so, getRowKey() API would be the same as rowkey(IN value) except a 
name change.

Maybe yes, that would be better. OK, let's see what others have to say. Can you 
ping @rmetzger to get this in and for further reviews?


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15484737#comment-15484737
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r78418366
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps an input value to a row in an HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+* Given an input value return the HBase row key. Row key cannot be 
null.
+*
+* @param value
+* @return row key
+*/
+   byte[] rowKey(IN value);
--- End diff --

Hi @ramkrish86, if we have 
`MutationActions mutationActions = HBaseMapper#actions(rowkey, value);`
then the rowkey information would be included in the MutationActions object 
being created. So when we call `mutationActions.createMutations()` there would 
be no need to pass the rowkey as an argument, which I think is totally fine. But 
what I'm confused about is how we derive the rowkey from the input value. Should 
we pass the input value as an argument to the getRowKey() API you mentioned? If 
so, the getRowKey() API would be the same as rowkey(IN value) except for a name 
change.
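
To make the comparison concrete, a hedged sketch of the two shapes discussed in 
this thread; `MutationActions` and `actions(...)` come from the comments above, 
and the rest is illustrative rather than the PR's actual API:

{code}
import org.apache.flink.api.common.functions.Function;

import java.io.Serializable;

// Variant currently in the PR: the mapper derives the row key from the value.
public interface HBaseMapper<IN> extends Function, Serializable {
	byte[] rowKey(IN value);
}

// Variant floated above (illustrative only): fold the row key into a
// MutationActions object up front, so createMutations() needs no row key:
//   MutationActions actions = mapper.actions(rowKey, value);
//   List<Mutation> mutations = actions.createMutations();
{code}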


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-2014) Add LASSO regression

2016-09-12 Thread Victor Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15484612#comment-15484612
 ] 

Victor Chen commented on FLINK-2014:


To do LASSO, I'm thinking of implementing Greedy Parallel Coordinate Descent 
(http://www.caam.rice.edu/~optimization/disparse/parallel_and_distributed_sparse_optimization.pdf).

Similar to PR#1102 (https://github.com/apache/flink/pull/1102), I'm interested 
in performing training with SSP. Would it be too risky to base my algorithm on 
SSP (since the PR hasn't been merged yet)? Are there any "primitives" in Flink I 
could use to do non-SSP training (e.g. sync/async training)? I'm planning to 
get a minimum viable "product" by the end of this week.

Also, I know Theodore is working on FLINK-2013 (Create GLM framework). Should I 
extend the GLM interface from your branch? Since I plan to finish by the end of 
this week and I don't know how much time it'll take, I propose to code a 
LASSO-specific algorithm and then, within 2 weeks or so, generalize it to the 
GLM framework.
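
For context, the objective in question is the standard LASSO formulation 
(stated here for reference only; X is the design matrix, y the targets, and 
lambda the L1 regularization weight):

{code}
\min_{w \in \mathbb{R}^d} \; \frac{1}{2n} \lVert X w - y \rVert_2^2 + \lambda \lVert w \rVert_1
{code}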

> Add LASSO regression
> 
>
> Key: FLINK-2014
> URL: https://issues.apache.org/jira/browse/FLINK-2014
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Theodore Vasiloudis
>Priority: Minor
>
> LASSO is a linear model that uses L1 regularization



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-12 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15484571#comment-15484571
 ] 

ramkrishna.s.vasudevan commented on FLINK-3322:
---

To go forward with this, I think, as [~ggevay] suggested, I could create two 
PRs: one for the allocation of memory for the sorters and the other for the 
iterators. That way we could see the amount of change and, if needed, combine 
both into one PR and take it in.
Any suggestions?

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookup tables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4482) numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock

2016-09-12 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4482:
--
Description: 
In CheckpointCoordinator#stopCheckpointScheduler() :
{code}
synchronized (lock) {
...
  numUnsuccessfulCheckpointsTriggers = 0;
{code}

triggerLock is not held in the above operation.
See comment for triggerLock earlier in triggerCheckpoint():
{code}
// we lock with a special lock to make sure that trigger requests do not 
overtake each other.
// this is not done with the coordinator-wide lock, because the 
'checkpointIdCounter'
// may issue blocking operations. Using a different lock than the 
coordinator-wide lock,
// we avoid blocking the processing of 'acknowledge/decline' messages 
during that time.
synchronized (triggerLock) {
{code}

  was:
In CheckpointCoordinator#stopCheckpointScheduler() :
{code}
synchronized (lock) {
...
  numUnsuccessfulCheckpointsTriggers = 0;
{code}
triggerLock is not held in the above operation.
See comment for triggerLock earlier in triggerCheckpoint():
{code}
// we lock with a special lock to make sure that trigger requests do not 
overtake each other.
// this is not done with the coordinator-wide lock, because the 
'checkpointIdCounter'
// may issue blocking operations. Using a different lock than the 
coordinator-wide lock,
// we avoid blocking the processing of 'acknowledge/decline' messages 
during that time.
synchronized (triggerLock) {
{code}


> numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock
> --
>
> Key: FLINK-4482
> URL: https://issues.apache.org/jira/browse/FLINK-4482
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> In CheckpointCoordinator#stopCheckpointScheduler() :
> {code}
> synchronized (lock) {
> ...
>   numUnsuccessfulCheckpointsTriggers = 0;
> {code}
> triggerLock is not held in the above operation.
> See comment for triggerLock earlier in triggerCheckpoint():
> {code}
> // we lock with a special lock to make sure that trigger requests do not 
> overtake each other.
> // this is not done with the coordinator-wide lock, because the 
> 'checkpointIdCounter'
> // may issue blocking operations. Using a different lock than the 
> coordinator-wide lock,
> // we avoid blocking the processing of 'acknowledge/decline' messages 
> during that time.
> synchronized (triggerLock) {
> {code}
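
A minimal sketch of one possible fix, using the field and lock names from the 
snippets above (not the actual CheckpointCoordinator code); the lock ordering 
relative to triggerCheckpoint() would have to stay consistent to avoid 
deadlocks:

{code}
// Sketch only: reset the counter under triggerLock, matching the protection
// used in triggerCheckpoint(). Acquire triggerLock on its own rather than
// nested inside the coordinator-wide lock to keep the lock ordering simple.
synchronized (triggerLock) {
	numUnsuccessfulCheckpointsTriggers = 0;
}
{code}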



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-12 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15484104#comment-15484104
 ] 

ramkrishna.s.vasudevan commented on FLINK-3322:
---

I did try changing the code to make the Drivers hold on to the memory segments 
for iterative tasks, but that is indeed tedious; we would need to touch a lot 
of places. Changing the sorters and their memory allocation was much more 
contained and easier to review.

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookup tables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-09-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15483603#comment-15483603
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user nielsbasjes commented on the issue:

https://github.com/apache/flink/pull/2275
  
Perhaps you are running into similar problems as described here: 
http://stackoverflow.com/questions/2890259/running-each-junit-test-in-a-separate-jvm-in-eclipse

#JustTryingToHelp


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-4603) KeyedStateBackend cannot restore user code classes

2016-09-12 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15483601#comment-15483601
 ] 

Till Rohrmann commented on FLINK-4603:
--

That makes sense. 

> KeyedStateBackend cannot restore user code classes
> --
>
> Key: FLINK-4603
> URL: https://issues.apache.org/jira/browse/FLINK-4603
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Priority: Blocker
> Fix For: 1.2.0
>
>
> A user reported that he cannot restore keyed state which contains user code 
> classes. I suspect that we don't use the user code class loader to 
> deserialize the state.
> The solution seems to be to forward the user code class loader to the 
> {{KeyedStateBackends}} when restoring state.
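
A minimal sketch of the proposed direction, assuming Flink's 
InstantiationUtil.deserializeObject(byte[], ClassLoader) as the deserialization 
entry point; the helper and parameter names are illustrative:

{code}
import org.apache.flink.util.InstantiationUtil;

import java.io.IOException;

class UserStateRestoreExample {
	// Sketch only: resolve user classes via the user code class loader
	// instead of the system class loader.
	static Object restoreUserState(byte[] stateBytes, ClassLoader userCodeClassLoader)
			throws IOException, ClassNotFoundException {
		return InstantiationUtil.deserializeObject(stateBytes, userCodeClassLoader);
	}
}
{code}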



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4613) Extend ALS to handle implicit feedback datasets

2016-09-12 Thread JIRA
Gábor Hermann created FLINK-4613:


 Summary: Extend ALS to handle implicit feedback datasets
 Key: FLINK-4613
 URL: https://issues.apache.org/jira/browse/FLINK-4613
 Project: Flink
  Issue Type: New Feature
  Components: Machine Learning Library
Reporter: Gábor Hermann
Assignee: Gábor Hermann


The Alternating Least Squares implementation should be extended to handle 
_implicit feedback_ datasets. These datasets do not contain explicit ratings by 
users; they are rather built by collecting user behavior (e.g. a user listened 
to artist X for Y minutes), and they require a slightly different optimization 
objective. See details in [Hu et al|http://dx.doi.org/10.1109/ICDM.2008.22].

We do not need to modify much in the original ALS algorithm. See the [Spark ALS 
implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala],
 which could be a basis for this extension. Only the factor-updating part is 
modified, and most of the changes are in the local parts of the algorithm (i.e. 
UDFs). In fact, the only modification that is not local is precomputing the 
matrix product Y^T * Y and broadcasting it to all the nodes, which we can do 
with broadcast DataSets.
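
For reference, the user-factor update from Hu et al. that motivates the 
precomputation (Y is the item-factor matrix, C^u the per-user confidence 
diagonal, p(u) the binary preference vector):

{code}
x_u = \left( Y^T Y + Y^T (C^u - I) Y + \lambda I \right)^{-1} Y^T C^u \, p(u)
{code}

The Y^T Y term is the only user-independent part, which is why it can be 
computed once per iteration and broadcast to all nodes.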



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-09-12 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15483350#comment-15483350
 ] 

Ivan Mushketyk commented on FLINK-2254:
---

Hi Vasia,

Thank you for reviewing the design doc.

I do agree with you and Greg; it seems cleaner to keep these classes separate. 
Regarding common methods, I was thinking about methods like mapVertices or 
mapEdges.

If we agree that the bipartite graph should be a separate class, I will add 
subtasks for this task (it seems too big on its own), such as:
Add BipartiteGraph class
Implement BipartiteGraph reader
Implement bipartite graph generators
Implement bipartite graph stats

For the bipartite graph I'll implement methods for accessing edges, vertices, 
and projections.

What do you think about this?
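
To make the proposal concrete, a hypothetical outline of the class shape; the 
type parameters and method names are illustrative only, not the eventual Gelly 
API:

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;

// Illustrative only: top and bottom vertex sets are disjoint by construction.
public interface BipartiteGraph<KT, KB, VVT, VVB, EV> {
	DataSet<Vertex<KT, VVT>> getTopVertices();
	DataSet<Vertex<KB, VVB>> getBottomVertices();

	// Projecting onto one vertex set yields a regular Gelly Graph.
	Graph<KT, VVT, EV> projectionTopSimple();
	Graph<KB, VVB, EV> projectionBottomSimple();
}
{code}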


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge with a source vertex in the 
> first set has a target vertex in the second set. We would like to 
> support efficient operations for this type of graph along with a set of 
> metrics (http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)