[jira] [Commented] (FLINK-4609) Remove redundant check for null in CrossOperator

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479200#comment-15479200
 ] 

ASF GitHub Bot commented on FLINK-4609:
---

GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2490

[FLINK-4609] Remove redundant check for null in CrossOperator

https://issues.apache.org/jira/browse/FLINK-4609

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4609

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2490


commit 272007fc8f350b1b998f28762aade6760b588c73
Author: Alexander Pivovarov 
Date:   2016-09-10T05:24:43Z

[FLINK-4609] Remove redundant check for null in CrossOperator




> Remove redundant check for null in CrossOperator
> 
>
> Key: FLINK-4609
> URL: https://issues.apache.org/jira/browse/FLINK-4609
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> CrossOperator checks input1 and input2 for null after they were dereferenced



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...

2016-09-09 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2490

[FLINK-4609] Remove redundant check for null in CrossOperator

https://issues.apache.org/jira/browse/FLINK-4609

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4609

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2490


commit 272007fc8f350b1b998f28762aade6760b588c73
Author: Alexander Pivovarov 
Date:   2016-09-10T05:24:43Z

[FLINK-4609] Remove redundant check for null in CrossOperator




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4609) Remove redundant check for null in CrossOperator

2016-09-09 Thread Alexander Pivovarov (JIRA)
Alexander Pivovarov created FLINK-4609:
--

 Summary: Remove redundant check for null in CrossOperator
 Key: FLINK-4609
 URL: https://issues.apache.org/jira/browse/FLINK-4609
 Project: Flink
  Issue Type: Bug
  Components: Java API
Affects Versions: 1.1.2
Reporter: Alexander Pivovarov
Priority: Trivial


CrossOperator checks input1 and input2 for null after they were dereferenced
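
For illustration, a minimal sketch of the pattern the ticket describes (hypothetical code, not the actual CrossOperator source):
{code}
// Hypothetical, simplified sketch: the inputs are dereferenced first,
// so the later null check can never fire and is therefore redundant.
public class CrossSketch {
    private final Object input1;
    private final Object input2;

    public CrossSketch(Object input1, Object input2) {
        // Dereference happens here; a null argument already throws NullPointerException.
        String description = input1.toString() + " x " + input2.toString();

        // Redundant: unreachable for a null argument, so it can be removed.
        if (input1 == null || input2 == null) {
            throw new NullPointerException("inputs must not be null");
        }
        this.input1 = input1;
        this.input2 = input2;
        System.out.println(description);
    }
}
{code}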



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4608) Use short-circuit AND in Max/Min AggregationFunction

2016-09-09 Thread Alexander Pivovarov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Pivovarov updated FLINK-4608:
---
Description: Max/Min AggregationFunction use & instead of &&. Usually we 
use short-circuit logic in if statements in Java  (was: Max/Min 
AggregationFunction use & instead of &&. Usually we use && in if blocks)

> Use short-circuit AND in Max/Min AggregationFunction
> 
>
> Key: FLINK-4608
> URL: https://issues.apache.org/jira/browse/FLINK-4608
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> Max/Min AggregationFunction use & instead of &&. Usually we use short-circuit 
> logic in if statements in Java
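
For a quick illustration of the difference (a hedged sketch, not the Flink code itself):
{code}
// Hypothetical example: '&' always evaluates both operands, '&&' short-circuits.
public class ShortCircuitSketch {
    static int comparisons = 0;

    static boolean lessThan(int a, int b) {
        comparisons++; // counts how often the right-hand operand is evaluated
        return a < b;
    }

    public static void main(String[] args) {
        boolean initialized = false;

        // Non-short-circuit AND: lessThan() is called although 'initialized' is false.
        boolean eager = initialized & lessThan(1, 2);

        // Short-circuit AND: lessThan() is skipped because 'initialized' is already false.
        boolean lazy = initialized && lessThan(1, 2);

        System.out.println(eager + " " + lazy + " comparisons=" + comparisons);
        // prints: false false comparisons=1
    }
}
{code}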



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4608) Use short-circuit AND in Max/Min AggregationFunction

2016-09-09 Thread Alexander Pivovarov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Pivovarov updated FLINK-4608:
---
Description: Max/Min AggregationFunction use & instead of &&. Usually we 
use && in if blocks  (was: Max/Min AggregationFunction use & instead of &&)

> Use short-circuit AND in Max/Min AggregationFunction
> 
>
> Key: FLINK-4608
> URL: https://issues.apache.org/jira/browse/FLINK-4608
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> Max/Min AggregationFunction use & instead of &&. Usually we use && in if 
> blocks



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4608) Use short-circuit AND in Max/Min AggregationFunction

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479159#comment-15479159
 ] 

ASF GitHub Bot commented on FLINK-4608:
---

GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2489

[FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction

https://issues.apache.org/jira/browse/FLINK-4608

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4608

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2489.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2489


commit ceb3aa05bae3d05b9f6c5a7a55e6e43fc91a9450
Author: Alexander Pivovarov 
Date:   2016-09-10T04:52:36Z

[FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction




> Use short-circuit AND in Max/Min AggregationFunction
> 
>
> Key: FLINK-4608
> URL: https://issues.apache.org/jira/browse/FLINK-4608
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> Max/Min AggregationFunction use & instead of &&



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2489: [FLINK-4608] Use short-circuit AND in Max/Min Aggr...

2016-09-09 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2489

[FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction

https://issues.apache.org/jira/browse/FLINK-4608

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4608

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2489.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2489


commit ceb3aa05bae3d05b9f6c5a7a55e6e43fc91a9450
Author: Alexander Pivovarov 
Date:   2016-09-10T04:52:36Z

[FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4608) Use short-circuit AND in Max/Min AggregationFunction

2016-09-09 Thread Alexander Pivovarov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Pivovarov updated FLINK-4608:
---
Description: Max/Min AggregationFunction use & instead of &&

> Use short-circuit AND in Max/Min AggregationFunction
> 
>
> Key: FLINK-4608
> URL: https://issues.apache.org/jira/browse/FLINK-4608
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> Max/Min AggregationFunction use & instead of &&



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4608) Use short-circuit AND in Max/Min AggregationFunction

2016-09-09 Thread Alexander Pivovarov (JIRA)
Alexander Pivovarov created FLINK-4608:
--

 Summary: Use short-circuit AND in Max/Min AggregationFunction
 Key: FLINK-4608
 URL: https://issues.apache.org/jira/browse/FLINK-4608
 Project: Flink
  Issue Type: Bug
  Components: Java API
Affects Versions: 1.1.2
Reporter: Alexander Pivovarov
Priority: Trivial






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2485#discussion_r78270898
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert.assertEquals
+import org.junit._
+
+class ExplainStreamTest
+  extends StreamingMultipleProgramsTestBase {
+
+  val testFilePath = 
ExplainStreamTest.this.getClass.getResource("/").getFile
+
+  @Test
+  def testFilter() : Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val table = env.fromElements((1, "hello"))
+  .toTable(tEnv, 'a, 'b)
+  .filter("a % 2 = 0")
+
+val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+val source = scala.io.Source.fromFile(testFilePath +
+  
"../../src/test/scala/resources/testFilterStream0.out").mkString.replaceAll("\\r\\n",
 "\n")
+assertEquals(result, source)
+  }
+
+  @Test
+  def testUnion() : Unit = {
--- End diff --

no need for a space after () on lines 35 and 50


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2485#discussion_r78270846
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
 ---
@@ -271,5 +271,21 @@ abstract class StreamTableEnvironment(
 }
 
   }
+  /*
+  * Returns the AST of the specified Table API and SQL queries and the 
execution plan to compute
+* the result of the given [[Table]].
+  *
+  * @param table The table for which the AST and execution plan will be 
returned.
+* @param extended Flag to include detailed optimizer estimates.
+  */
+   def explain(table: Table): String = {
+
+val ast = RelOptUtil.toString(table.getRelNode)
+
+s"== Abstract Syntax Tree ==" +
+  System.lineSeparator +
+  s"$ast"
--- End diff --

No need for the `s` interpolator on line 285


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2485#discussion_r78270826
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
 ---
@@ -271,5 +271,21 @@ abstract class StreamTableEnvironment(
 }
 
   }
+  /*
+  * Returns the AST of the specified Table API and SQL queries and the 
execution plan to compute
+* the result of the given [[Table]].
+  *
+  * @param table The table for which the AST and execution plan will be 
returned.
+* @param extended Flag to include detailed optimizer estimates.
+  */
+   def explain(table: Table): String = {
+
+val ast = RelOptUtil.toString(table.getRelNode)
+
+s"== Abstract Syntax Tree ==" +
+  System.lineSeparator +
+  s"$ast"
--- End diff --

Maybe this? 
```
s"== Abstract Syntax Tree ==${System.lineSeparator}$ast"
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2485#discussion_r78270796
  
--- Diff: docs/dev/table_api.md ---
@@ -2457,3 +2457,27 @@ The Table API provides a configuration (the 
so-called `TableConfig`) to modify r
 By default, the Table API supports `null` values. Null handling can be 
disabled to improve performance by setting the `nullCheck` property in the 
`TableConfig` to `false`.
 
 {% top %}
+
+Explaining a Table
+
+The Table API provides a mechanism to describe the graph of operations 
that leads to the resulting output. This is done through the 
`TableEnvironment#explain(table)` method. It returns a string describing two 
graphs: the Abstract Syntax Tree of the relational algebra query and the 
Flink Execution Plan of the equivalent Flink Job. 
+
+Table `explain` is supported for both `BatchTableEnvironment` and 
`StreamTableEnvironment`. Currently `StreamTableEnvironment` doesn't support 
the explanation of the Execution Plan.
+
+
+
+{% highlight scala %}
+   val env = StreamExecutionEnvironment.getExecutionEnvironment
+   val tEnv = TableEnvironment.getTableEnvironment(env)
+
+   val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+   val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+   val table = table1.unionAll(table2)
+
+   val explanation:String = tEnv.explain(table)
--- End diff --

put a space before String


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479106#comment-15479106
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78270764
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
+   private final StreamExecutionEnvironment executionEnvironment;
+   private final Map dataStreams;
+   private final Map dataStreamSchemas;
+   private final Map extensions = new HashMap<>();
+
+   public Map getDataStreams(){
+   return this.dataStreams;
+   }
+
+   public Map getDataStreamSchemas(){
+   return this.dataStreamSchemas;
+   }
+
+   public boolean isStreamDefined(String streamId){
+   return dataStreams.containsKey(streamId);
+   }
+
+   public Map getExtensions(){
+   return this.extensions;
+   }
+
+   public void checkStreamDefined(String streamId) throws 
UndefinedStreamException {
+   if(!isStreamDefined(streamId)){
+   throw new UndefinedStreamException("Stream (streamId: 
"+streamId+") not defined");
+   }
+   }
+
+   public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) 
{
+   this.executionEnvironment = streamExecutionEnvironment;
+   this.dataStreams = new HashMap<>();
+   this.dataStreamSchemas = new HashMap<>();
+   }
+
+   public static  SiddhiStream.SingleSiddhiStream define(String 
streamId, DataStream inStream, String... fieldNames) {
+   SiddhiCEP environment = 
SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment());
+   return environment.from(streamId,inStream,fieldNames);
+   }
+
+   public  SiddhiStream.SingleSiddhiStream from(String streamId, 
DataStream inStream, String... fieldNames){
+   this.registerStream(streamId,inStream,fieldNames);
+   return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+   }
+
+   public  SiddhiStream.SingleSiddhiStream from(String streamId){
+   return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+   }
+
+   public  SiddhiStream.UnionSiddhiStream union(String 
firstStreamId,String ... unionStreamIds){
+   return new 
SiddhiStream.SingleSiddhiStream(firstStreamId,this).union(unionStreamIds);
+   }
+
+   public   void registerStream(final String streamId, DataStream 
dataStream, String... fieldNames) {
+   if (isStreamDefined(streamId)) {
+   throw new DuplicatedStreamException("Input stream: " + 
streamId + " already exists");
+   }
+   dataStreams.put(streamId, dataStream);
+   SiddhiStreamSchema schema = new 
SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
+   
schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
+   dataStreamSchemas.put(streamId, schema);
+   }
 

[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479103#comment-15479103
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78270757
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
+   private final StreamExecutionEnvironment executionEnvironment;
+   private final Map dataStreams;
+   private final Map dataStreamSchemas;
+   private final Map extensions = new HashMap<>();
+
+   public Map getDataStreams(){
+   return this.dataStreams;
+   }
+
+   public Map getDataStreamSchemas(){
+   return this.dataStreamSchemas;
+   }
+
+   public boolean isStreamDefined(String streamId){
+   return dataStreams.containsKey(streamId);
+   }
+
+   public Map getExtensions(){
+   return this.extensions;
+   }
+
+   public void checkStreamDefined(String streamId) throws 
UndefinedStreamException {
+   if(!isStreamDefined(streamId)){
+   throw new UndefinedStreamException("Stream (streamId: 
"+streamId+") not defined");
+   }
+   }
+
+   public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) 
{
+   this.executionEnvironment = streamExecutionEnvironment;
+   this.dataStreams = new HashMap<>();
+   this.dataStreamSchemas = new HashMap<>();
+   }
+
+   public static  SiddhiStream.SingleSiddhiStream define(String 
streamId, DataStream inStream, String... fieldNames) {
+   SiddhiCEP environment = 
SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment());
+   return environment.from(streamId,inStream,fieldNames);
+   }
+
+   public  SiddhiStream.SingleSiddhiStream from(String streamId, 
DataStream inStream, String... fieldNames){
+   this.registerStream(streamId,inStream,fieldNames);
+   return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+   }
+
+   public  SiddhiStream.SingleSiddhiStream from(String streamId){
+   return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+   }
+
+   public  SiddhiStream.UnionSiddhiStream union(String 
firstStreamId,String ... unionStreamIds){
+   return new 
SiddhiStream.SingleSiddhiStream(firstStreamId,this).union(unionStreamIds);
+   }
+
+   public   void registerStream(final String streamId, DataStream 
dataStream, String... fieldNames) {
+   if (isStreamDefined(streamId)) {
+   throw new DuplicatedStreamException("Input stream: " + 
streamId + " already exists");
+   }
+   dataStreams.put(streamId, dataStream);
+   SiddhiStreamSchema schema = new 
SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
+   
schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
+   dataStreamSchemas.put(streamId, schema);
+   }
 

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78270764
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
+   private final StreamExecutionEnvironment executionEnvironment;
+   private final Map dataStreams;
+   private final Map dataStreamSchemas;
+   private final Map extensions = new HashMap<>();
+
+   public Map getDataStreams(){
+   return this.dataStreams;
+   }
+
+   public Map getDataStreamSchemas(){
+   return this.dataStreamSchemas;
+   }
+
+   public boolean isStreamDefined(String streamId){
+   return dataStreams.containsKey(streamId);
+   }
+
+   public Map getExtensions(){
+   return this.extensions;
+   }
+
+   public void checkStreamDefined(String streamId) throws 
UndefinedStreamException {
+   if(!isStreamDefined(streamId)){
+   throw new UndefinedStreamException("Stream (streamId: 
"+streamId+") not defined");
+   }
+   }
+
+   public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) 
{
+   this.executionEnvironment = streamExecutionEnvironment;
+   this.dataStreams = new HashMap<>();
+   this.dataStreamSchemas = new HashMap<>();
+   }
+
+   public static  SiddhiStream.SingleSiddhiStream define(String 
streamId, DataStream inStream, String... fieldNames) {
+   SiddhiCEP environment = 
SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment());
+   return environment.from(streamId,inStream,fieldNames);
+   }
+
+   public  SiddhiStream.SingleSiddhiStream from(String streamId, 
DataStream inStream, String... fieldNames){
+   this.registerStream(streamId,inStream,fieldNames);
+   return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+   }
+
+   public  SiddhiStream.SingleSiddhiStream from(String streamId){
+   return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+   }
+
+   public  SiddhiStream.UnionSiddhiStream union(String 
firstStreamId,String ... unionStreamIds){
+   return new 
SiddhiStream.SingleSiddhiStream(firstStreamId,this).union(unionStreamIds);
+   }
+
+   public   void registerStream(final String streamId, DataStream 
dataStream, String... fieldNames) {
+   if (isStreamDefined(streamId)) {
+   throw new DuplicatedStreamException("Input stream: " + 
streamId + " already exists");
+   }
+   dataStreams.put(streamId, dataStream);
+   SiddhiStreamSchema schema = new 
SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
+   
schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
+   dataStreamSchemas.put(streamId, schema);
+   }
+
+   public StreamExecutionEnvironment getExecutionEnvironment() {
+   return executionEnvironment;
+   }
+
+   public void registerExtension(String extensionName, Class 
extensionClass) {
+   

[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479102#comment-15479102
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78270750
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
+   private final StreamExecutionEnvironment executionEnvironment;
+   private final Map dataStreams;
+   private final Map dataStreamSchemas;
+   private final Map extensions = new HashMap<>();
+
+   public Map getDataStreams(){
+   return this.dataStreams;
+   }
+
+   public Map getDataStreamSchemas(){
+   return this.dataStreamSchemas;
+   }
+
+   public boolean isStreamDefined(String streamId){
+   return dataStreams.containsKey(streamId);
+   }
+
+   public Map getExtensions(){
+   return this.extensions;
+   }
+
+   public void checkStreamDefined(String streamId) throws 
UndefinedStreamException {
+   if(!isStreamDefined(streamId)){
+   throw new UndefinedStreamException("Stream (streamId: 
"+streamId+") not defined");
+   }
+   }
+
+   public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) 
{
+   this.executionEnvironment = streamExecutionEnvironment;
+   this.dataStreams = new HashMap<>();
+   this.dataStreamSchemas = new HashMap<>();
--- End diff --

lines 64 and 65 can be removed.
Add `= new HashMap<>();` to lines 36 and 37, as was done on line 38.
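
A minimal sketch of the suggested shape (hypothetical field names and simplified types):
```
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the suggestion: initialize the maps at the declaration
// (as already done for 'extensions'), so the constructor assignments can be dropped.
public class FieldInitSketch {
    private final Map<String, Object> dataStreams = new HashMap<>();
    private final Map<String, Object> dataStreamSchemas = new HashMap<>();
    private final Map<String, Class<?>> extensions = new HashMap<>();
}
```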


> Integrate Siddhi as a lightweight CEP Library
> -
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library, patch-available
> Fix For: 1.2.0
>
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for flink users (especially streaming application 
> developer) to provide a library to run Siddhi CEP query directly in Flink 
> streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi 

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78270757
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
+   private final StreamExecutionEnvironment executionEnvironment;
+   private final Map dataStreams;
+   private final Map dataStreamSchemas;
+   private final Map extensions = new HashMap<>();
+
+   public Map getDataStreams(){
+   return this.dataStreams;
+   }
+
+   public Map getDataStreamSchemas(){
+   return this.dataStreamSchemas;
+   }
+
+   public boolean isStreamDefined(String streamId){
+   return dataStreams.containsKey(streamId);
+   }
+
+   public Map getExtensions(){
+   return this.extensions;
+   }
+
+   public void checkStreamDefined(String streamId) throws 
UndefinedStreamException {
+   if(!isStreamDefined(streamId)){
+   throw new UndefinedStreamException("Stream (streamId: 
"+streamId+") not defined");
+   }
+   }
+
+   public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) 
{
+   this.executionEnvironment = streamExecutionEnvironment;
+   this.dataStreams = new HashMap<>();
+   this.dataStreamSchemas = new HashMap<>();
+   }
+
+   public static  SiddhiStream.SingleSiddhiStream define(String 
streamId, DataStream inStream, String... fieldNames) {
+   SiddhiCEP environment = 
SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment());
+   return environment.from(streamId,inStream,fieldNames);
+   }
+
+   public  SiddhiStream.SingleSiddhiStream from(String streamId, 
DataStream inStream, String... fieldNames){
+   this.registerStream(streamId,inStream,fieldNames);
+   return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+   }
+
+   public  SiddhiStream.SingleSiddhiStream from(String streamId){
+   return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+   }
+
+   public  SiddhiStream.UnionSiddhiStream union(String 
firstStreamId,String ... unionStreamIds){
+   return new 
SiddhiStream.SingleSiddhiStream(firstStreamId,this).union(unionStreamIds);
+   }
+
+   public   void registerStream(final String streamId, DataStream 
dataStream, String... fieldNames) {
+   if (isStreamDefined(streamId)) {
+   throw new DuplicatedStreamException("Input stream: " + 
streamId + " already exists");
+   }
+   dataStreams.put(streamId, dataStream);
+   SiddhiStreamSchema schema = new 
SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
+   
schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
+   dataStreamSchemas.put(streamId, schema);
+   }
+
+   public StreamExecutionEnvironment getExecutionEnvironment() {
+   return executionEnvironment;
+   }
+
+   public void registerExtension(String extensionName, Class 
extensionClass) {
+   

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78270750
  
--- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
+   private final StreamExecutionEnvironment executionEnvironment;
+   private final Map dataStreams;
+   private final Map dataStreamSchemas;
+   private final Map extensions = new HashMap<>();
+
+   public Map getDataStreams(){
+   return this.dataStreams;
+   }
+
+   public Map getDataStreamSchemas(){
+   return this.dataStreamSchemas;
+   }
+
+   public boolean isStreamDefined(String streamId){
+   return dataStreams.containsKey(streamId);
+   }
+
+   public Map getExtensions(){
+   return this.extensions;
+   }
+
+   public void checkStreamDefined(String streamId) throws 
UndefinedStreamException {
+   if(!isStreamDefined(streamId)){
+   throw new UndefinedStreamException("Stream (streamId: 
"+streamId+") not defined");
+   }
+   }
+
+   public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) 
{
+   this.executionEnvironment = streamExecutionEnvironment;
+   this.dataStreams = new HashMap<>();
+   this.dataStreamSchemas = new HashMap<>();
--- End diff --

lines 64 and 65 can be removed.
Add `= new HashMap<>();` to lines 36 and 37, as was done on line 38.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4607) Close FileInputStream in ParameterTool and other

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479062#comment-15479062
 ] 

ASF GitHub Bot commented on FLINK-4607:
---

GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2488

[FLINK-4607] Close FileInputStream in ParameterTool and other

https://issues.apache.org/jira/browse/FLINK-4607

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4607

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2488


commit dc0e636f630f8ab96d7cc71ede0a5a3ea5ce24a4
Author: Alexander Pivovarov 
Date:   2016-09-10T03:32:28Z

[FLINK-4607] Close FileInputStream in ParameterTool and other




> Close FileInputStream in ParameterTool and other
> 
>
> Key: FLINK-4607
> URL: https://issues.apache.org/jira/browse/FLINK-4607
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> ParameterTool and some tests do not close FileInputStream
> {code}
> flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
> flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
> flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
> flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
> flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2488: [FLINK-4607] Close FileInputStream in ParameterToo...

2016-09-09 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2488

[FLINK-4607] Close FileInputStream in ParameterTool and other

https://issues.apache.org/jira/browse/FLINK-4607

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4607

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2488


commit dc0e636f630f8ab96d7cc71ede0a5a3ea5ce24a4
Author: Alexander Pivovarov 
Date:   2016-09-10T03:32:28Z

[FLINK-4607] Close FileInputStream in ParameterTool and other




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4607) Close FileInputStream in ParameterTool and other

2016-09-09 Thread Alexander Pivovarov (JIRA)
Alexander Pivovarov created FLINK-4607:
--

 Summary: Close FileInputStream in ParameterTool and other
 Key: FLINK-4607
 URL: https://issues.apache.org/jira/browse/FLINK-4607
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.1.2
Reporter: Alexander Pivovarov
Priority: Trivial


ParameterTool and some tests do not close FileInputStream
{code}
flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
{code}
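
For illustration only (not the actual patch), a try-with-resources sketch that guarantees the stream is closed:
{code}
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

// Hypothetical sketch: try-with-resources closes the FileInputStream
// even when Properties.load() throws.
public final class LoadPropertiesSketch {
    public static Properties load(String path) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        } // 'in' is closed here automatically
        return props;
    }
}
{code}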



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4606) Integrate the new ResourceManager with the existing FlinkResourceManager

2016-09-09 Thread zhangjing (JIRA)
zhangjing created FLINK-4606:


 Summary: Integrate the new ResourceManager with the existing 
FlinkResourceManager
 Key: FLINK-4606
 URL: https://issues.apache.org/jira/browse/FLINK-4606
 Project: Flink
  Issue Type: Sub-task
  Components: Cluster Management
Reporter: zhangjing
Assignee: zhangjing


Integrate the new ResourceManager with the existing FlinkResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-09 Thread Hao Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478991#comment-15478991
 ] 

Hao Chen commented on FLINK-4520:
-

Hi Flink community,

The PR is available now. Could anybody help assign the ticket to me and review 
the PR? Any response is highly appreciated.

Thanks,
Hao

> Integrate Siddhi as a lightweight CEP Library
> -
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library, patch-available
> Fix For: 1.2.0
>
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for flink users (especially streaming application 
> developer) to provide a library to run Siddhi CEP query directly in Flink 
> streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
> * Connect with single or multiple Flink DataStreams with Siddhi CEP 
> Execution Plan
> * Return output stream as DataStream with type intelligently inferred 
> from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See 
> `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See 
> `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
> https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
>  StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", 
> "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", 
> "price","timestamp");
>  DataStream> output = cep
>   .from("inputStream1").union("inputStream2")
>   .sql(
> "from every s1 = inputStream1[id == 2] "
>  + " -> s2 = inputStream2[id == 3] "
>  + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as 
> name_2 , custom:plus(s1.price,s2.price) as price"
>  + "insert into outputStream"
>   )
>   .returns("outputStream");
>  env.execute();
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-09 Thread Hao Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Chen updated FLINK-4520:

   Labels: cep library patch-available  (was: cep library)
Fix Version/s: 1.2.0

> Integrate Siddhi as a lightweight CEP Library
> -
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library, patch-available
> Fix For: 1.2.0
>
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for flink users (especially streaming application 
> developer) to provide a library to run Siddhi CEP query directly in Flink 
> streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
> * Connect with single or multiple Flink DataStreams with Siddhi CEP 
> Execution Plan
> * Return output stream as DataStream with type intelligently inferred 
> from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See 
> `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See 
> `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
> https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
>  StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", 
> "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", 
> "price","timestamp");
>  DataStream> output = cep
>   .from("inputStream1").union("inputStream2")
>   .sql(
> "from every s1 = inputStream1[id == 2] "
>  + " -> s2 = inputStream2[id == 3] "
>  + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as 
> name_2 , custom:plus(s1.price,s2.price) as price"
>  + "insert into outputStream"
>   )
>   .returns("outputStream");
>  env.execute();
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478961#comment-15478961
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user haoch commented on the issue:

https://github.com/apache/flink/pull/2486
  
Closed this PR to clean up the commit history and recreated it as PR #2487


> Integrate Siddhi as a lightweight CEP Library
> -
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for flink users (especially streaming application 
> developer) to provide a library to run Siddhi CEP query directly in Flink 
> streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
> * Connect with single or multiple Flink DataStreams with Siddhi CEP 
> Execution Plan
> * Return output stream as DataStream with type intelligently inferred 
> from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See 
> `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See 
> `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
> https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
>  StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", 
> "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", 
> "price","timestamp");
>  DataStream> output = cep
>   .from("inputStream1").union("inputStream2")
>   .sql(
> "from every s1 = inputStream1[id == 2] "
>  + " -> s2 = inputStream2[id == 3] "
>  + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as 
> name_2 , custom:plus(s1.price,s2.price) as price"
>  + "insert into outputStream"
>   )
>   .returns("outputStream");
>  env.execute();
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2486: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

2016-09-09 Thread haoch
Github user haoch commented on the issue:

https://github.com/apache/flink/pull/2486
  
Closed this PR to clean up the commit history and recreated it as PR #2487


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478958#comment-15478958
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

GitHub user haoch opened a pull request:

https://github.com/apache/flink/pull/2487

[FLINK-4520][flink-siddhi] Integrate Siddhi as a light-weight Streaming CEP 
Library

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

# Abstraction
Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
Processing Engine (CEP) released as a Java Library under `Apache Software 
License v2.0`. Siddhi CEP processes events which are generated by various event 
sources, analyses them and notifies appropriate complex events according to the 
user specified queries. 

__It would be very helpful for Flink users (especially streaming 
application developers) to provide a library to run Siddhi CEP queries directly in a 
Flink streaming application.__

# Features

* Integrate Siddhi CEP as a stream operator (i.e. 
`TupleStreamSiddhiOperator`), supporting rich CEP features like
  * Filter
  * Join
  * Aggregation
  * Group by
  * Having
  * Window
  * Conditions and Expressions
  * Pattern processing
  * Sequence processing
  * Event Tables
  ...
* Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
`SiddhiCEP` and `SiddhiStream`)
  * Register Flink DataStream associating native type information with 
Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
  * Connect with single or multiple Flink DataStreams with Siddhi CEP 
Execution Plan
  * Return output stream as DataStream with type intelligently inferred 
from Siddhi Stream Schema
* Integrate siddhi runtime state management with Flink state (See 
`AbstractSiddhiOperator`)
* Support siddhi plugin management to extend CEP functions. (See 
`SiddhiCEP#registerExtension`)

# Test Cases 

* [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)

# Example

 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

 cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);

 cep.registerStream("inputStream1", input1, "id", "name", 
"price","timestamp");
 cep.registerStream("inputStream2", input2, "id", "name", 
"price","timestamp");

 DataStream> output = cep
.from("inputStream1").union("inputStream2")
.sql(
"from every s1 = inputStream1[id == 2] "
 + " -> s2 = inputStream2[id == 3] "
 + "select s1.id as id_1, s1.name as name_1, s2.id as 
id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price"
 + " insert into outputStream"
)
.returns("outputStream");

 env.execute();


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haoch/flink FLINK-4520

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2487


commit 680739e1f1a7d2e12a1d1ba4f1cc1ea8494002e0
Author: Hao Chen 
Date:   2016-08-29T12:34:42Z

Implement initial version of flink-siddhi stream operator

commit 

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

2016-09-09 Thread haoch
GitHub user haoch opened a pull request:

https://github.com/apache/flink/pull/2487

[FLINK-4520][flink-siddhi] Integrate Siddhi as a light-weight Streaming CEP 
Library

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

# Abstraction
Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
Processing Engine (CEP) released as a Java Library under `Apache Software 
License v2.0`. Siddhi CEP processes events which are generated by various event 
sources, analyses them and notifies appropriate complex events according to the 
user specified queries. 

__It would be very helpful for Flink users (especially streaming 
application developers) to provide a library to run Siddhi CEP queries directly in a 
Flink streaming application.__

# Features

* Integrate Siddhi CEP as a stream operator (i.e. 
`TupleStreamSiddhiOperator`), supporting rich CEP features like
  * Filter
  * Join
  * Aggregation
  * Group by
  * Having
  * Window
  * Conditions and Expressions
  * Pattern processing
  * Sequence processing
  * Event Tables
  ...
* Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
`SiddhiCEP` and `SiddhiStream`)
  * Register Flink DataStream associating native type information with 
Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
  * Connect with single or multiple Flink DataStreams with Siddhi CEP 
Execution Plan
  * Return output stream as DataStream with type intelligently inferred 
from Siddhi Stream Schema
* Integrate siddhi runtime state management with Flink state (See 
`AbstractSiddhiOperator`)
* Support siddhi plugin management to extend CEP functions. (See 
`SiddhiCEP#registerExtension`)

# Test Cases 

* [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)

# Example

 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

 cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);

 cep.registerStream("inputStream1", input1, "id", "name", 
"price","timestamp");
 cep.registerStream("inputStream2", input2, "id", "name", 
"price","timestamp");

 DataStream> output = cep
.from("inputStream1").union("inputStream2")
.sql(
"from every s1 = inputStream1[id == 2] "
 + " -> s2 = inputStream2[id == 3] "
 + "select s1.id as id_1, s1.name as name_1, s2.id as 
id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price"
 + " insert into outputStream"
)
.returns("outputStream");

 env.execute();


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haoch/flink FLINK-4520

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2487


commit 680739e1f1a7d2e12a1d1ba4f1cc1ea8494002e0
Author: Hao Chen 
Date:   2016-08-29T12:34:42Z

Implement initial version of flink-siddhi stream operator

commit d8b131d9204e9e359afae80087afe2aecab27eaf
Author: Chen, Hao 
Date:   2016-08-30T14:33:59Z

Implement SiddhiStream API and DSL

commit 56c150fadc54d06c186223e43a6fd9ac74eee837
Author: Chen, Hao 
Date:   2016-08-30T18:31:18Z

Reformat 

[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478928#comment-15478928
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user haoch closed the pull request at:

https://github.com/apache/flink/pull/2486


> Integrate Siddhi as a lightweight CEP Library
> -
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for Flink users (especially streaming application 
> developers) to provide a library to run Siddhi CEP queries directly in a Flink 
> streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
> * Connect with single or multiple Flink DataStreams with Siddhi CEP 
> Execution Plan
> * Return output stream as DataStream with type intelligently inferred 
> from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See 
> `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See 
> `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
> https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
>  StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", 
> "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", 
> "price","timestamp");
>  DataStream> output = cep
>   .from("inputStream1").union("inputStream2")
>   .sql(
> "from every s1 = inputStream1[id == 2] "
>  + " -> s2 = inputStream2[id == 3] "
>  + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as 
> name_2 , custom:plus(s1.price,s2.price) as price"
>  + " insert into outputStream"
>   )
>   .returns("outputStream");
>  env.execute();
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2486: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

2016-09-09 Thread haoch
Github user haoch closed the pull request at:

https://github.com/apache/flink/pull/2486


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2465: [FLINK-4447] [docs] Include NettyConfig options on Config...

2016-09-09 Thread nssalian
Github user nssalian commented on the issue:

https://github.com/apache/flink/pull/2465
  
Hi @greghogan, thanks for the PR. I took a look at it and it seems good to go. 
The test failure doesn't seem related. Since I'm new to the community, I would 
probably ask someone else to have a look as well. @uce, could you take a pass 
over this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4447) Include NettyConfig options on Configurations page

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478759#comment-15478759
 ] 

ASF GitHub Bot commented on FLINK-4447:
---

Github user nssalian commented on the issue:

https://github.com/apache/flink/pull/2465
  
Hi @greghogan, thanks for the PR. I took a look at it and it seems good to go. 
The test failure doesn't seem related. Since I'm new to the community, I would 
probably ask someone else to have a look as well. @uce, could you take a pass 
over this?


> Include NettyConfig options on Configurations page
> --
>
> Key: FLINK-4447
> URL: https://issues.apache.org/jira/browse/FLINK-4447
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.2.0
>
>
> {{NettyConfig}} looks for the following configuration options which are not 
> listed in the Flink documentation.
> {noformat}
>   public static final String NUM_ARENAS = "taskmanager.net.num-arenas";
>   public static final String NUM_THREADS_SERVER = 
> "taskmanager.net.server.numThreads";
>   public static final String NUM_THREADS_CLIENT = 
> "taskmanager.net.client.numThreads";
>   public static final String CONNECT_BACKLOG = 
> "taskmanager.net.server.backlog";
>   public static final String CLIENT_CONNECT_TIMEOUT_SECONDS = 
> "taskmanager.net.client.connectTimeoutSec";
>   public static final String SEND_RECEIVE_BUFFER_SIZE = 
> "taskmanager.net.sendReceiveBufferSize";
>   public static final String TRANSPORT_TYPE = "taskmanager.net.transport";
> {noformat}
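
As a quick illustration only (the values below are placeholders, not recommended settings), these keys could be set on a Flink Configuration object, or equivalently as entries in flink-conf.yaml:

{code}
import org.apache.flink.configuration.Configuration;

// Sketch: setting the Netty-related keys listed above programmatically.
Configuration config = new Configuration();
config.setInteger("taskmanager.net.num-arenas", 4);
config.setInteger("taskmanager.net.server.numThreads", 4);
config.setInteger("taskmanager.net.client.numThreads", 4);
config.setInteger("taskmanager.net.server.backlog", 0);
config.setInteger("taskmanager.net.client.connectTimeoutSec", 120);
config.setInteger("taskmanager.net.sendReceiveBufferSize", 0);
config.setString("taskmanager.net.transport", "nio");
{code}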



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2442: [FLINK-4148] incorrect calculation minDist distance in Qu...

2016-09-09 Thread nssalian
Github user nssalian commented on the issue:

https://github.com/apache/flink/pull/2442
  
Hi @xhumanoid, thanks for the PR. Could you please check the failure 
messages and fix the build? I can help review once the PR is cleanly mergeable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4148) incorrect calculation distance in QuadTree

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478750#comment-15478750
 ] 

ASF GitHub Bot commented on FLINK-4148:
---

Github user nssalian commented on the issue:

https://github.com/apache/flink/pull/2442
  
Hi @xhumanoid, thanks for the PR. Could you please check the failure 
messages and fix the build? I can help review once the PR is cleanly mergeable.


> incorrect calculation distance in QuadTree
> --
>
> Key: FLINK-4148
> URL: https://issues.apache.org/jira/browse/FLINK-4148
> Project: Flink
>  Issue Type: Bug
>Reporter: Alexey Diomin
>Priority: Trivial
> Attachments: 
> 0001-FLINK-4148-incorrect-calculation-minDist-distance-in.patch
>
>
> https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/QuadTree.scala#L105
> Because EuclideanDistanceMetric extends SquaredEuclideanDistanceMetric, we 
> always take the first case and never reach the case that applies math.sqrt(minDist).
> The correct fix is to match EuclideanDistanceMetric first and 
> SquaredEuclideanDistanceMetric after it.
> P.S. Because EuclideanDistanceMetric is more expensive to compute and is the 
> default DistanceMetric, this can cause some performance degradation for KNN with 
> default parameters.
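
To make the ordering problem concrete, a small Java analogue of the Scala pattern match in QuadTree.minDist is sketched below (the two metric class names are the real ones, everything else is illustrative); because the match behaves like an instanceof chain, the superclass case must not come first.

{code}
// Illustrative only. minDist is assumed to hold a squared distance computed earlier.
// EuclideanDistanceMetric extends SquaredEuclideanDistanceMetric, so testing the
// superclass first would also swallow every EuclideanDistanceMetric instance.
static double adjustMinDist(DistanceMetric metric, double minDist) {
    // Buggy order (what the current code effectively does):
    //   if (metric instanceof SquaredEuclideanDistanceMetric) return minDist;     // always taken
    //   if (metric instanceof EuclideanDistanceMetric) return Math.sqrt(minDist); // unreachable

    // Correct order: check the more specific subclass first.
    if (metric instanceof EuclideanDistanceMetric) {
        return Math.sqrt(minDist);
    } else if (metric instanceof SquaredEuclideanDistanceMetric) {
        return minDist;
    } else {
        throw new IllegalArgumentException("Unsupported distance metric: " + metric);
    }
}
{code}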



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3457) Link to Apache Flink meetups from the 'Community' section of the website

2016-09-09 Thread Neelesh Srinivas Salian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478687#comment-15478687
 ] 

Neelesh Srinivas Salian commented on FLINK-3457:


Hi [~xazax], are you working on this?
If not, shall I grab it and post a PR?

> Link to Apache Flink meetups from the 'Community' section of the website
> 
>
> Key: FLINK-3457
> URL: https://issues.apache.org/jira/browse/FLINK-3457
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Reporter: Slim Baltagi
>Assignee: Gabor Horvath
>Priority: Trivial
>
> Now with the number of Apache Flink meetups increasing worldwide, it is 
> helpful to add a link to Apache Flink meetups 
> http://www.meetup.com/topics/apache-flink/ to the community section of 
> https://flink.apache.org/community.html so visitors can conveniently find 
> them  right from the Apache Flink website. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2859) Add links to docs and JIRA issues in FlinkML roadmap

2016-09-09 Thread Neelesh Srinivas Salian (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neelesh Srinivas Salian closed FLINK-2859.
--
Resolution: Fixed

This is fixed since the page has the link to the FLINK-ML JIRAs list.

> Add links to docs and JIRA issues in FlinkML roadmap
> 
>
> Key: FLINK-2859
> URL: https://issues.apache.org/jira/browse/FLINK-2859
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Priority: Trivial
>  Labels: ML
>
> The FlinkML [vision and roadmap 
> doc|https://cwiki.apache.org/confluence/display/FLINK/FlinkML%3A+Vision+and+Roadmap]
>  lists a number of features we aim to have in FlinkML.
> It would be helpful to newcomers if each feature linked to its corresponding 
> JIRA issue, and already implemented features to their page in the docs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-1914) Wrong FS while starting YARN session without correct HADOOP_HOME

2016-09-09 Thread Neelesh Srinivas Salian (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neelesh Srinivas Salian closed FLINK-1914.
--
Resolution: Fixed

> Wrong FS while starting YARN session without correct HADOOP_HOME
> 
>
> Key: FLINK-1914
> URL: https://issues.apache.org/jira/browse/FLINK-1914
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Zoltán Zvara
>Priority: Trivial
>  Labels: yarn, yarn-client
>
> When the YARN session is invoked ({{yarn-session.sh}}) without a correct 
> {{HADOOP_HOME}}, the AM is still deployed (for example to {{0.0.0.0:8032}}), but 
> the deployed AM fails with an {{IllegalArgumentException}}:
> {code}
> java.lang.IllegalArgumentException: Wrong FS: 
> file:/home/.../flink-dist-0.9-SNAPSHOT.jar, expected: hdfs://localhost:9000
>   at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:181)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:92)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
>   at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:105)
>   at 
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession$2.apply(ApplicationMasterActor.scala:436)
>   at 
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession$2.apply(ApplicationMasterActor.scala:371)
>   at scala.util.Try$.apply(Try.scala:161)
>   at 
> org.apache.flink.yarn.ApplicationMasterActor$class.org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession(ApplicationMasterActor.scala:371)
>   at 
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:155)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>   at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
>   at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
> {code}
> IMO this {{IllegalArgumentException}} should be handled in 
> {{org.apache.flink.yarn.Utils.registerLocalResource}} or at an upper level to 
> provide a better error message. At the moment this needs to be dug out of the 
> YARN logs, which is painful for a trivial mistake like a missing 
> {{HADOOP_HOME}}.
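
As a rough sketch only (the actual signature and surrounding code of Utils.registerLocalResource are not reproduced here), the idea would be to catch the mismatch close to the file-system call and rethrow it with a message that points at the likely misconfiguration:

{code}
// Illustrative sketch, not the real Utils.registerLocalResource code.
// fs is an org.apache.hadoop.fs.FileSystem and resourcePath an org.apache.hadoop.fs.Path,
// both assumed to be in scope as in the original method.
try {
    FileStatus status = fs.getFileStatus(resourcePath);
    // ... register the local resource as before ...
} catch (IllegalArgumentException e) {
    throw new RuntimeException(
        "Cannot register local resource " + resourcePath
        + ": the path does not belong to the cluster's default file system ("
        + fs.getUri() + "). This usually means HADOOP_HOME or the Hadoop "
        + "configuration was missing or wrong when the YARN session was started.", e);
}
{code}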



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1914) Wrong FS while starting YARN session without correct HADOOP_HOME

2016-09-09 Thread Neelesh Srinivas Salian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478539#comment-15478539
 ] 

Neelesh Srinivas Salian commented on FLINK-1914:


Sounds good [~rmetzger]. Closing the JIRA.

> Wrong FS while starting YARN session without correct HADOOP_HOME
> 
>
> Key: FLINK-1914
> URL: https://issues.apache.org/jira/browse/FLINK-1914
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Zoltán Zvara
>Priority: Trivial
>  Labels: yarn, yarn-client
>
> When the YARN session is invoked ({{yarn-session.sh}}) without a correct 
> {{HADOOP_HOME}}, the AM is still deployed (for example to {{0.0.0.0:8032}}), but 
> the deployed AM fails with an {{IllegalArgumentException}}:
> {code}
> java.lang.IllegalArgumentException: Wrong FS: 
> file:/home/.../flink-dist-0.9-SNAPSHOT.jar, expected: hdfs://localhost:9000
>   at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:181)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:92)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
>   at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:105)
>   at 
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession$2.apply(ApplicationMasterActor.scala:436)
>   at 
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession$2.apply(ApplicationMasterActor.scala:371)
>   at scala.util.Try$.apply(Try.scala:161)
>   at 
> org.apache.flink.yarn.ApplicationMasterActor$class.org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession(ApplicationMasterActor.scala:371)
>   at 
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:155)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>   at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
>   at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
> {code}
> IMO this {{IllegalArgumentException}} should be handled in 
> {{org.apache.flink.yarn.Utils.registerLocalResource}} or at an upper level to 
> provide a better error message. At the moment this needs to be dug out of the 
> YARN logs, which is painful for a trivial mistake like a missing 
> {{HADOOP_HOME}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-710) Aggregate transformation works only with FieldPositionKeys

2016-09-09 Thread Neelesh Srinivas Salian (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neelesh Srinivas Salian closed FLINK-710.
-
Resolution: Duplicate

> Aggregate transformation works only with FieldPositionKeys
> --
>
> Key: FLINK-710
> URL: https://issues.apache.org/jira/browse/FLINK-710
> Project: Flink
>  Issue Type: Improvement
>Reporter: GitHub Import
>Assignee: Márton Balassi
>Priority: Trivial
>  Labels: github-import
>
> In the new Java API, Aggregate transformations can only be applied to 
> DataSets that are grouped using field positions and to ungrouped DataSets.
> DataSets grouped with KeySelector functions are not supported.
> Since Aggregations only work on Tuple DataSets which can be grouped using the 
> more convenient field positions, this might be OK.
> Or should we support KeySelector groupings as well for completeness? 
> Opinions?
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/710
> Created by: [fhueske|https://github.com/fhueske]
> Labels: enhancement, java api, question, user satisfaction, 
> Milestone: Release 0.6 (unplanned)
> Created at: Tue Apr 22 14:42:52 CEST 2014
> State: open
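
For context, a short sketch of the two grouping styles discussed above (data and types are made up): field-position grouping supports the built-in aggregations, while a KeySelector grouping does not.

{code}
// Assumes an ExecutionEnvironment `env` plus the usual imports
// (org.apache.flink.api.java.tuple.Tuple2, org.apache.flink.api.java.aggregation.Aggregations).
DataSet<Tuple2<String, Integer>> data = env.fromElements(
    Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));

// Supported: group by field position, then aggregate another field.
DataSet<Tuple2<String, Integer>> sums = data
    .groupBy(0)                          // first tuple field as the key
    .aggregate(Aggregations.SUM, 1);     // sum the second tuple field

// Not supported at the time of this issue: aggregate after a KeySelector grouping.
// data.groupBy(t -> t.f0).aggregate(Aggregations.SUM, 1);
{code}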



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-2144) Incremental count, average, and variance for windows

2016-09-09 Thread Daniel Blazevski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477990#comment-15477990
 ] 

Daniel Blazevski edited comment on FLINK-2144 at 9/9/16 8:49 PM:
-

Hi,

I am curious to know the status of this issue.  

I have worked on Flink-ML a bit, and recently contributed to Flink's 
documentation of its Window API.  

If possible, I would like to work on this.  I looked into the code and saw how 
"sum" is implemented, and I assume average and variance should be implemented 
in similar ways (with two versions, one taking a string and another an integer).  
Count, of course, is simpler in that it counts the number of data points 
(rather than taking two different inputs).

[~aljoscha][~ggevay]



was (Author: danielblazevski):
Hi,

I am curious to know the status of this issue.  

I have worked on Flink-ML a bit, and recently contributed to Flink's 
documentation of its Window API.  

If possible, I would like to work on this.  I looked into the code and saw how 
"sum" is implemented, and I assume average and variance should be implemented 
in similar ways (with two versions, one taking a string and another an integer).  
Count, of course, is simpler in that it counts the number of data points 
(rather than taking two different inputs).



> Incremental count, average, and variance for windows
> 
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)
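
As a standalone sketch of the kind of incremental computation meant here (not a proposal for the eventual Flink API), count, average, and variance can all be maintained with running sums, giving O(1) work for store, evict, and emit; numerical stability is deliberately ignored.

{code}
// Incremental count, average and variance over a sliding window.
public class IncrementalWindowStats {
    private long count;
    private double sum;
    private double sumOfSquares;

    public void store(double value) {   // element enters the window: O(1)
        count++;
        sum += value;
        sumOfSquares += value * value;
    }

    public void evict(double value) {   // element leaves the window: O(1)
        count--;
        sum -= value;
        sumOfSquares -= value * value;
    }

    public long count()     { return count; }

    public double average() { return count == 0 ? Double.NaN : sum / count; }

    public double variance() {          // population variance of the current window: O(1)
        if (count == 0) {
            return Double.NaN;
        }
        double mean = sum / count;
        return sumOfSquares / count - mean * mean;
    }
}
{code}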



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2144) Incremental count, average, and variance for windows

2016-09-09 Thread Daniel Blazevski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477990#comment-15477990
 ] 

Daniel Blazevski commented on FLINK-2144:
-

Hi,

I am curious to know the status of this issue.  

I have worked on Flink-ML a bit, and recently contributed to Flink's 
documentation of its Window API.  

If possible, I would like to work on this.  I looked into the code and saw how 
"sum" is implemented, and I assume average and variance should be implemented 
in similar ways (with two versions, one taking a string and another an integer).  
Count, of course, is simpler in that it counts the number of data points 
(rather than taking two different inputs).



> Incremental count, average, and variance for windows
> 
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-09 Thread Hao Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Chen updated FLINK-4520:

Description: 
h1. flink-siddhi proposal

h2. Abstraction
Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
Processing Engine (CEP) released as a Java Library under `Apache Software 
License v2.0`. Siddhi CEP processes events which are generated by various event 
sources, analyses them and notifies appropriate complex events according to the 
user specified queries. 

It would be very helpful for Flink users (especially streaming application 
developers) to provide a library to run Siddhi CEP queries directly in a Flink 
streaming application.

* http://wso2.com/products/complex-event-processor/
* https://github.com/wso2/siddhi

h2. Features

* Integrate Siddhi CEP as a stream operator (i.e. 
`TupleStreamSiddhiOperator`), supporting rich CEP features like

* Filter
* Join
* Aggregation
* Group by
* Having
* Window
* Conditions and Expressions
* Pattern processing
* Sequence processing
* Event Tables
...

* Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
`SiddhiCEP` and `SiddhiStream`)
* Register Flink DataStream associating native type information with Siddhi 
Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
* Connect with single or multiple Flink DataStreams with Siddhi CEP 
Execution Plan
* Return output stream as DataStream with type intelligently inferred from 
Siddhi Stream Schema

* Integrate siddhi runtime state management with Flink state (See 
`AbstractSiddhiOperator`)

* Support siddhi plugin management to extend CEP functions. (See 
`SiddhiCEP#registerExtension`)


h2. Test Cases 

* org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java

h2. Example

{code}
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

 cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);

 cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp");
 cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp");

 DataStream> output = cep
  .from("inputStream1").union("inputStream2")
  .sql(
"from every s1 = inputStream1[id == 2] "
 + " -> s2 = inputStream2[id == 3] "
 + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as 
name_2 , custom:plus(s1.price,s2.price) as price"
 + " insert into outputStream"
  )
  .returns("outputStream");

 env.execute();
{code}

  was:
h1. flink-siddhi proposal

h2. Abstraction
Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
Processing Engine (CEP) released as a Java Library under `Apache Software 
License v2.0`. Siddhi CEP processes events which are generated by various event 
sources, analyses them and notifies appropriate complex events according to the 
user specified queries. 

It would be very helpful for Flink users (especially streaming application 
developers) to provide a library to run Siddhi CEP queries directly in a Flink 
streaming application.

* http://wso2.com/products/complex-event-processor/
* https://github.com/wso2/siddhi

h2. Features

* Integrate Siddhi CEP as a stream operator (i.e. 
`TupleStreamSiddhiOperator`), supporting rich CEP features like
  * Filter
  * Join
  * Aggregation
  * Group by
  * Having
  * Window
  * Conditions and Expressions
  * Pattern processing
  * Sequence processing
  * Event Tables
  ...
* Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
`SiddhiCEP` and `SiddhiStream`)
  * Register Flink DataStream associating native type information with Siddhi 
Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
  * Connect with single or multiple Flink DataStreams with Siddhi CEP Execution 
Plan
  * Return output stream as DataStream with type intelligently inferred from 
Siddhi Stream Schema
* Integrate siddhi runtime state management with Flink state (See 
`AbstractSiddhiOperator`)
* Support siddhi plugin management to extend CEP functions. (See 
`SiddhiCEP#registerExtension`)


h2. Test Cases 

* [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)


h2. Example

{code}
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

 cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);

 cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp");
 cep.registerStream("inputStream2", input2, "id", 

[jira] [Updated] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-09 Thread Hao Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Chen updated FLINK-4520:

Description: 
h1. flink-siddhi proposal

h2. Abstraction
Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
Processing Engine (CEP) released as a Java Library under `Apache Software 
License v2.0`. Siddhi CEP processes events which are generated by various event 
sources, analyses them and notifies appropriate complex events according to the 
user specified queries. 

It would be very helpful for Flink users (especially streaming application 
developers) to provide a library to run Siddhi CEP queries directly in a Flink 
streaming application.

* http://wso2.com/products/complex-event-processor/
* https://github.com/wso2/siddhi

h2. Features

* Integrate Siddhi CEP as a stream operator (i.e. 
`TupleStreamSiddhiOperator`), supporting rich CEP features like
  * Filter
  * Join
  * Aggregation
  * Group by
  * Having
  * Window
  * Conditions and Expressions
  * Pattern processing
  * Sequence processing
  * Event Tables
  ...
* Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
`SiddhiCEP` and `SiddhiStream`)
  * Register Flink DataStream associating native type information with Siddhi 
Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
  * Connect with single or multiple Flink DataStreams with Siddhi CEP Execution 
Plan
  * Return output stream as DataStream with type intelligently inferred from 
Siddhi Stream Schema
* Integrate siddhi runtime state management with Flink state (See 
`AbstractSiddhiOperator`)
* Support siddhi plugin management to extend CEP functions. (See 
`SiddhiCEP#registerExtension`)


h2. Test Cases 

* [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)


h2. Example

{code}
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

 cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);

 cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp");
 cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp");

 DataStream> output = cep
  .from("inputStream1").union("inputStream2")
  .sql(
"from every s1 = inputStream1[id == 2] "
 + " -> s2 = inputStream2[id == 3] "
 + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as 
name_2 , custom:plus(s1.price,s2.price) as price"
 + " insert into outputStream"
  )
  .returns("outputStream");

 env.execute();
{code}

  was:
h1. Flink Siddhi CEP Integration Proposal

h2. About Siddhi CEP
Siddhi CEP is a lightweight, easy-to-use Open Source Complex Event Processing 
Engine (CEP) released as a Java Library under Apache Software License v2.0. 
Siddhi CEP processes events which are generated by various event sources, 
analyses them and notifies appropriate complex events according to the user 
specified queries. 

* http://wso2.com/products/complex-event-processor/
* https://github.com/wso2/siddhi

h2. Proposal
As is known, Siddhi is a very lightweight and rich-featured CEP library, supporting 
most traditional CEP use cases like:
* Filter
* Join
* Aggregation
* Group by
* Having
* Window
* Conditions and Expressions
* Pattern processing
* Sequence processing
* Event Tables
* Partitions
* Scripting: Support JavaScript & Scala scripts within Siddhi queries
* Query: SQL-like query language

The proposal is to:
* Embed Siddhi CEP as a stream operator of Flink
* Support native Siddhi queries and extensions running inside the Flink 
StreamExecutionEnvironment
* Integrate state management
* Provide a consistent DSL to integrate with the Flink programming API.



> Integrate Siddhi as a lightweight CEP Library
> -
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>  Labels: cep, library
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
> Processing Engine (CEP) released as a Java Library under `Apache Software 
> License v2.0`. Siddhi CEP processes events which are generated by various 
> event sources, analyses them and notifies appropriate complex events 
> according to the user specified queries. 
> It would be very helpful for Flink users (especially streaming application 
> developers) to provide a library to run Siddhi CEP queries directly in a Flink 
> streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi

[GitHub] flink pull request #2486: FLINK-4520 Integrate Siddhi as a light-weight Stre...

2016-09-09 Thread haoch
GitHub user haoch opened a pull request:

https://github.com/apache/flink/pull/2486

FLINK-4520 Integrate Siddhi as a light-weight Streaming CEP Library

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

# Abstraction
Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
Processing Engine (CEP) released as a Java Library under `Apache Software 
License v2.0`. Siddhi CEP processes events which are generated by various event 
sources, analyses them and notifies appropriate complex events according to the 
user specified queries. 

__It would be very helpful for Flink users (especially streaming 
application developers) to provide a library to run Siddhi CEP queries directly in a 
Flink streaming application.__

# Features

* Integrate Siddhi CEP as a stream operator (i.e. 
`TupleStreamSiddhiOperator`), supporting rich CEP features like
  * Filter
  * Join
  * Aggregation
  * Group by
  * Having
  * Window
  * Conditions and Expressions
  * Pattern processing
  * Sequence processing
  * Event Tables
  ...
* Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
`SiddhiCEP` and `SiddhiStream`)
  * Register Flink DataStream associating native type information with 
Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
  * Connect with single or multiple Flink DataStreams with Siddhi CEP 
Execution Plan
  * Return output stream as DataStream with type intelligently inferred 
from Siddhi Stream Schema
* Integrate siddhi runtime state management with Flink state (See 
`AbstractSiddhiOperator`)
* Support siddhi plugin management to extend CEP functions. (See 
`SiddhiCEP#registerExtension`)

# Test Cases 

* [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)

# Example

 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

 cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);

 cep.registerStream("inputStream1", input1, "id", "name", 
"price","timestamp");
 cep.registerStream("inputStream2", input2, "id", "name", 
"price","timestamp");

 DataStream> output = cep
.from("inputStream1").union("inputStream2")
.sql(
"from every s1 = inputStream1[id == 2] "
 + " -> s2 = inputStream2[id == 3] "
 + "select s1.id as id_1, s1.name as name_1, s2.id as 
id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price"
 + " insert into outputStream"
)
.returns("outputStream");

 env.execute();



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haoch/flink FLINK-4520

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2486.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2486


commit 680739e1f1a7d2e12a1d1ba4f1cc1ea8494002e0
Author: Hao Chen 
Date:   2016-08-29T12:34:42Z

Implement initial version of flink-siddhi stream operator

commit d8b131d9204e9e359afae80087afe2aecab27eaf
Author: Chen, Hao 
Date:   2016-08-30T14:33:59Z

Implement SiddhiStream API and DSL

commit 56c150fadc54d06c186223e43a6fd9ac74eee837
Author: Chen, Hao 
Date:   2016-08-30T18:31:18Z

Reformat code 

[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477941#comment-15477941
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

GitHub user haoch opened a pull request:

https://github.com/apache/flink/pull/2486

FLINK-4520 Integrate Siddhi as a light-weight Streaming CEP Library

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

# Abstraction
Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event 
Processing Engine (CEP) released as a Java Library under `Apache Software 
License v2.0`. Siddhi CEP processes events which are generated by various event 
sources, analyses them and notifies appropriate complex events according to the 
user specified queries. 

__It would be very helpful for Flink users (especially streaming 
application developers) to provide a library to run Siddhi CEP queries directly in a 
Flink streaming application.__

# Features

* Integrate Siddhi CEP as a stream operator (i.e. 
`TupleStreamSiddhiOperator`), supporting rich CEP features like
  * Filter
  * Join
  * Aggregation
  * Group by
  * Having
  * Window
  * Conditions and Expressions
  * Pattern processing
  * Sequence processing
  * Event Tables
  ...
* Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
`SiddhiCEP` and `SiddhiStream`)
  * Register Flink DataStream associating native type information with 
Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc.
  * Connect with single or multiple Flink DataStreams with Siddhi CEP 
Execution Plan
  * Return output stream as DataStream with type intelligently inferred 
from Siddhi Stream Schema
* Integrate siddhi runtime state management with Flink state (See 
`AbstractSiddhiOperator`)
* Support siddhi plugin management to extend CEP functions. (See 
`SiddhiCEP#registerExtension`)

# Test Cases 

* [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)

# Example

 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

 cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);

 cep.registerStream("inputStream1", input1, "id", "name", 
"price","timestamp");
 cep.registerStream("inputStream2", input2, "id", "name", 
"price","timestamp");

 DataStream> output = cep
.from("inputStream1").union("inputStream2")
.sql(
"from every s1 = inputStream1[id == 2] "
 + " -> s2 = inputStream2[id == 3] "
 + "select s1.id as id_1, s1.name as name_1, s2.id as 
id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price"
 + " insert into outputStream"
)
.returns("outputStream");

 env.execute();



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haoch/flink FLINK-4520

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2486.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2486


commit 680739e1f1a7d2e12a1d1ba4f1cc1ea8494002e0
Author: Hao Chen 
Date:   2016-08-29T12:34:42Z

Implement initial version of flink-siddhi stream operator

commit d8b131d9204e9e359afae80087afe2aecab27eaf

[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477764#comment-15477764
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user rekhajoshm commented on the issue:

https://github.com/apache/flink/pull/2060
  
Thanks @greghogan for your review. Please have a look. thanks!


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be ASCII, similar to `StringValueParser`. That makes sense, 
> but when constructing the actual String instance, no encoding is specified; on 
> line 66, for example:
>this.result = new String(bytes, startPos+1, i - startPos - 2);
> which leads to using whatever the default platform encoding is. If the contents 
> really are always ASCII (I would not count on that, as the parser is used from the 
> CSV reader), it is not a big deal, but it can lead to the usual Latin-1-vs-UTF-8 issues.
> So I think the encoding should be specified explicitly, whatever is to be 
> used: the javadocs claim ASCII, so it could be us-ascii, but it could just as well be 
> UTF-8 or even ISO-8859-1.
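
A minimal sketch of the suggested change (only the String construction differs; bytes, startPos and i are the variables from the snippet quoted above): passing an explicit Charset removes the dependency on the platform default.

{code}
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Sketch only: which charset to pick (US-ASCII, UTF-8, ISO-8859-1) is exactly the open question.
private static final Charset CHARSET = StandardCharsets.US_ASCII;

// ... inside the parser, instead of relying on the platform default:
this.result = new String(bytes, startPos + 1, i - startPos - 2, CHARSET);
{code}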



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2060: [FLINK-3921] StringParser encoding

2016-09-09 Thread rekhajoshm
Github user rekhajoshm commented on the issue:

https://github.com/apache/flink/pull/2060
  
Thanks @greghogan for your review. Please have a look. thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477588#comment-15477588
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
Thanks, will try that.


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-09-09 Thread vijikarthi
Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
Thanks, will try that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477573#comment-15477573
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2275
  
I would suggest to just use your own Travis account. It is free; you just 
need to connect it with your account. Then push to any branch in your GitHub 
account and Travis will run.


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-09-09 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2275
  
I would suggest to just use your own Travis account. It is free; you just 
need to connect it with your account. Then push to any branch in your GitHub 
account and Travis will run.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477565#comment-15477565
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2463
  
I've rebased the pull request and incorporated your suggestions.


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allocation ...

2016-09-09 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2463
  
I've rebased the pull request and incorporated your suggestions.




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477560#comment-15477560
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r78212144
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway 
resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to 
the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, 
leaderIdRegistry.getLeaderID());
--- End diff --

Thank you for your comments @beyond1920. Your observations are correct. 
I've skipped this part of the implementation and wanted to address it next.


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>






[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-09 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r78212144
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway 
resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to 
the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, 
leaderIdRegistry.getLeaderID());
--- End diff --

Thank you for your comments @beyond1920. Your observations are correct. 
I've skipped this part of the implementation and wanted to address it next.




[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477387#comment-15477387
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2060#discussion_r78203358
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---
@@ -75,8 +76,30 @@
/** Invalid Boolean value **/
BOOLEAN_INVALID
}
-   
+
+   private Charset charset = Charset.forName("US-ASCII");
+
private ParseErrorState errorState = ParseErrorState.NONE;
+
+   /**
+* Parses the value of a field from the byte array.
+* The start position within the byte array and the array's valid 
length is given.
+* The content of the value is delimited by a field delimiter.
+*
+* @param bytes The byte array that holds the value.
+* @param startPos The index where the field starts
+* @param limit The limit unto which the byte contents is valid for the 
parser. The limit is the
+*  position one after the last valid byte.
+* @param delim The field delimiter character
+* @param reuse An optional reusable field to hold the value
+* @param charset The charset to parse with
+*
+* @return The index of the next delimiter, if the field was parsed 
correctly. A value less than 0 otherwise.
+*/
+   public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delim, T reuse, Charset charset){
+   this.charset = charset;
--- End diff --

Is this method needed? `GenericCsvInputFormat.open` can call `setCharset` on 
each newly instantiated `FieldParser`, and, in the case where a user decides to 
change the charset on an already open file, `GenericCsvInputFormat.setCharset` could go 
through and call `setCharset` on the list of `FieldParser`s.
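
For illustration, a minimal standalone sketch of that suggestion. It is not the PR's code: the stub class, field names, and the assumption that each parser exposes a `setCharset(Charset)` mutator are only there to make the example self-contained.

{code}
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

// Standalone sketch (not Flink code) of propagating a configured character set:
// the format keeps one parser per field and pushes the charset to each parser
// when the format is opened and whenever the charset is changed afterwards.
class CharsetPropagationSketch {

    // stand-in for FieldParser; assumes a setCharset(Charset) mutator
    static class FieldParserStub {
        private Charset charset = Charset.forName("US-ASCII");
        void setCharset(Charset charset) { this.charset = charset; }
    }

    private final List<FieldParserStub> fieldParsers = new ArrayList<>();
    private Charset charset = Charset.forName("UTF-8");

    void open(int numFields) {
        // instantiate one parser per field and hand over the current charset
        for (int i = 0; i < numFields; i++) {
            FieldParserStub parser = new FieldParserStub();
            parser.setCharset(charset);
            fieldParsers.add(parser);
        }
    }

    void setCharset(Charset newCharset) {
        this.charset = newCharset;
        // if the format is already open, update the existing parsers as well
        for (FieldParserStub parser : fieldParsers) {
            parser.setCharset(newCharset);
        }
    }
}
{code}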


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be Ascii, similar to `StringValueParser`. That makes sense, 
> but when constructing actual instance, no encoding is specified; on line 66 
> f.ex:
>this.result = new String(bytes, startPos+1, i - startPos - 2);
> which leads to using whatever default platform encoding is. If contents 
> really are always Ascii (would not count on that as parser is used from CSV 
> reader), not a big deal, but it can lead to the usual Latin-1-VS-UTF-8 issues.
> So I think that encoding should be explicitly specified, whatever is to be 
> used: javadocs claim ascii, so could be "us-ascii", but could well be UTF-8 
> or even ISO-8859-1.





[GitHub] flink pull request #2060: [FLINK-3921] StringParser encoding

2016-09-09 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2060#discussion_r78203358
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---
@@ -75,8 +76,30 @@
/** Invalid Boolean value **/
BOOLEAN_INVALID
}
-   
+
+   private Charset charset = Charset.forName("US-ASCII");
+
private ParseErrorState errorState = ParseErrorState.NONE;
+
+   /**
+* Parses the value of a field from the byte array.
+* The start position within the byte array and the array's valid 
length is given.
+* The content of the value is delimited by a field delimiter.
+*
+* @param bytes The byte array that holds the value.
+* @param startPos The index where the field starts
+* @param limit The limit unto which the byte contents is valid for the 
parser. The limit is the
+*  position one after the last valid byte.
+* @param delim The field delimiter character
+* @param reuse An optional reusable field to hold the value
+* @param charset The charset to parse with
+*
+* @return The index of the next delimiter, if the field was parsed 
correctly. A value less than 0 otherwise.
+*/
+   public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delim, T reuse, Charset charset){
+   this.charset = charset;
--- End diff --

Is this method needed? `GenericCsvInputFormat.open` can call `setCharset` on 
each newly instantiated `FieldParser`, and, in the case where a user decides to 
change the charset on an already open file, `GenericCsvInputFormat.setCharset` could go 
through and call `setCharset` on the list of `FieldParser`s.




[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-09-09 Thread vijikarthi
Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
Hmmm... it's getting complicated. I will try to debug the issue. Do you know 
why it fails on Travis but not on Jenkins?

How do I simulate this on Travis for my own testing?




[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477337#comment-15477337
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2060#discussion_r78199868
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---
@@ -75,8 +76,30 @@
/** Invalid Boolean value **/
BOOLEAN_INVALID
}
-   
+
+   private Charset charset = Charset.forName("US-ASCII");
--- End diff --

Should this also default to `UTF-8`?


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be Ascii, similar to `StringValueParser`. That makes sense, 
> but when constructing actual instance, no encoding is specified; on line 66 
> f.ex:
>this.result = new String(bytes, startPos+1, i - startPos - 2);
> which leads to using whatever default platform encoding is. If contents 
> really are always Ascii (would not count on that as parser is used from CSV 
> reader), not a big deal, but it can lead to the usual Latin-1-VS-UTF-8 issues.
> So I think that encoding should be explicitly specified, whatever is to be 
> used: javadocs claim ascii, so could be "us-ascii", but could well be UTF-8 
> or even ISO-8859-1.





[GitHub] flink pull request #2060: [FLINK-3921] StringParser encoding

2016-09-09 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2060#discussion_r78199868
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---
@@ -75,8 +76,30 @@
/** Invalid Boolean value **/
BOOLEAN_INVALID
}
-   
+
+   private Charset charset = Charset.forName("US-ASCII");
--- End diff --

Should this also default to `UTF-8`?




[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477333#comment-15477333
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2060#discussion_r78199594
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 ---
@@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] 
includedMask, Class[] fieldTypes) {
this.fieldIncluded = includedMask;
}
 
+   /**
+* Gets the Charset for the parser.Default is set to UTF-8
+*
+* @return The charset for the parser.
+*/
+   public Charset getCharset() {
+   return this.charset;
+   }
+
+   /**
+* Sets the charset of the parser. Called by subclasses of the parser 
to set the type of charset
+* when doing a parse.
+*
+* @param charset The charset  to set.
+*/
+   protected void setCharset(Charset charset){
--- End diff --

Add space between `)` and `{` (and I count two more instances).


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be Ascii, similar to `StringValueParser`. That makes sense, 
> but when constructing actual instance, no encoding is specified; on line 66 
> f.ex:
>this.result = new String(bytes, startPos+1, i - startPos - 2);
> which leads to using whatever default platform encoding is. If contents 
> really are always Ascii (would not count on that as parser is used from CSV 
> reader), not a big deal, but it can lead to the usual Latin-1-VS-UTF-8 issues.
> So I think that encoding should be explicitly specified, whatever is to be 
> used: javadocs claim ascii, so could be "us-ascii", but could well be UTF-8 
> or even ISO-8859-1.





[GitHub] flink pull request #2060: [FLINK-3921] StringParser encoding

2016-09-09 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2060#discussion_r78199594
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 ---
@@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] 
includedMask, Class[] fieldTypes) {
this.fieldIncluded = includedMask;
}
 
+   /**
+* Gets the Charset for the parser.Default is set to UTF-8
+*
+* @return The charset for the parser.
+*/
+   public Charset getCharset() {
+   return this.charset;
+   }
+
+   /**
+* Sets the charset of the parser. Called by subclasses of the parser 
to set the type of charset
+* when doing a parse.
+*
+* @param charset The charset  to set.
+*/
+   protected void setCharset(Charset charset){
--- End diff --

Add space between `)` and `{` (and I count two more instances).




[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477321#comment-15477321
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2060#discussion_r78199216
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 ---
@@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] 
includedMask, Class[] fieldTypes) {
this.fieldIncluded = includedMask;
}
 
+   /**
+* Gets the Charset for the parser.Default is set to UTF-8
--- End diff --

Should we spell out `charset` as "character set" in the documentation? 
Also, space after period and missing period at end.


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be Ascii, similar to `StringValueParser`. That makes sense, 
> but when constructing actual instance, no encoding is specified; on line 66 
> f.ex:
>this.result = new String(bytes, startPos+1, i - startPos - 2);
> which leads to using whatever default platform encoding is. If contents 
> really are always Ascii (would not count on that as parser is used from CSV 
> reader), not a big deal, but it can lead to the usual Latin-1-VS-UTF-8 issues.
> So I think that encoding should be explicitly specified, whatever is to be 
> used: javadocs claim ascii, so could be "us-ascii", but could well be UTF-8 
> or even ISO-8859-1.





[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477325#comment-15477325
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2060#discussion_r78199365
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 ---
@@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] 
includedMask, Class[] fieldTypes) {
this.fieldIncluded = includedMask;
}
 
+   /**
+* Gets the Charset for the parser.Default is set to UTF-8
+*
+* @return The charset for the parser.
+*/
+   public Charset getCharset() {
+   return this.charset;
+   }
+
+   /**
+* Sets the charset of the parser. Called by subclasses of the parser 
to set the type of charset
+* when doing a parse.
+*
+* @param charset The charset  to set.
+*/
+   protected void setCharset(Charset charset){
+   this.charset = charset != null ? charset : 
Charset.forName("UTF-8");
--- End diff --

Use `Preconditions.checkNotNull`?
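
A hedged sketch of what that change could look like; the surrounding class and the error message are illustrative only, not the PR's code.

{code}
import java.nio.charset.Charset;

import org.apache.flink.util.Preconditions;

// Sketch of the suggested change: fail fast on a null charset instead of
// silently falling back to UTF-8.
abstract class CsvFormatSketch {

    private Charset charset = Charset.forName("UTF-8");

    protected void setCharset(Charset charset) {
        this.charset = Preconditions.checkNotNull(charset, "charset must not be null");
    }
}
{code}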


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be Ascii, similar to `StringValueParser`. That makes sense, 
> but when constructing actual instance, no encoding is specified; on line 66 
> f.ex:
>this.result = new String(bytes, startPos+1, i - startPos - 2);
> which leads to using whatever default platform encoding is. If contents 
> really are always Ascii (would not count on that as parser is used from CSV 
> reader), not a big deal, but it can lead to the usual Latin-1-VS-UTF-8 issues.
> So I think that encoding should be explicitly specified, whatever is to be 
> used: javadocs claim ascii, so could be "us-ascii", but could well be UTF-8 
> or even ISO-8859-1.





[GitHub] flink pull request #2060: [FLINK-3921] StringParser encoding

2016-09-09 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2060#discussion_r78199365
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 ---
@@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] 
includedMask, Class[] fieldTypes) {
this.fieldIncluded = includedMask;
}
 
+   /**
+* Gets the Charset for the parser.Default is set to UTF-8
+*
+* @return The charset for the parser.
+*/
+   public Charset getCharset() {
+   return this.charset;
+   }
+
+   /**
+* Sets the charset of the parser. Called by subclasses of the parser 
to set the type of charset
+* when doing a parse.
+*
+* @param charset The charset  to set.
+*/
+   protected void setCharset(Charset charset){
+   this.charset = charset != null ? charset : 
Charset.forName("UTF-8");
--- End diff --

Use `Preconditions.checkNotNull`?




[GitHub] flink pull request #2060: [FLINK-3921] StringParser encoding

2016-09-09 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2060#discussion_r78199262
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 ---
@@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] 
includedMask, Class[] fieldTypes) {
this.fieldIncluded = includedMask;
}
 
+   /**
+* Gets the Charset for the parser.Default is set to UTF-8
+*
+* @return The charset for the parser.
+*/
+   public Charset getCharset() {
+   return this.charset;
+   }
+
+   /**
+* Sets the charset of the parser. Called by subclasses of the parser 
to set the type of charset
+* when doing a parse.
+*
+* @param charset The charset  to set.
--- End diff --

Double space.




[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477323#comment-15477323
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2060#discussion_r78199262
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 ---
@@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] 
includedMask, Class[] fieldTypes) {
this.fieldIncluded = includedMask;
}
 
+   /**
+* Gets the Charset for the parser.Default is set to UTF-8
+*
+* @return The charset for the parser.
+*/
+   public Charset getCharset() {
+   return this.charset;
+   }
+
+   /**
+* Sets the charset of the parser. Called by subclasses of the parser 
to set the type of charset
+* when doing a parse.
+*
+* @param charset The charset  to set.
--- End diff --

Double space.


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be Ascii, similar to `StringValueParser`. That makes sense, 
> but when constructing actual instance, no encoding is specified; on line 66 
> f.ex:
>this.result = new String(bytes, startPos+1, i - startPos - 2);
> which leads to using whatever default platform encoding is. If contents 
> really are always Ascii (would not count on that as parser is used from CSV 
> reader), not a big deal, but it can lead to the usual Latin-1-VS-UTF-8 issues.
> So I think that encoding should be explicitly specified, whatever is to be 
> used: javadocs claim ascii, so could be "us-ascii", but could well be UTF-8 
> or even ISO-8859-1.





[GitHub] flink pull request #2060: [FLINK-3921] StringParser encoding

2016-09-09 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2060#discussion_r78199216
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 ---
@@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] 
includedMask, Class[] fieldTypes) {
this.fieldIncluded = includedMask;
}
 
+   /**
+* Gets the Charset for the parser.Default is set to UTF-8
--- End diff --

Should we spell out `charset` as "character set" in the documentation? 
Also, space after period and missing period at end.




[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477315#comment-15477315
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
Hmmm... it's getting complicated. I will try to debug the issue. Do you know 
why it fails on Travis but not on Jenkins?

How do I simulate this on Travis for my own testing?


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  





[GitHub] flink issue #2485: [Flink 4599] - Add 'explain()' also to StreamTableEnviron...

2016-09-09 Thread chobeat
Github user chobeat commented on the issue:

https://github.com/apache/flink/pull/2485
  
Done, sorry.




[GitHub] flink issue #2485: [Flink 4599] - Add 'explain()' also to StreamTableEnviron...

2016-09-09 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2485
  
Could you rebase your PR on the current master and then do a force push? 
There are a lot of commits that should not appear here.




[jira] [Created] (FLINK-4605) Add an expression that returns the return type of an expression

2016-09-09 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4605:
---

 Summary: Add an expression that returns the return type of an 
expression
 Key: FLINK-4605
 URL: https://issues.apache.org/jira/browse/FLINK-4605
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Timo Walther


Especially for Java users of the Table API, it is hard to obtain the return type of an 
expression. I propose to implement an expression that returns the type of an 
input expression as a string.

{{myLong.getType()}} could call the toString method of TypeInformation.

This could also be useful to distinguish between different subtypes of POJOs 
etc.
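
A hypothetical usage example of the proposal, written against the Java string-expression syntax of the Table API; {{getType()}} does not exist yet, so the exact name and output may change:

{code}
import org.apache.flink.api.table.Table;

// Hypothetical only: selects the type of the 'myLong' field as a string column.
// The rendered value would follow TypeInformation#toString() per the proposal.
public class GetTypeSketch {
    public static Table selectFieldType(Table input) {
        return input.select("myLong.getType() as myLongType");
    }
}
{code}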





[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477252#comment-15477252
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r78194763
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

These are valid points, I will change the code to use the 
`LeaderRetrievalListener` instead.


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>






[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-09 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r78194763
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

These are valid points, I will change the code to use the 
`LeaderRetrievalListener` instead.




[jira] [Commented] (FLINK-4591) Select star does not work with grouping

2016-09-09 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477240#comment-15477240
 ] 

Timo Walther commented on FLINK-4591:
-

Ok, Calcite also does not support {{SELECT * FROM WordCount GROUP BY word}}. 
I'm fine with closing this issue.

> Select star does not work with grouping
> ---
>
> Key: FLINK-4591
> URL: https://issues.apache.org/jira/browse/FLINK-4591
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> It would be consistent if this would also work:
> {{table.groupBy('*).select('*)}}
> Currently, the star only works in a plain select without grouping.





[jira] [Commented] (FLINK-4603) KeyedStateBackend cannot restore user code classes

2016-09-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477093#comment-15477093
 ] 

Aljoscha Krettek commented on FLINK-4603:
-

We wouldn't if we always keep everything in serialized form, as is the case for 
the RocksDB backend. In that case, the user code is only necessary when a state 
access comes in and when that happens we're already "in" the user code class 
loader.

> KeyedStateBackend cannot restore user code classes
> --
>
> Key: FLINK-4603
> URL: https://issues.apache.org/jira/browse/FLINK-4603
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Priority: Blocker
> Fix For: 1.2.0
>
>
> A user reported that he cannot restore keyed state which contains user code 
> classes. I suspect that we don't use the user code class loader to 
> deserialize the state.
> The solution seems to be to forward the user code class loader to the 
> {{KeyedStateBackends}} when restoring state.





[jira] [Commented] (FLINK-4561) replace all the scala version as a `scala.binary.version` property

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477052#comment-15477052
 ] 

ASF GitHub Bot commented on FLINK-4561:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2459
  
I would really like to not change the way it is done right now.
It works and it is the result of many user interactions, testing across 
Maven, SBT, Gradle, etc.

These various build systems have such subtle differences that I think we 
should stick with the paradigm of "don't change it unless it is broken".


> replace all the scala version as a `scala.binary.version` property
> --
>
> Key: FLINK-4561
> URL: https://issues.apache.org/jira/browse/FLINK-4561
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
>
> replace all the scala version(2.10) as a property `scala.binary.version` 
> defined in root pom properties. default scala version property is 2.10.
> modify:
> 1. dependency include scala version 
> 2. module defining include scala version
> 3. scala version upgrade to 2.11.8 from 2.11.7





[GitHub] flink issue #2459: [FLINK-4561] replace all the scala version as a `scala.bi...

2016-09-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2459
  
I would really like to not change the way it is done right now.
It works and it is the result of many user interactions, testing across 
Maven, SBT, Gradle, etc.

These various build systems have such subtle differences that I think we 
should stick with the paradigm of "don't change it unless it is broken".




[jira] [Commented] (FLINK-4599) Add 'explain()' also to StreamTableEnvironment

2016-09-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477004#comment-15477004
 ] 

Fabian Hueske commented on FLINK-4599:
--

Definitely. Would be great if you could add this to the docs. Thanks!

> Add 'explain()' also to StreamTableEnvironment
> --
>
> Key: FLINK-4599
> URL: https://issues.apache.org/jira/browse/FLINK-4599
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>  Labels: starter
>
> Currently, only the BatchTableEnvironment supports the {{explain}} command 
> for tables. We should also support it for the StreamTableEnvironment.





[jira] [Commented] (FLINK-4599) Add 'explain()' also to StreamTableEnvironment

2016-09-09 Thread Simone Robutti (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477000#comment-15477000
 ] 

Simone Robutti commented on FLINK-4599:
---

I'd rather go for the second choice. I've opened the PR. I've noticed that this 
call is not documented. Do you think it would be useful to add a few lines both 
for the batch and streaming documentation?
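
For the docs, a minimal usage sketch could look like the following; it assumes the new method mirrors the existing batch signature {{explain(Table): String}}, and the surrounding setup (package names, field names, example data) is illustrative only:

{code}
import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.TableEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: obtain the AST / execution plan of a streaming Table query as a String.
public class StreamExplainSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<Long> stream = env.fromElements(1L, 2L, 3L);
        Table table = tEnv.fromDataStream(stream, "a");
        Table filtered = table.filter("a > 1");

        // assumed new API: prints the abstract syntax tree and the physical plan
        System.out.println(tEnv.explain(filtered));
    }
}
{code}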

> Add 'explain()' also to StreamTableEnvironment
> --
>
> Key: FLINK-4599
> URL: https://issues.apache.org/jira/browse/FLINK-4599
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>  Labels: starter
>
> Currently, only the BatchTableEnvironment supports the {{explain}} command 
> for tables. We should also support it for the StreamTableEnvironment.





[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...

2016-09-09 Thread chobeat
GitHub user chobeat opened a pull request:

https://github.com/apache/flink/pull/2485

[Flink 4599] - Add 'explain()' also to StreamTableEnvironment



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/radicalbit/flink FLINK-4599

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2485.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2485


commit 29f0204308a8f871e7b27767efc3020e362218f4
Author: chobeat 
Date:   2016-09-09T09:27:41Z

AST DataStream table explain

commit 593569405bbd2b728f5e9edc82242019a7fbe9be
Author: chobeat 
Date:   2016-09-09T09:31:12Z

Merge remote-tracking branch 'origin/master' into FLINK-4599

commit 7bf135237cb24ed889b92bf5d8026de43d1038fc
Author: f7753 
Date:   2016-08-22T13:07:55Z

[FLINK-4436] Unclosed DataOutputBuffer in Utils#setTokensFor()

This closes #2402

commit ba043aaa51401f53c2868927a540ebf7a3493318
Author: Greg Hogan 
Date:   2016-07-25T13:09:27Z

[FLINK-4257] [gelly] Handle delegating algorithm change of class

Replaces Delegate with NoOpOperator.

This closes #2474

commit 850fd5fec5133f7729bc6a5b2af00cb2decc229b
Author: Till Rohrmann 
Date:   2016-08-31T15:58:09Z

[FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster

Rename _configuration to originalConfiguration

Remove testing classes from main scope in flink-runtime

Previously, the ForkableFlinkMiniCluster which resided in flink-test-utils 
required
these files to be in the main scope of flink-runtime. With the removal of 
the
ForkableFlinkMiniCluster, these classes are now no longer needed and can be 
moved
back to the test scope.

This closes #2450.

commit 7eecc4bd7aa605051cc3c6abc1a87233b8695127
Author: Till Rohrmann 
Date:   2016-09-01T12:41:44Z

[FLINK-4456] Replace Akka specific types by interfaces in Task

Introduce TaskExecutionStateListener for Task

Replace JobManagerGateway in Task by InputSplitProvider and 
CheckpointNotifier

Replace the TaskManager ActorGateway by TaskManagerConnection in Task

Rename taskmanager.CheckpointNotifier into CheckpointResponder; rename 
TaskExecutionStateListener.notifyTaskExecutionState into 
notifyTaskExecutionStateChanged

Remove InputSplitProvider.start; add ClassLoader parameter to 
InputSplitProvider.getNextInputSplit

Removes the unused class InputSplitIterator.

Update InputSplitProvider JavaDocs

This closes #2456.

commit 45f842eb8ea7da4f0535cd29c345fd45fe3d3815
Author: Greg Hogan 
Date:   2016-09-02T14:42:30Z

[FLINK-4522] [docs] Gelly link broken in homepage

The Gelly documentation was recently split into multiple pages in
FLINK-4104 but was missing a redirect. This commit updates the Gelly
redirect to point to the old page.

This closes #2464

commit 7c9d1679c8319d560c9032691ad05b723b852f66
Author: Greg Hogan 
Date:   2016-09-02T15:53:08Z

[FLINK-4571] [gelly] Configurable little parallelism in Gelly drivers

This closes #2475

commit 2ddb4fef89debd7cef740058cc79ddca097879e1
Author: Alexander Pivovarov 
Date:   2016-09-07T21:11:06Z

[FLINK-4595] Close FileOutputStream in ParameterTool

This closes #2478

commit 2d4c75e688cd143022fc1db2b209c71935003c7d
Author: chobeat 
Date:   2016-09-09T09:32:13Z

Merge branch 'FLINK-4599' of github.com:radicalbit/flink into FLINK-4599

commit f3ba22ba5d2901c25cdbd11689a1ca3cc50935cd
Author: chobeat 
Date:   2016-09-09T12:30:50Z

moved stream explain test resources to main folder






[jira] [Commented] (FLINK-4603) KeyedStateBackend cannot restore user code classes

2016-09-09 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476941#comment-15476941
 ] 

Till Rohrmann commented on FLINK-4603:
--

Wouldn't we even then need the user code class loader to load user code classes?

> KeyedStateBackend cannot restore user code classes
> --
>
> Key: FLINK-4603
> URL: https://issues.apache.org/jira/browse/FLINK-4603
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Priority: Blocker
> Fix For: 1.2.0
>
>
> A user reported that he cannot restore keyed state which contains user code 
> classes. I suspect that we don't use the user code class loader to 
> deserialize the state.
> The solution seems to be to forward the user code class loader to the 
> {{KeyedStateBackends}} when restoring state.





[jira] [Commented] (FLINK-4603) KeyedStateBackend cannot restore user code classes

2016-09-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476881#comment-15476881
 ] 

Aljoscha Krettek commented on FLINK-4603:
-

Yep, the user code {{ClassLoader}} can be retrieved from the {{Environment}} 
passed to {{AbstractStateBackend.restoreKeyedStateBackend()}}. A nicer solution 
would be to completely get rid of Java Serialization in the state serialization.

> KeyedStateBackend cannot restore user code classes
> --
>
> Key: FLINK-4603
> URL: https://issues.apache.org/jira/browse/FLINK-4603
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Priority: Blocker
> Fix For: 1.2.0
>
>
> A user reported that he cannot restore keyed state which contains user code 
> classes. I suspect that we don't use the user code class loader to 
> deserialize the state.
> The solution seems to be to forward the user code class loader to the 
> {{KeyedStateBackends}} when restoring state.





[jira] [Created] (FLINK-4604) Add support for standard deviation/variance

2016-09-09 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4604:
---

 Summary: Add support for standard deviation/variance
 Key: FLINK-4604
 URL: https://issues.apache.org/jira/browse/FLINK-4604
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Timo Walther


Calcite's {{AggregateReduceFunctionsRule}} can convert the SQL aggregates {{AVG, STDDEV_POP, 
STDDEV_SAMP, VAR_POP, VAR_SAMP}} into sum/count functions. We should add, test, 
and document this rule.

Whether we also want to add these aggregates to the Table API is up for discussion.
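
To make the intended behavior concrete, a hedged example of the kind of query this would enable (it will only run once the rule is added; the table, field, and registration names are made up):

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.TableEnvironment;

// Sketch: STDDEV_POP would be decomposed by AggregateReduceFunctionsRule into
// SUM/COUNT aggregates that the runtime already supports.
public class StddevSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataSet<Tuple2<String, Double>> input = env.fromElements(
                Tuple2.of("a", 1.0), Tuple2.of("a", 3.0), Tuple2.of("b", 2.0));
        tEnv.registerDataSet("Measurements", input, "word, score");

        Table result = tEnv.sql(
                "SELECT word, STDDEV_POP(score) FROM Measurements GROUP BY word");

        // once supported, the plan of this query can be inspected with explain()
        System.out.println(tEnv.explain(result));
    }
}
{code}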





[jira] [Resolved] (FLINK-4592) Fix flaky test ScalarFunctionsTest.testCurrentTimePoint

2016-09-09 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4592.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in 8c0d62433a3f57d0753edb00e5c2bbc1adc467df.

> Fix flaky test ScalarFunctionsTest.testCurrentTimePoint
> ---
>
> Key: FLINK-4592
> URL: https://issues.apache.org/jira/browse/FLINK-4592
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>  Labels: starter
> Fix For: 1.2.0
>
>
> It seems that the test is still non-deterministic.
> {code}
> org.apache.flink.api.table.expressions.ScalarFunctionsTest
> testCurrentTimePoint(org.apache.flink.api.table.expressions.ScalarFunctionsTest)
>   Time elapsed: 0.083 sec  <<< FAILURE!
> org.junit.ComparisonFailure: Wrong result for: 
> AS(>=(CHAR_LENGTH(CAST(CURRENT_TIMESTAMP):VARCHAR(1) CHARACTER SET 
> "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 22), '_c0') 
> expected:<[tru]e> but was:<[fals]e>
>   at org.junit.Assert.assertEquals(Assert.java:115)
>   at 
> org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:126)
>   at 
> org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:123)
>   at 
> scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:87)
>   at 
> org.apache.flink.api.table.expressions.utils.ExpressionTestBase.evaluateExprs(ExpressionTestBase.scala:123)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> {code}





[jira] [Updated] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager

2016-09-09 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-4505:
-
Description: Implement {{TaskManagerRunner}} to construct related 
components ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, 
{{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or 
standalone mode.  (was: Implement {{TaskExecutorFactory}} that should be an 
abstract class with the helper methods to bring up the {{TaskManager}}. The 
factory can be implemented by some classes to start a {{TaskManager}} in 
different modes (testing, standalone, yarn).)

> Implement TaskManagerRunner to construct related components for TaskManager
> ---
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskManagerRunner}} to construct related components 
> ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, 
> {{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or 
> standalone mode.





[jira] [Updated] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager

2016-09-09 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-4505:
-
Summary: Implement TaskManagerRunner to construct related components for 
TaskManager  (was: Implement TaskManagerFactory to bring up TaskManager for 
different modes)

> Implement TaskManagerRunner to construct related components for TaskManager
> ---
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the 
> helper methods to bring up the {{TaskManager}}. The factory can be 
> implemented by some classes to start a {{TaskManager}} in different modes 
> (testing, standalone, yarn).





[jira] [Commented] (FLINK-4177) CassandraConnectorTest.testCassandraCommitter causing unstable builds

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476803#comment-15476803
 ] 

ASF GitHub Bot commented on FLINK-4177:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/2484

[FLINK-4177] Harden CassandraConnectorTest

This PR (hopefully) resolves the instability issues with the Cassandra 
connector tests.

Changelog:
* updated cassandra/driver versions
* the `cassandra.yaml` was cleaned up
 * removed several configuration values that used the default
 * sorted the remaining settings in alphabetical order
* the at-least-once sinks were modified to
 * properly log exceptions when close() is called
 * keep track of how many records were not acknowledged yet
* the tests were modified to
 * start the embedded cassandra instance in a separate process
   * and supply an array of performance related jvm arguments, taken from 
the cassandra repo
 * no longer truncate tables; instead every test uses a separate table
 * wait until a connection could be established to Cassandra in a 
retry-loop instead of waiting for a fixed time (a minimal sketch of this follows below)
 * no longer run actual flink jobs
 * use increased timeouts
 * clean up temporary files
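
A minimal sketch of the retry-loop idea mentioned above; it is not the PR's code, and the host, timeout, and back-off values are made up:

{code}
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

// Sketch: keep attempting to open a session until Cassandra accepts
// connections or the deadline expires, instead of sleeping a fixed time.
public class CassandraWaitSketch {
    public static Session waitForCassandra(String host, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            Cluster cluster = Cluster.builder().addContactPoint(host).build();
            try {
                // the caller later closes the cluster via session.getCluster().close()
                return cluster.connect();
            } catch (Exception e) {
                cluster.close();
                if (System.currentTimeMillis() > deadline) {
                    throw new IllegalStateException("Cassandra did not become available in time", e);
                }
                Thread.sleep(500); // brief back-off before the next attempt
            }
        }
    }
}
{code}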

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink cass_tmp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2484.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2484


commit 64b125343839788530729a2119d1dc92e50e849a
Author: zentol 
Date:   2016-09-05T09:03:00Z

[FLINK-4177] Harden CassandraConnectorTest




> CassandraConnectorTest.testCassandraCommitter causing unstable builds
> -
>
> Key: FLINK-4177
> URL: https://issues.apache.org/jira/browse/FLINK-4177
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>  Labels: test-stability
>
> This build: https://api.travis-ci.org/jobs/143272982/log.txt?deansi=true 
> failed with
> {code}
> 07/08/2016 09:59:12   Job execution switched to status FINISHED.
> Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 146.646 sec 
> <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest
> testCassandraCommitter(org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest)
>   Time elapsed: 9.057 sec  <<< ERROR!
> com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout 
> during write query at consistency LOCAL_SERIAL (1 replica were required but 
> only 0 acknowledged the write)
>   at 
> com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:73)
>   at 
> com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:26)
>   at 
> com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
>   at 
> com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
>   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63)
>   at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
>   at 
> org.apache.flink.streaming.connectors.cassandra.CassandraCommitter.open(CassandraCommitter.java:103)
>   at 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest.testCassandraCommitter(CassandraConnectorTest.java:284)
> Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: 
> Cassandra timeout during write query at consistency LOCAL_SERIAL (1 replica 
> were required but only 0 acknowledged the write)
>   at 
> com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:100)
>   at 
> com.datastax.driver.core.Responses$Error.asException(Responses.java:122)
>   at 
> com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:477)
>   at 
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
>   at 
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
>   at 
> 

[GitHub] flink pull request #2484: [FLINK-4177] Harden CassandraConnectorTest

2016-09-09 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/2484

[FLINK-4177] Harden CassandraConnectorTest

This PR (hopefully) resolves the instability issues with the Cassandra 
connector tests.

Changelog:
* updated cassandra/driver versions
* the `cassandra.yaml` was cleaned up
 * removed several configuration values that used the default
 * sorted the remaining settings in alphabetical order
* the at-least-once sinks were modified to
 * properly log exceptions when close() is called
 * keep track of how many records were not acknowledged yet
* the tests were modified to
 * start the embedded cassandra instance in a separate process
   * and supply an array of performance related jvm arguments, taken from 
the cassandra repo
 * no longer truncate tables; instead every test uses a separate table
 * wait until a connection can be established to Cassandra in a 
retry loop instead of waiting for a fixed time (see the sketch after this list)
 * no longer run actual flink jobs
 * use increased timeouts
 * clean up temporary files
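
A minimal sketch of the retry-loop idea from the changelog, assuming the DataStax 
Java driver and illustrative host/port/retry values (not the actual test code):

{code}
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;

// Sketch only: poll until the embedded Cassandra accepts connections instead
// of sleeping for a fixed amount of time before the tests start.
public final class CassandraAvailability {

    static Session connectWithRetry(String host, int port, int maxAttempts) throws InterruptedException {
        Cluster cluster = Cluster.builder().addContactPoint(host).withPort(port).build();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return cluster.connect(); // succeeds once Cassandra is up
            } catch (NoHostAvailableException e) {
                if (attempt == maxAttempts) {
                    cluster.close();
                    throw new IllegalStateException("Cassandra did not become available in time", e);
                }
                Thread.sleep(1000L); // brief back-off before the next attempt
            }
        }
        throw new IllegalStateException("unreachable");
    }
}
{code}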

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink cass_tmp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2484.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2484


commit 64b125343839788530729a2119d1dc92e50e849a
Author: zentol 
Date:   2016-09-05T09:03:00Z

[FLINK-4177] Harden CassandraConnectorTest




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-4592) Fix flaky test ScalarFunctionsTest.testCurrentTimePoint

2016-09-09 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-4592:
---

Assignee: Timo Walther

> Fix flaky test ScalarFunctionsTest.testCurrentTimePoint
> ---
>
> Key: FLINK-4592
> URL: https://issues.apache.org/jira/browse/FLINK-4592
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>  Labels: starter
>
> It seems that the test is still non-deterministic.
> {code}
> org.apache.flink.api.table.expressions.ScalarFunctionsTest
> testCurrentTimePoint(org.apache.flink.api.table.expressions.ScalarFunctionsTest)
>   Time elapsed: 0.083 sec  <<< FAILURE!
> org.junit.ComparisonFailure: Wrong result for: 
> AS(>=(CHAR_LENGTH(CAST(CURRENT_TIMESTAMP):VARCHAR(1) CHARACTER SET 
> "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 22), '_c0') 
> expected:<[tru]e> but was:<[fals]e>
>   at org.junit.Assert.assertEquals(Assert.java:115)
>   at 
> org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:126)
>   at 
> org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:123)
>   at 
> scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:87)
>   at 
> org.apache.flink.api.table.expressions.utils.ExpressionTestBase.evaluateExprs(ExpressionTestBase.scala:123)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> {code}
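
One plausible source of the flakiness, shown as a plain-Java analogy only (the 
test itself goes through Calcite's code generation, so this is an assumption, not 
the confirmed root cause): the string rendering of a timestamp does not have a 
fixed length, so a CHAR_LENGTH threshold can pass or fail depending on the 
fractional-second component.

{code}
import java.sql.Timestamp;

// Analogy only: Timestamp.toString() trims trailing zeros in the fractional
// part, so the rendered length varies between runs.
public final class TimestampLengthDemo {
    public static void main(String[] args) {
        System.out.println(new Timestamp(0L).toString().length());   // 21: "...00:00:00.0"
        System.out.println(new Timestamp(123L).toString().length()); // 23: "...00:00:00.123"
    }
}
{code}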



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2483: [FLINK-4601] Check for empty string properly

2016-09-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2483


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4601) Check for empty string properly

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476725#comment-15476725
 ] 

ASF GitHub Bot commented on FLINK-4601:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2483


> Check for empty string properly
> ---
>
> Key: FLINK-4601
> URL: https://issues.apache.org/jira/browse/FLINK-4601
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
> Fix For: 1.2.0
>
>
> UdfAnalyzerExamplesTest.java and UdfAnalyzerTest.java use == to check for 
> empty string. We should use isEmpty() instead
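
As a small illustration of the fix (class and variable names are hypothetical, 
not taken from the tests):

{code}
public final class EmptyStringCheck {
    public static void main(String[] args) {
        // == compares object identity, not content, so a freshly constructed
        // empty String is never == to the interned literal "".
        String value = new String("");
        System.out.println(value == "");     // false: different String objects
        System.out.println(value.isEmpty()); // true: length() == 0
    }
}
{code}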



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4601) Check for empty string properly

2016-09-09 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4601.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in e92f91aeba3c9e8da5a5ff9efda342cb19da928c.

> Check for empty string properly
> ---
>
> Key: FLINK-4601
> URL: https://issues.apache.org/jira/browse/FLINK-4601
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
> Fix For: 1.2.0
>
>
> UdfAnalyzerExamplesTest.java and UdfAnalyzerTest.java use == to check for 
> empty string. We should use isEmpty() instead



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4601) Check for empty string properly

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476719#comment-15476719
 ] 

ASF GitHub Bot commented on FLINK-4601:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2483
  
Thanks for the fix. Will merge...


> Check for empty string properly
> ---
>
> Key: FLINK-4601
> URL: https://issues.apache.org/jira/browse/FLINK-4601
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> UdfAnalyzerExamplesTest.java and UdfAnalyzerTest.java use == to check for 
> empty string. We should use isEmpty() instead



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2483: [FLINK-4601] Check for empty string properly

2016-09-09 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2483
  
Thanks for the fix. Will merge...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4603) KeyedStateBackend cannot restore user code classes

2016-09-09 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4603:


 Summary: KeyedStateBackend cannot restore user code classes
 Key: FLINK-4603
 URL: https://issues.apache.org/jira/browse/FLINK-4603
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Priority: Blocker
 Fix For: 1.2.0


A user reported that he cannot restore keyed state which contains user code 
classes. I suspect that we don't use the user code class loader to deserialize 
the state.

The solution seems to be to forward the user code class loader to the 
{{KeyedStateBackends}} when restoring state.
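
A minimal sketch of the general technique in plain Java (not the actual Flink 
restore path; class and variable names are assumptions): resolve classes against 
the user code class loader while reading the serialized state.

{code}
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// Sketch only: an ObjectInputStream that resolves classes against a given
// (user code) class loader, falling back to the default resolution.
final class UserCodeObjectInputStream extends ObjectInputStream {

    private final ClassLoader userCodeClassLoader;

    UserCodeObjectInputStream(InputStream in, ClassLoader userCodeClassLoader) throws IOException {
        super(in);
        this.userCodeClassLoader = userCodeClassLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        try {
            // Prefer the user code class loader so state containing
            // user-defined types can be deserialized on restore.
            return Class.forName(desc.getName(), false, userCodeClassLoader);
        } catch (ClassNotFoundException e) {
            return super.resolveClass(desc);
        }
    }
}
{code}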



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4599) Add 'explain()' also to StreamTableEnvironment

2016-09-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476495#comment-15476495
 ] 

Fabian Hueske commented on FLINK-4599:
--

I would leave that up to you. If you come up with a good implementation to 
obtain the physical plan, that would be great. 
But we can also start with the Calcite AST and do the physical plan in a 
separate issue. Whatever you prefer :-) 
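
A rough sketch of the "start with the Calcite AST" option, using only Calcite's 
plan-printing utility (how the RelNode is obtained from a streaming Table is 
assumed here, since that wiring is internal to the Table API):

{code}
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;

// Sketch only: render the logical (Calcite) plan of a query as the text that
// an explain() call could return; the physical plan would be added later.
final class StreamExplainSketch {

    static String explainAst(RelNode logicalPlan) {
        return "== Abstract Syntax Tree ==\n" + RelOptUtil.toString(logicalPlan);
    }
}
{code}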

> Add 'explain()' also to StreamTableEnvironment
> --
>
> Key: FLINK-4599
> URL: https://issues.apache.org/jira/browse/FLINK-4599
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>  Labels: starter
>
> Currently, only the BatchTableEnvironment supports the {{explain}} command 
> for tables. We should also support it for the StreamTableEnvironment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476482#comment-15476482
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78154021
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+
+public final class PojoComparatorGenerator {
+   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
+
+   private transient Field[] keyFields;
+   private transient Integer[] keyFieldIds;
+   private final TypeComparator[] comparators;
+   private final TypeSerializer serializer;
+   private final Class type;
+   private final ExecutionConfig config;
+   private String code;
+
+   public PojoComparatorGenerator(Field[] keyFields, TypeComparator[] 
comparators, TypeSerializer serializer,
+   
Class type, Integer[] keyFieldIds, ExecutionConfig config) {
+   this.keyFields = keyFields;
+   this.comparators = comparators;
+
+   this.type = type;
+   this.serializer = serializer;
+   this.keyFieldIds = keyFieldIds;
+   this.config = config;
+   }
+
+   public TypeComparator createComparator() {
+   // Multiple comparators can be generated for each type based on 
a list of keys. The list of keys and the type
+   // name should determine the generated comparator. This 
information is used for caching (avoiding
+   // recompilation). Note that the name of the field is not 
sufficient because nested POJOs might have a field
+   // with the same name.
+   StringBuilder keyBuilder = new StringBuilder();
+   for(Integer i : keyFieldIds) {
+   keyBuilder.append(i);
+   keyBuilder.append("_");
+   }
+   final String className = type.getCanonicalName().replace('.', 
'_') + "_GeneratedComparator" +
+   keyBuilder.toString();
+   final String fullClassName = packageName + "." + className;
+   code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+   if (code == null) {
+   generateCode(className);
+   }
+   return new GenTypeComparatorProxy<>(type, fullClassName, code, 
comparators, serializer);
+   }
+
+
+   private void generateCode(String className) {
+   String typeName = type.getCanonicalName();
+   StringBuilder members = new StringBuilder();
+   for (int i = 0; i < comparators.length; ++i) {
+   members.append(String.format("final TypeComparator 
f%d;\n", i));
--- End diff --

same specialization here as in `PojoSerializerGenerator`
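
For readers without the serializer diff at hand, a hedged guess at the kind of 
specialization meant here (the concrete comparator class below is only an 
assumed example):

{code}
import org.apache.flink.api.common.typeutils.TypeComparator;

// Instead of declaring every generated member with the raw interface type
// ("final TypeComparator f0;"), the generator could emit the concrete
// comparator class when it is known, e.g.
// "final org.apache.flink.api.common.typeutils.base.IntComparator f0;",
// which avoids virtual dispatch in the generated compare code.
final class MemberDeclarationSketch {

    static String declare(TypeComparator<?> comparator, int index) {
        return String.format("final %s f%d;%n",
                comparator.getClass().getCanonicalName(), index);
    }
}
{code}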


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, 

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-09 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78154021
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+
+public final class PojoComparatorGenerator {
+   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
+
+   private transient Field[] keyFields;
+   private transient Integer[] keyFieldIds;
+   private final TypeComparator[] comparators;
+   private final TypeSerializer serializer;
+   private final Class type;
+   private final ExecutionConfig config;
+   private String code;
+
+   public PojoComparatorGenerator(Field[] keyFields, TypeComparator[] 
comparators, TypeSerializer serializer,
+   
Class type, Integer[] keyFieldIds, ExecutionConfig config) {
+   this.keyFields = keyFields;
+   this.comparators = comparators;
+
+   this.type = type;
+   this.serializer = serializer;
+   this.keyFieldIds = keyFieldIds;
+   this.config = config;
+   }
+
+   public TypeComparator createComparator() {
+   // Multiple comparators can be generated for each type based on 
a list of keys. The list of keys and the type
+   // name should determine the generated comparator. This 
information is used for caching (avoiding
+   // recompilation). Note that the name of the field is not 
sufficient because nested POJOs might have a field
+   // with the same name.
+   StringBuilder keyBuilder = new StringBuilder();
+   for(Integer i : keyFieldIds) {
+   keyBuilder.append(i);
+   keyBuilder.append("_");
+   }
+   final String className = type.getCanonicalName().replace('.', 
'_') + "_GeneratedComparator" +
+   keyBuilder.toString();
+   final String fullClassName = packageName + "." + className;
+   code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+   if (code == null) {
+   generateCode(className);
+   }
+   return new GenTypeComparatorProxy<>(type, fullClassName, code, 
comparators, serializer);
+   }
+
+
+   private void generateCode(String className) {
+   String typeName = type.getCanonicalName();
+   StringBuilder members = new StringBuilder();
+   for (int i = 0; i < comparators.length; ++i) {
+   members.append(String.format("final TypeComparator 
f%d;\n", i));
--- End diff --

same specialization here as in `PojoSerializerGenerator`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

