[jira] [Commented] (FLINK-7017) Remove netty usages in flink-tests

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066016#comment-16066016
 ] 

ASF GitHub Bot commented on FLINK-7017:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4196#discussion_r124464897
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
 ---
@@ -83,16 +78,16 @@ public static void initialize() throws Exception {
assertTrue("Unable to create temp directory", logDir.mkdir());
File logFile = new File(logDir, "jobmanager.log");
File outFile = new File(logDir, "jobmanager.out");
-   
+
Files.createFile(logFile.toPath());
Files.createFile(outFile.toPath());
-   
+
config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.getAbsolutePath());
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
logFile.getAbsolutePath());
 
cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
-   
+
--- End diff --

This is the removal of leading tabs that is required by the checkstyle rules 
active in `flink-runtime-web`.


> Remove netty usages in flink-tests
> --
>
> Key: FLINK-7017
> URL: https://issues.apache.org/jira/browse/FLINK-7017
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4196: [FLINK-7017] Remove netty usages in flink-tests

2017-06-27 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4196#discussion_r124464897
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
 ---
@@ -83,16 +78,16 @@ public static void initialize() throws Exception {
assertTrue("Unable to create temp directory", logDir.mkdir());
File logFile = new File(logDir, "jobmanager.log");
File outFile = new File(logDir, "jobmanager.out");
-   
+
Files.createFile(logFile.toPath());
Files.createFile(outFile.toPath());
-   
+
config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.getAbsolutePath());
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
logFile.getAbsolutePath());
 
cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
-   
+
--- End diff --

This is the removal of leading tabs that is required by the checkstyle rules 
active in `flink-runtime-web`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-6665) Pass a ScheduledExecutorService to the RestartStrategy

2017-06-27 Thread Fang Yong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fang Yong reassigned FLINK-6665:


Assignee: Fang Yong

> Pass a ScheduledExecutorService to the RestartStrategy
> --
>
> Key: FLINK-6665
> URL: https://issues.apache.org/jira/browse/FLINK-6665
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Fang Yong
> Fix For: 1.4.0
>
>
> Currently, the {{RestartStrategy}} is called when the {{ExecutionGraph}} 
> should be restarted.
> To facilitate delays before restarting, the strategy simply sleeps, blocking 
> the thread that runs the ExecutionGraph's recovery method.
> I suggest passing a {{ScheduledExecutorService}} to the {{RestartStrategy}} 
> and letting it schedule the restart call that way, avoiding any sleeps.
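
A minimal sketch of the suggested approach (illustrative only; the names below are not the actual Flink interfaces): the restart delay is handed to a {{ScheduledExecutorService}} instead of being slept away in the recovery thread.

{code}
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

// Illustrative sketch: schedule the restart callback on the executor instead
// of sleeping in the thread that runs the ExecutionGraph's recovery method.
object DelayedRestartSketch {

  def scheduleRestart(executor: ScheduledExecutorService,
                      delayMillis: Long,
                      restartCallback: Runnable): Unit = {
    executor.schedule(restartCallback, delayMillis, TimeUnit.MILLISECONDS)
  }
}
{code}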





[jira] [Created] (FLINK-7025) Using NullByteKeySelector for Unbounded ProcTime NonPartitioned Over

2017-06-27 Thread sunjincheng (JIRA)
sunjincheng created FLINK-7025:
--

 Summary: Using NullByteKeySelector for Unbounded ProcTime 
NonPartitioned Over
 Key: FLINK-7025
 URL: https://issues.apache.org/jira/browse/FLINK-7025
 Project: Flink
  Issue Type: Bug
Reporter: sunjincheng
Assignee: sunjincheng


We recently added the `Cleanup State` feature, but it does not work if state 
cleaning is enabled on an Unbounded ProcTime NonPartitioned Over window, 
because `ProcessFunctionWithCleanupState` uses keyed state.

So, in this JIRA I'll change the `Unbounded ProcTime NonPartitioned Over` into a 
`partitioned Over` by using a NullByteKeySelector, or alternatively create a 
`NonKeyedProcessFunctionWithCleanupState`. I think the first approach is 
simpler. What do you think? [~fhueske]
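
For illustration, a null-byte key selector is just a {{KeySelector}} that maps every record to the same constant key, so the logically non-partitioned over window becomes a single-partition keyed one and keyed state can be used. A minimal sketch (the class name is illustrative):

{code}
import org.apache.flink.api.java.functions.KeySelector

// Sketch only: every record gets the same key, so the cleanup state
// (which is keyed state) can be used on a non-partitioned stream.
class ConstantByteKeySelector[T] extends KeySelector[T, java.lang.Byte] {
  override def getKey(value: T): java.lang.Byte = java.lang.Byte.valueOf(0.toByte)
}
{code}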





[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065960#comment-16065960
 ] 

ASF GitHub Bot commented on FLINK-7014:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124457316
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
 ---
@@ -422,4 +423,50 @@ class ExpressionReductionTest extends TableTestBase {
 util.verifyTable(result, expected)
   }
 
+  @Test(expected = classOf[NullPointerException])
+  def testReduceDeterministicUDF(): Unit = {
--- End diff --

The exception isn't caused by Flink, so checking for an unexpected exception is 
not a good approach. I suggest marking this test as `@Ignore` and adding a TODO 
comment at the top of the test. After CALCITE-1860 is fixed, we can re-enable 
this test or check and remove it.
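
A sketch of the suggested change, assuming the JUnit 4 annotations already used by this test class (the test body is elided):

```scala
import org.junit.{Ignore, Test}

class ExpressionReductionTestSketch {

  // TODO: re-enable (or remove) this test once CALCITE-1860 is fixed, and
  // assert the real expected plan instead of expecting an NPE.
  @Ignore("Blocked by CALCITE-1860: Calcite throws an NPE during expression reduction")
  @Test
  def testReduceDeterministicUDF(): Unit = {
    // original test body from the diff above
  }
}
```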


> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of `SqlFunction` implementations 
> always returns true, which causes inappropriate optimizations in Calcite, 
> such as treating a user's stateful UDF as a pure function. 
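
A hedged sketch of how the proposed interface might be used once it is exposed on `ScalarFunction` (the `isDeterministic` override is the new part; the rest follows the existing UDF pattern):

{code}
import org.apache.flink.table.functions.ScalarFunction

// Sketch only: a non-deterministic (random/stateful) UDF declares itself as
// such, so Calcite will not constant-fold calls to it during optimization.
class RandomTag extends ScalarFunction {

  def eval(prefix: String): String = prefix + "-" + scala.util.Random.nextInt(1000)

  // the override this issue proposes to expose; returning false disables
  // expression reduction for this function
  override def isDeterministic: Boolean = false
}
{code}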





[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065961#comment-16065961
 ] 

ASF GitHub Bot commented on FLINK-7014:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124457542
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
 ---
@@ -422,4 +423,50 @@ class ExpressionReductionTest extends TableTestBase {
 util.verifyTable(result, expected)
   }
 
+  @Test(expected = classOf[NullPointerException])
+  def testReduceDeterministicUDF(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+// if isDeterministic = true, will cause a Calcite NPE, which will be 
fixed in [CALCITE-1860]
+val result = table
+  .select('a, 'b, 'c, DeterministicNullFunc() as 'd)
+  .where("d.isNull")
+  .select('a, 'b, 'c)
+
+val expected: String = ""
--- End diff --

We should set the expected value here to make sure the test can pass once 
we reopen it.


> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of `SqlFunction` implementations 
> always returns true, which causes inappropriate optimizations in Calcite, 
> such as treating a user's stateful UDF as a pure function. 





[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...

2017-06-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124457542
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
 ---
@@ -422,4 +423,50 @@ class ExpressionReductionTest extends TableTestBase {
 util.verifyTable(result, expected)
   }
 
+  @Test(expected = classOf[NullPointerException])
+  def testReduceDeterministicUDF(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+// if isDeterministic = true, will cause a Calcite NPE, which will be 
fixed in [CALCITE-1860]
+val result = table
+  .select('a, 'b, 'c, DeterministicNullFunc() as 'd)
+  .where("d.isNull")
+  .select('a, 'b, 'c)
+
+val expected: String = ""
--- End diff --

We should set the expected value here to make sure the test can pass once 
we reopen it.




[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...

2017-06-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124457316
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
 ---
@@ -422,4 +423,50 @@ class ExpressionReductionTest extends TableTestBase {
 util.verifyTable(result, expected)
   }
 
+  @Test(expected = classOf[NullPointerException])
+  def testReduceDeterministicUDF(): Unit = {
--- End diff --

The exception isn't caused by Flink, so checking for an unexpected exception is 
not a good approach. I suggest marking this test as `@Ignore` and adding a TODO 
comment at the top of the test. After CALCITE-1860 is fixed, we can re-enable 
this test or check and remove it.




[jira] [Commented] (FLINK-6925) Add CONCAT/CONCAT_WS supported in SQL

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065956#comment-16065956
 ] 

ASF GitHub Bot commented on FLINK-6925:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4138#discussion_r124456032
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.functions
+
+import scala.annotation.varargs
+import java.math.{BigDecimal => JBigDecimal}
+import java.lang.{StringBuffer => JStringBuffer}
+
+/**
+  * Built-in scalar runtime functions.
+  */
+class ScalarFunctions {}
+
+object ScalarFunctions {
+
+  def power(a: Double, b: JBigDecimal): Double = {
+Math.pow(a, b.doubleValue())
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments.
+* Returns NULL if any argument is NULL.
+*/
+  @varargs
+  def concat(args: String*): String = {
+val sb = new JStringBuffer
--- End diff --

Please use `StringBuilder` instead of `StringBuffer`. `StringBuffer` is 
thread-safe and therefore has poorer performance.

From the StringBuffer JavaDoc:

> As of release JDK 5, this class has been supplemented with an equivalent 
class designed for use by a single thread, StringBuilder. The StringBuilder 
class should generally be used in preference to this one, as it supports all of 
the same operations but it is faster, as it performs no synchronization.
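
For reference, a sketch of the same `concat` logic built on the unsynchronized `java.lang.StringBuilder`, as suggested (illustrative, not the final PR code):

```scala
import scala.annotation.varargs

object ConcatSketch {

  /** Returns the concatenation of the arguments, or null if any argument is null. */
  @varargs
  def concat(args: String*): String = {
    val sb = new java.lang.StringBuilder
    var i = 0
    while (i < args.length) {
      if (args(i) == null) {
        return null
      }
      sb.append(args(i))
      i += 1
    }
    sb.toString
  }
}
```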







> Add CONCAT/CONCAT_WS supported in SQL
> -
>
> Key: FLINK-6925
> URL: https://issues.apache.org/jira/browse/FLINK-6925
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> CONCAT(str1,str2,...) Returns the string that results from concatenating the 
> arguments. May have one or more arguments. If all arguments are nonbinary 
> strings, the result is a nonbinary string. If the arguments include any 
> binary strings, the result is a binary string. A numeric argument is 
> converted to its equivalent nonbinary string form.
> CONCAT() returns NULL if any argument is NULL.
> * Syntax:
> CONCAT(str1,str2,...) 
> * Arguments
> ** str1,str2,... -
> * Return Types
>   string
> * Example:
>   CONCAT('F', 'lin', 'k') -> 'Flink'
>   CONCAT('M', NULL, 'L') -> NULL
>   CONCAT(14.3) -> '14.3'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat]
> CONCAT_WS() stands for Concatenate With Separator and is a special form of 
> CONCAT(). The first argument is the separator for the rest of the arguments. 
> The separator is added between the strings to be concatenated. The separator 
> can be a string, as can the rest of the arguments. If the separator is NULL, 
> the result is NULL.
> * Syntax:
> CONCAT_WS(separator,str1,str2,...)
> * Arguments
> ** separator -
> ** str1,str2,... -
> * Return Types
>   string
> * Example:
>   CONCAT_WS(',','First name','Second name','Last Name') -> 'First name,Second 
> name,Last Name'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat-ws]





[GitHub] flink pull request #4138: [FLINK-6925][table]Add CONCAT/CONCAT_WS supported ...

2017-06-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4138#discussion_r124452756
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.functions
+
+import scala.annotation.varargs
+import java.math.{BigDecimal => JBigDecimal}
+import java.lang.{StringBuffer => JStringBuffer}
+
+/**
+  * Built-in scalar runtime functions.
+  */
+class ScalarFunctions {}
+
+object ScalarFunctions {
+
+  def power(a: Double, b: JBigDecimal): Double = {
+Math.pow(a, b.doubleValue())
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments.
+* Returns NULL if any argument is NULL.
+*/
+  @varargs
+  def concat(args: String*): String = {
+val sb = new JStringBuffer
+var i = 0
+while (i < args.length) {
+  if (args(i) == null) {
+return null
+  }
+  sb.append(args(i))
+  i += 1
+}
+sb.toString
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments and 
separator.
+* Returns NULL If the separator is NULL.
+*
+* Note: CONCAT_WS() does not skip empty strings. However, it does skip 
any NULL values after
+* the separator argument.
+*
+* @param args The first element of argument is the separator for the 
rest of the arguments.
+*/
+  @varargs
+  def concat_ws(args: String*): String = {
--- End diff --

I would suggest changing the signature to `sep: String, strs: String*`.
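
A sketch of what the suggested signature could look like (illustrative, not the final PR code):

```scala
import scala.annotation.varargs

object ConcatWsSketch {

  /**
    * Concatenates the strings with the given separator. Returns null if the
    * separator is null; null values after the separator are skipped, empty
    * strings are not.
    */
  @varargs
  def concat_ws(sep: String, strs: String*): String = {
    if (sep == null) {
      return null
    }
    strs.filter(_ != null).mkString(sep)
  }
}
```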




[GitHub] flink pull request #4138: [FLINK-6925][table]Add CONCAT/CONCAT_WS supported ...

2017-06-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4138#discussion_r123681972
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/scalarFunctions/ScalarFunctions.scala
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.scalarFunctions
+
+import scala.annotation.varargs
+import java.math.{BigDecimal => JBigDecimal}
+import java.lang.{StringBuffer => JStringBuffer}
+
+/**
+  * All build-in scalar scalar functions.
+  */
+class ScalarFunctions {}
+
+object ScalarFunctions {
+
+  def power(a: Double, b: JBigDecimal): Double = {
+Math.pow(a, b.doubleValue())
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments.
+* Returns NULL if any argument is NULL.
+*/
+  @varargs
+  def concat(args: String*): String = {
+val sb = new JStringBuffer
+var i = 0
+while (i < args.length) {
+  if (args(i) == null) {
+return null
+  }
+  sb.append(args(i))
+  i += 1
+}
+sb.toString
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments and 
separator.
+* Returns NULL If the separator is NULL.
+*
+* Note: CONCAT_WS() does not skip empty strings. However, it does skip 
any NULL values after
+* the separator argument.
+*
+* @param args The first element of argument is the separator for the 
rest of the arguments.
+*/
+  @varargs
+  def concat_ws(args: String*): String = {
--- End diff --

I would like to declare the signature like this: `(sep: String, str1: String, 
others: String*)`.




[jira] [Commented] (FLINK-6925) Add CONCAT/CONCAT_WS supported in SQL

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065954#comment-16065954
 ] 

ASF GitHub Bot commented on FLINK-6925:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4138#discussion_r124456117
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.functions
+
+import scala.annotation.varargs
+import java.math.{BigDecimal => JBigDecimal}
+import java.lang.{StringBuffer => JStringBuffer}
+
+/**
+  * Built-in scalar runtime functions.
+  */
+class ScalarFunctions {}
+
+object ScalarFunctions {
+
+  def power(a: Double, b: JBigDecimal): Double = {
+Math.pow(a, b.doubleValue())
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments.
+* Returns NULL if any argument is NULL.
+*/
+  @varargs
+  def concat(args: String*): String = {
+val sb = new JStringBuffer
+var i = 0
+while (i < args.length) {
+  if (args(i) == null) {
+return null
+  }
+  sb.append(args(i))
+  i += 1
+}
+sb.toString
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments and 
separator.
+* Returns NULL If the separator is NULL.
+*
+* Note: CONCAT_WS() does not skip empty strings. However, it does skip 
any NULL values after
+* the separator argument.
+*
+* @param args The first element of argument is the separator for the 
rest of the arguments.
+*/
+  @varargs
+  def concat_ws(args: String*): String = {
+val separator = args(0)
+if (null == separator) {
+  return null
+}
+
+val sb = new JStringBuffer
--- End diff --

Please use `StringBuilder` instead of `StringBuffer`.


> Add CONCAT/CONCAT_WS supported in SQL
> -
>
> Key: FLINK-6925
> URL: https://issues.apache.org/jira/browse/FLINK-6925
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> CONCAT(str1,str2,...) Returns the string that results from concatenating the 
> arguments. May have one or more arguments. If all arguments are nonbinary 
> strings, the result is a nonbinary string. If the arguments include any 
> binary strings, the result is a binary string. A numeric argument is 
> converted to its equivalent nonbinary string form.
> CONCAT() returns NULL if any argument is NULL.
> * Syntax:
> CONCAT(str1,str2,...) 
> * Arguments
> ** str1,str2,... -
> * Return Types
>   string
> * Example:
>   CONCAT('F', 'lin', 'k') -> 'Flink'
>   CONCAT('M', NULL, 'L') -> NULL
>   CONCAT(14.3) -> '14.3'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat]
> CONCAT_WS() stands for Concatenate With Separator and is a special form of 
> CONCAT(). The first argument is the separator for the rest of the arguments. 
> The separator is added between the strings to be concatenated. The separator 
> can be a string, as can the rest of the arguments. If the separator is NULL, 
> the result is NULL.
> * Syntax:
> CONCAT_WS(separator,str1,str2,...)
> * Arguments
> ** separator -
> ** str1,str2,... -
> * Return Types
>   string
> * Example:
>   CONCAT_WS(',','First name','Second name','Last Name') -> 'First name,Second 
> name,Last Name'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat-ws]





[jira] [Commented] (FLINK-6925) Add CONCAT/CONCAT_WS supported in SQL

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065957#comment-16065957
 ] 

ASF GitHub Bot commented on FLINK-6925:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4138#discussion_r123681972
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/scalarFunctions/ScalarFunctions.scala
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.scalarFunctions
+
+import scala.annotation.varargs
+import java.math.{BigDecimal => JBigDecimal}
+import java.lang.{StringBuffer => JStringBuffer}
+
+/**
+  * All build-in scalar scalar functions.
+  */
+class ScalarFunctions {}
+
+object ScalarFunctions {
+
+  def power(a: Double, b: JBigDecimal): Double = {
+Math.pow(a, b.doubleValue())
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments.
+* Returns NULL if any argument is NULL.
+*/
+  @varargs
+  def concat(args: String*): String = {
+val sb = new JStringBuffer
+var i = 0
+while (i < args.length) {
+  if (args(i) == null) {
+return null
+  }
+  sb.append(args(i))
+  i += 1
+}
+sb.toString
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments and 
separator.
+* Returns NULL If the separator is NULL.
+*
+* Note: CONCAT_WS() does not skip empty strings. However, it does skip 
any NULL values after
+* the separator argument.
+*
+* @param args The first element of argument is the separator for the 
rest of the arguments.
+*/
+  @varargs
+  def concat_ws(args: String*): String = {
--- End diff --

I would like to declare the signature like this: `(sep: String, str1: String, 
others: String*)`.


> Add CONCAT/CONCAT_WS supported in SQL
> -
>
> Key: FLINK-6925
> URL: https://issues.apache.org/jira/browse/FLINK-6925
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> CONCAT(str1,str2,...) Returns the string that results from concatenating the 
> arguments. May have one or more arguments. If all arguments are nonbinary 
> strings, the result is a nonbinary string. If the arguments include any 
> binary strings, the result is a binary string. A numeric argument is 
> converted to its equivalent nonbinary string form.
> CONCAT() returns NULL if any argument is NULL.
> * Syntax:
> CONCAT(str1,str2,...) 
> * Arguments
> ** str1,str2,... -
> * Return Types
>   string
> * Example:
>   CONCAT('F', 'lin', 'k') -> 'Flink'
>   CONCAT('M', NULL, 'L') -> NULL
>   CONCAT(14.3) -> '14.3'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat]
> CONCAT_WS() stands for Concatenate With Separator and is a special form of 
> CONCAT(). The first argument is the separator for the rest of the arguments. 
> The separator is added between the strings to be concatenated. The separator 
> can be a string, as can the rest of the arguments. If the separator is NULL, 
> the result is NULL.
> * Syntax:
> CONCAT_WS(separator,str1,str2,...)
> * Arguments
> ** separator -
> ** str1,str2,... -
> * Return Types
>   string
> * Example:
>   CONCAT_WS(',','First name','Second name','Last Name') -> 'First name,Second 
> name,Last Name'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat-ws]





[jira] [Commented] (FLINK-6925) Add CONCAT/CONCAT_WS supported in SQL

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065955#comment-16065955
 ] 

ASF GitHub Bot commented on FLINK-6925:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4138#discussion_r124452756
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.functions
+
+import scala.annotation.varargs
+import java.math.{BigDecimal => JBigDecimal}
+import java.lang.{StringBuffer => JStringBuffer}
+
+/**
+  * Built-in scalar runtime functions.
+  */
+class ScalarFunctions {}
+
+object ScalarFunctions {
+
+  def power(a: Double, b: JBigDecimal): Double = {
+Math.pow(a, b.doubleValue())
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments.
+* Returns NULL if any argument is NULL.
+*/
+  @varargs
+  def concat(args: String*): String = {
+val sb = new JStringBuffer
+var i = 0
+while (i < args.length) {
+  if (args(i) == null) {
+return null
+  }
+  sb.append(args(i))
+  i += 1
+}
+sb.toString
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments and 
separator.
+* Returns NULL If the separator is NULL.
+*
+* Note: CONCAT_WS() does not skip empty strings. However, it does skip 
any NULL values after
+* the separator argument.
+*
+* @param args The first element of argument is the separator for the 
rest of the arguments.
+*/
+  @varargs
+  def concat_ws(args: String*): String = {
--- End diff --

I would suggest changing the signature to `sep: String, strs: String*`.


> Add CONCAT/CONCAT_WS supported in SQL
> -
>
> Key: FLINK-6925
> URL: https://issues.apache.org/jira/browse/FLINK-6925
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> CONCAT(str1,str2,...) Returns the string that results from concatenating the 
> arguments. May have one or more arguments. If all arguments are nonbinary 
> strings, the result is a nonbinary string. If the arguments include any 
> binary strings, the result is a binary string. A numeric argument is 
> converted to its equivalent nonbinary string form.
> CONCAT() returns NULL if any argument is NULL.
> * Syntax:
> CONCAT(str1,str2,...) 
> * Arguments
> ** str1,str2,... -
> * Return Types
>   string
> * Example:
>   CONCAT('F', 'lin', 'k') -> 'Flink'
>   CONCAT('M', NULL, 'L') -> NULL
>   CONCAT(14.3) -> '14.3'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat]
> CONCAT_WS() stands for Concatenate With Separator and is a special form of 
> CONCAT(). The first argument is the separator for the rest of the arguments. 
> The separator is added between the strings to be concatenated. The separator 
> can be a string, as can the rest of the arguments. If the separator is NULL, 
> the result is NULL.
> * Syntax:
> CONCAT_WS(separator,str1,str2,...)
> * Arguments
> ** separator -
> ** str1,str2,... -
> * Return Types
>   string
> * Example:
>   CONCAT_WS(',','First name','Second name','Last Name') -> 'First name,Second 
> name,Last Name'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat-ws]





[GitHub] flink pull request #4138: [FLINK-6925][table]Add CONCAT/CONCAT_WS supported ...

2017-06-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4138#discussion_r124456117
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.functions
+
+import scala.annotation.varargs
+import java.math.{BigDecimal => JBigDecimal}
+import java.lang.{StringBuffer => JStringBuffer}
+
+/**
+  * Built-in scalar runtime functions.
+  */
+class ScalarFunctions {}
+
+object ScalarFunctions {
+
+  def power(a: Double, b: JBigDecimal): Double = {
+Math.pow(a, b.doubleValue())
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments.
+* Returns NULL if any argument is NULL.
+*/
+  @varargs
+  def concat(args: String*): String = {
+val sb = new JStringBuffer
+var i = 0
+while (i < args.length) {
+  if (args(i) == null) {
+return null
+  }
+  sb.append(args(i))
+  i += 1
+}
+sb.toString
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments and 
separator.
+* Returns NULL If the separator is NULL.
+*
+* Note: CONCAT_WS() does not skip empty strings. However, it does skip 
any NULL values after
+* the separator argument.
+*
+* @param args The first element of argument is the separator for the 
rest of the arguments.
+*/
+  @varargs
+  def concat_ws(args: String*): String = {
+val separator = args(0)
+if (null == separator) {
+  return null
+}
+
+val sb = new JStringBuffer
--- End diff --

Please use `StringBuilder` instead of `StringBuffer`.




[GitHub] flink pull request #4138: [FLINK-6925][table]Add CONCAT/CONCAT_WS supported ...

2017-06-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4138#discussion_r124456032
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.functions
+
+import scala.annotation.varargs
+import java.math.{BigDecimal => JBigDecimal}
+import java.lang.{StringBuffer => JStringBuffer}
+
+/**
+  * Built-in scalar runtime functions.
+  */
+class ScalarFunctions {}
+
+object ScalarFunctions {
+
+  def power(a: Double, b: JBigDecimal): Double = {
+Math.pow(a, b.doubleValue())
+  }
+
+  /**
+* Returns the string that results from concatenating the arguments.
+* Returns NULL if any argument is NULL.
+*/
+  @varargs
+  def concat(args: String*): String = {
+val sb = new JStringBuffer
--- End diff --

Please use `StringBuilder` instead of `StringBuffer`. `StringBuffer` is 
thread-safe and therefore has poorer performance.

From the StringBuffer JavaDoc:

> As of release JDK 5, this class has been supplemented with an equivalent 
class designed for use by a single thread, StringBuilder. The StringBuilder 
class should generally be used in preference to this one, as it supports all of 
the same operations but it is faster, as it performs no synchronization.









[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065948#comment-16065948
 ] 

ASF GitHub Bot commented on FLINK-7008:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124455303
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java 
---
@@ -324,6 +327,83 @@ public boolean filter(Event value) throws Exception {
}
}
 
+   @Test
+   public void testNFAChange() {
+   Pattern pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
1858562682635302605L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).notFollowedBy("not").where(new IterativeCondition() {
+   private static final long serialVersionUID = 
-6085237016591726715L;
+
+   @Override
+   public boolean filter(Event value, Context ctx) 
throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedByAny("middle").where(new 
IterativeCondition() {
+   private static final long serialVersionUID = 
8061969839441121955L;
+
+   @Override
+   public boolean filter(Event value, Context ctx) 
throws Exception {
+   return value.getName().equals("b");
+   }
+   
}).oneOrMore().optional().allowCombinations().followedBy("middle2").where(new 
IterativeCondition() {
+   private static final long serialVersionUID = 
8061969839441121955L;
+
+   @Override
+   public boolean filter(Event value, Context ctx) 
throws Exception {
+   return value.getName().equals("d");
+   }
+   }).followedBy("end").where(new IterativeCondition() {
+   private static final long serialVersionUID = 
8061969839441121955L;
+
+   @Override
+   public boolean filter(Event value, Context ctx) 
throws Exception {
+   return value.getName().equals("e");
+   }
+   }).within(Time.milliseconds(10));
+
+   NFACompiler.NFAFactory nfaFactory = 
NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
+   NFA nfa = nfaFactory.createNFA();
+   nfa.process(new Event(1, "b", 1.0), 1L);
+   assertFalse(nfa.isNFAChanged());
+
+   nfa.nfaChanged = false;
--- End diff --

@dawidwys thanks a lot for the review. Have added more comments.


> Update NFA state only when the NFA changes.
> ---
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Kostas Kloudas
>Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.
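
An illustrative sketch of the optimization (the names are placeholders, not the actual {{AbstractKeyedCEPPatternOperator}} code): the NFA is written back to keyed state only when processing actually changed it.

{code}
import org.apache.flink.api.common.state.ValueState

// Stand-in for the real NFA type: it only exposes the dirty flag added here.
trait NfaLike {
  def isNFAChanged: Boolean
  def resetNFAChanged(): Unit
}

object UpdateNfaSketch {

  // Persist the NFA only when it changed, avoiding a redundant state write
  // for every processed element.
  def updateNFA(nfaState: ValueState[NfaLike], nfa: NfaLike): Unit = {
    if (nfa.isNFAChanged) {
      nfa.resetNFAChanged()
      nfaState.update(nfa)
    }
  }
}
{code}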





[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...

2017-06-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4195#discussion_r124455303
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java 
---
@@ -324,6 +327,83 @@ public boolean filter(Event value) throws Exception {
}
}
 
+   @Test
+   public void testNFAChange() {
+   Pattern pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
1858562682635302605L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).notFollowedBy("not").where(new IterativeCondition() {
+   private static final long serialVersionUID = 
-6085237016591726715L;
+
+   @Override
+   public boolean filter(Event value, Context ctx) 
throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedByAny("middle").where(new 
IterativeCondition() {
+   private static final long serialVersionUID = 
8061969839441121955L;
+
+   @Override
+   public boolean filter(Event value, Context ctx) 
throws Exception {
+   return value.getName().equals("b");
+   }
+   
}).oneOrMore().optional().allowCombinations().followedBy("middle2").where(new 
IterativeCondition() {
+   private static final long serialVersionUID = 
8061969839441121955L;
+
+   @Override
+   public boolean filter(Event value, Context ctx) 
throws Exception {
+   return value.getName().equals("d");
+   }
+   }).followedBy("end").where(new IterativeCondition() {
+   private static final long serialVersionUID = 
8061969839441121955L;
+
+   @Override
+   public boolean filter(Event value, Context ctx) 
throws Exception {
+   return value.getName().equals("e");
+   }
+   }).within(Time.milliseconds(10));
+
+   NFACompiler.NFAFactory nfaFactory = 
NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
+   NFA nfa = nfaFactory.createNFA();
+   nfa.process(new Event(1, "b", 1.0), 1L);
+   assertFalse(nfa.isNFAChanged());
+
+   nfa.nfaChanged = false;
--- End diff --

@dawidwys thanks a lot for the review. Have added more comments.




[jira] [Commented] (FLINK-6407) Upgrade AVRO dependency version to 1.8.x

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065941#comment-16065941
 ] 

ASF GitHub Bot commented on FLINK-6407:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4205
  
cc @zentol 


> Upgrade AVRO dependency version to 1.8.x
> 
>
> Key: FLINK-6407
> URL: https://issues.apache.org/jira/browse/FLINK-6407
> Project: Flink
>  Issue Type: Wish
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.1
>Reporter: Miguel
>Assignee: mingleizhang
>Priority: Minor
>
> Avro 1.7.6 and 1.7.7 do not support Maps that use Java Enum keys (they are 
> limited to String keys). This was solved in Avro 1.8.0.





[GitHub] flink issue #4205: [FLINK-6407] [build] Upgrade AVRO to 1.8.2

2017-06-27 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4205
  
cc @zentol 




[jira] [Commented] (FLINK-6407) Upgrade AVRO dependency version to 1.8.x

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065937#comment-16065937
 ] 

ASF GitHub Bot commented on FLINK-6407:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4205

[FLINK-6407] [build] Upgrade AVRO to 1.8.2

Upgrade to the latest maintenance release of **AVRO**, 1.8.2.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6407

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4205.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4205


commit b9790a643256bc95ba9612ed65f948719fbb08e2
Author: zhangminglei 
Date:   2017-06-28T03:41:52Z

[FLINK-6407] [build] Upgrade AVRO to 1.8.2




> Upgrade AVRO dependency version to 1.8.x
> 
>
> Key: FLINK-6407
> URL: https://issues.apache.org/jira/browse/FLINK-6407
> Project: Flink
>  Issue Type: Wish
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.1
>Reporter: Miguel
>Assignee: mingleizhang
>Priority: Minor
>
> Avro 1.7.6 and 1.7.7 do not support Maps that use Java Enum keys (they are 
> limited to String keys). This was solved in Avro 1.8.0.





[GitHub] flink pull request #4205: [FLINK-6407] [build] Upgrade AVRO to 1.8.2

2017-06-27 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4205

[FLINK-6407] [build] Upgrade AVRO to 1.8.2

Upgrade to the latest maintenance release of **AVRO**, 1.8.2.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6407

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4205.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4205


commit b9790a643256bc95ba9612ed65f948719fbb08e2
Author: zhangminglei 
Date:   2017-06-28T03:41:52Z

[FLINK-6407] [build] Upgrade AVRO to 1.8.2






[jira] [Created] (FLINK-7024) Add support for selecting window proctime/rowtime on row-based Tumble/Slide window

2017-06-27 Thread sunjincheng (JIRA)
sunjincheng created FLINK-7024:
--

 Summary: Add support for selecting window proctime/rowtime on 
row-based Tumble/Slide window 
 Key: FLINK-7024
 URL: https://issues.apache.org/jira/browse/FLINK-7024
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.4.0
Reporter: sunjincheng
Assignee: sunjincheng


We get a validation exception when selecting window.proctime/rowtime on a 
row-based group window.
{code}
 table
  .window(Tumble over 2.rows on 'proctime as 'w)
  .groupBy('w, 'string)
  .select('string, countFun('string) as 'cnt, 'w.rowtime as 'proctime)
  .window(Over partitionBy 'string orderBy 'proctime preceding 
UNBOUNDED_RANGE following CURRENT_RANGE as 'w2)
  .select('string, 'cnt.sum over 'w2 as 'cnt)
{code}
Exception:
{code}
org.apache.flink.table.api.ValidationException: Window start and Window end 
cannot be selected for a row-count Tumbling window.

at 
org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143)
at 
org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:660)
{code}
We should add a window.proctime/rowtime check to the `validate` method.





[jira] [Commented] (FLINK-6522) Add ZooKeeper cleanup logic to ZooKeeperHaServices

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065924#comment-16065924
 ] 

ASF GitHub Bot commented on FLINK-6522:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4204
  
Hi @tillrohrmann, I have created this PR for issue 
[FLINK-6522](https://issues.apache.org/jira/browse/FLINK-6522). Could you 
please have a look when you're free? Thanks.


> Add ZooKeeper cleanup logic to ZooKeeperHaServices
> --
>
> Key: FLINK-6522
> URL: https://issues.apache.org/jira/browse/FLINK-6522
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Till Rohrmann
>Assignee: Fang Yong
>
> The {{ZooKeeperHaServices}} provide a {{CuratorFramework}} client to access 
> ZooKeeper data. Consequently, all data (also for different jobs) is stored 
> under the same root node. When 
> {{HighAvailabilityServices#closeAndCleanupAllData}} is called, this data 
> should be cleaned up. This cleanup logic is currently missing.
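
A hedged sketch of the missing cleanup step, assuming all HA data lives under a single root znode (the path below is a placeholder, not the configured value):

{code}
import org.apache.curator.framework.CuratorFramework

object ZkCleanupSketch {

  // Recursively remove everything written under the HA root znode; intended
  // to be called from closeAndCleanupAllData(). "/flink" is only a placeholder.
  def cleanupAllData(client: CuratorFramework): Unit = {
    client.delete()
      .deletingChildrenIfNeeded()
      .forPath("/flink")
  }
}
{code}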





[GitHub] flink issue #4204: [FLINK-6522] Add ZooKeeper cleanup logic to ZooKeeperHaSe...

2017-06-27 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4204
  
Hi @tillrohrmann, I have created this PR for issue 
[FLINK-6522](https://issues.apache.org/jira/browse/FLINK-6522). Could you 
please have a look when you're free? Thanks.




[jira] [Commented] (FLINK-6522) Add ZooKeeper cleanup logic to ZooKeeperHaServices

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065922#comment-16065922
 ] 

ASF GitHub Bot commented on FLINK-6522:
---

GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4204

[FLINK-6522] Add ZooKeeper cleanup logic to ZooKeeperHaServices

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-6522

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4204.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4204


commit b816aa3a2a3d4d1377caccdcbeba91073e89af75
Author: zjureel 
Date:   2017-06-28T02:12:27Z

clean up all data which is stored by RetrievableStateStorageHelper

commit 03a2f39e051594cb07017ef2613eb7873b56d118
Author: zjureel 
Date:   2017-06-28T04:42:30Z

clean up all data which is stored in zookeeper




> Add ZooKeeper cleanup logic to ZooKeeperHaServices
> --
>
> Key: FLINK-6522
> URL: https://issues.apache.org/jira/browse/FLINK-6522
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Till Rohrmann
>Assignee: Fang Yong
>
> The {{ZooKeeperHaServices}} provide a {{CuratorFramework}} client to access 
> ZooKeeper data. Consequently, all data (also for different jobs) is stored 
> under the same root node. When 
> {{HighAvailabilityServices#closeAndCleanupAllData}} is called, this data 
> should be cleaned up. This cleanup logic is currently missing.





[GitHub] flink pull request #4204: [FLINK-6522] Add ZooKeeper cleanup logic to ZooKee...

2017-06-27 Thread zjureel
GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4204

[FLINK-6522] Add ZooKeeper cleanup logic to ZooKeeperHaServices

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-6522

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4204.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4204


commit b816aa3a2a3d4d1377caccdcbeba91073e89af75
Author: zjureel 
Date:   2017-06-28T02:12:27Z

clean up all data which is stored by RetrievableStateStorageHelper

commit 03a2f39e051594cb07017ef2613eb7873b56d118
Author: zjureel 
Date:   2017-06-28T04:42:30Z

clean up all data which is stored in zookeeper




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6969) Add support for deferred computation for group window aggregates

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065911#comment-16065911
 ] 

ASF GitHub Bot commented on FLINK-6969:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4183
  
Hi @fhueske , I like the `firstResultTimeOffset`, a good design! 👍

Regarding the watermark and timestamp, it makes sense to me. A simple approach 
comes to my mind: assign a new `AssignerWithPunctuatedWatermarks` after the 
window aggregate. This will create an operator which ignores the upstream 
watermarks and assigns new watermarks based on the watermark function.

The new watermark function can simply take the element's timestamp as the 
watermark.

```scala
class TimestampAndWatermark[T] extends AssignerWithPunctuatedWatermarks[T] {

  override def checkAndGetNextWatermark(lastElement: T, extractedTimestamp: Long): Watermark = {
    new Watermark(extractedTimestamp)
  }

  override def extractTimestamp(element: T, originalTimestamp: Long): Long = originalTimestamp
}
```




> Add support for deferred computation for group window aggregates
> 
>
> Key: FLINK-6969
> URL: https://issues.apache.org/jira/browse/FLINK-6969
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Deferred computation is a strategy to deal with late-arriving data and avoid 
> updates of previous results. Instead of computing a result as soon as it is 
> possible (i.e., when a corresponding watermark was received), deferred 
> computation adds a configurable amount of slack time in which late data is 
> accepted before the result is computed. For example, instead of computing a 
> tumbling window of 1 hour at each full hour, we can add a deferred 
> computation interval of 15 minutes to compute the result a quarter past each 
> full hour.
> This approach adds latency but can reduce the number of updates, esp. in use 
> cases where the user cannot influence the generation of watermarks. It is 
> also useful if the data is emitted to a system that cannot update results 
> (files or Kafka). The deferred computation interval should be configured via 
> the {{QueryConfig}}.
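
For illustration only: the {{QueryConfig}} option proposed here does not exist yet, 
but the effect can already be approximated at the DataStream level by holding 
watermarks back by the deferred-computation interval, which delays when an 
event-time window result is emitted. In the sketch below, {{orders}}, the {{Order}} 
type and its {{eventTime}} field are assumptions:
{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Watermarks trail the highest seen timestamp by 15 minutes, so a 1-hour tumbling
// event-time window over `delayed` fires roughly a quarter past each full hour and
// accepts data that arrives within that slack.
DataStream<Order> delayed = orders.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.minutes(15)) {
        @Override
        public long extractTimestamp(Order order) {
            return order.eventTime; // assumed field carrying the event timestamp
        }
    });
{code}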



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4183: [FLINK-6969][table]Add support for deferred computation f...

2017-06-27 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4183
  
Hi @fhueske , I like the `firstResultTimeOffset`, a good design! 👍

Regarding the watermark and timestamp, it makes sense to me. A simple approach 
comes to my mind: assign a new `AssignerWithPunctuatedWatermarks` after the 
window aggregate. This will create an operator which ignores the upstream 
watermarks and assigns new watermarks based on the watermark function.

The new watermark function can simply take the element's timestamp as the 
watermark.

```scala
class TimestampAndWatermark[T] extends AssignerWithPunctuatedWatermarks[T] {

  override def checkAndGetNextWatermark(lastElement: T, extractedTimestamp: Long): Watermark = {
    new Watermark(extractedTimestamp)
  }

  override def extractTimestamp(element: T, originalTimestamp: Long): Long = originalTimestamp
}
```




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7023) Remaining types for Gelly ValueArrays

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065883#comment-16065883
 ] 

ASF GitHub Bot commented on FLINK-7023:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/4203

[FLINK-7023] [gelly] Remaining types for Gelly ValueArrays

Add implementations of Byte/Char/Double/Float/ShortValueArray. Along with 
the existing implementations of Int/Long/Null/StringValueArray this covers all 
10 CopyableValue types.

Note: the best way to review these files is to diff against the existing 
`IntValueArray` and `LongValueArray` implementations as the deltas are very 
small.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
7023_remaining_tyeps_for_gelly_valuearrays

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4203.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4203


commit 1281058d1c4d1dd1debb0d3502d8d5c68ddbd01b
Author: Greg Hogan 
Date:   2017-06-28T02:54:21Z

[FLINK-7023] [gelly] Remaining types for Gelly ValueArrays

Add implementations of Byte/Char/Double/Float/ShortValueArray. Along
with the existing implementations of Int/Long/Null/StringValueArray this
covers all 10 CopyableValue types.




> Remaining types for Gelly ValueArrays
> -
>
> Key: FLINK-7023
> URL: https://issues.apache.org/jira/browse/FLINK-7023
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Affects Versions: 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.4.0
>
>
> Add implementations of Byte/Char/Double/Float/ShortValueArray. Along with the 
> existing implementations of Int/Long/Null/StringValueArray this covers all 10 
> CopyableValue types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4203: [FLINK-7023] [gelly] Remaining types for Gelly Val...

2017-06-27 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/4203

[FLINK-7023] [gelly] Remaining types for Gelly ValueArrays

Add implementations of Byte/Char/Double/Float/ShortValueArray. Along with 
the existing implementations of Int/Long/Null/StringValueArray this covers all 
10 CopyableValue types.

Note: the best way to review these files is to diff against the existing 
`IntValueArray` and `LongValueArray` implementations as the deltas are very 
small.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
7023_remaining_tyeps_for_gelly_valuearrays

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4203.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4203


commit 1281058d1c4d1dd1debb0d3502d8d5c68ddbd01b
Author: Greg Hogan 
Date:   2017-06-28T02:54:21Z

[FLINK-7023] [gelly] Remaining types for Gelly ValueArrays

Add implementations of Byte/Char/Double/Float/ShortValueArray. Along
with the existing implementations of Int/Long/Null/StringValueArray this
covers all 10 CopyableValue types.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...

2017-06-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124443977
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
 ---
@@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+
+  @Test
+  def testScalarFunctionDeterministic(): Unit = {
--- End diff --

@Xpray  @sunjincheng121 I mean we can remove this test, and add a test in 
`ExpressionReductionTest`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065866#comment-16065866
 ] 

ASF GitHub Bot commented on FLINK-7014:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124443977
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
 ---
@@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+
+  @Test
+  def testScalarFunctionDeterministic(): Unit = {
--- End diff --

@Xpray  @sunjincheng121 I mean we can remove this test, and add a test in 
`ExpressionReductionTest`.


> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of implementations of `SqlFunction` 
> always returns true, which causes inappropriate optimization in Calcite, 
> such as treating a user's stateful UDF as a pure functional procedure. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065865#comment-16065865
 ] 

ASF GitHub Bot commented on FLINK-7014:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124443845
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
 ---
@@ -57,6 +57,8 @@ class AggSqlFunction(
   ) {
 
   def getFunction: AggregateFunction[_, _] = aggregateFunction
+
+  override def isDeterministic: Boolean = aggregateFunction.isDeterministic
--- End diff --

`isDeterministic` is a base method of `SqlOperator`, which is the base 
class of `AggSqlFunction`, `ScalarSqlFunction`, and `TableSqlFunction`. So I think 
it may not be necessary to add an abstract class. 


> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of implementations of `SqlFunction` 
> always returns true, which causes inappropriate optimization in Calcite, 
> such as treating a user's stateful UDF as a pure functional procedure. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...

2017-06-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124443845
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
 ---
@@ -57,6 +57,8 @@ class AggSqlFunction(
   ) {
 
   def getFunction: AggregateFunction[_, _] = aggregateFunction
+
+  override def isDeterministic: Boolean = aggregateFunction.isDeterministic
--- End diff --

`isDeterministic` is a base method of `SqlOperator`, which is the base 
class of `AggSqlFunction`, `ScalarSqlFunction`, and `TableSqlFunction`. So I think 
it may not be necessary to add an abstract class. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-7023) Remaining types for Gelly ValueArrays

2017-06-27 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-7023:
-

 Summary: Remaining types for Gelly ValueArrays
 Key: FLINK-7023
 URL: https://issues.apache.org/jira/browse/FLINK-7023
 Project: Flink
  Issue Type: Sub-task
  Components: Gelly
Affects Versions: 1.4.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Trivial
 Fix For: 1.4.0


Add implementations of Byte/Char/Double/Float/ShortValueArray. Along with the 
existing implementations of Int/Long/Null/StringValueArray this covers all 10 
CopyableValue types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065854#comment-16065854
 ] 

ASF GitHub Bot commented on FLINK-7014:
---

Github user Xpray commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124442541
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
 ---
@@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+
+  @Test
+  def testScalarFunctionDeterministic(): Unit = {
--- End diff --

Thanks for reviewing, I'll add more test cases in `ExpressionReductionTest`.


> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of implementations of `SqlFunction` 
> always returns true, which causes inappropriate optimization in Calcite, 
> such as treating a user's stateful UDF as a pure functional procedure. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...

2017-06-27 Thread Xpray
Github user Xpray commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124442541
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
 ---
@@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+
+  @Test
+  def testScalarFunctionDeterministic(): Unit = {
--- End diff --

Thanks for reviewing, I'll add more test cases in `ExpressionReductionTest`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065834#comment-16065834
 ] 

ASF GitHub Bot commented on FLINK-7014:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124439791
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
 ---
@@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+
+  @Test
+  def testScalarFunctionDeterministic(): Unit = {
--- End diff --

Yes, I agree with @wuchong. Adding a test in `ExpressionReductionTest` is 
correct, and I think it's better to add an exception test case.


> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of implementations of `SqlFunction` 
> always returns true, which causes inappropriate optimization in Calcite, 
> such as treating a user's stateful UDF as a pure functional procedure. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065836#comment-16065836
 ] 

ASF GitHub Bot commented on FLINK-7014:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124439457
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
 ---
@@ -40,6 +40,12 @@ abstract class UserDefinedFunction extends Serializable {
   @throws(classOf[Exception])
   def close(): Unit = {}
 
+  /**
+* @return true iff a call to this function is guaranteed to always 
return
+* the same result given the same parameters; true is assumed 
by default
+*/
+  def isDeterministic: Boolean = true
--- End diff --

I suggest explaining when a user needs to override the method, and what the 
impact is if it is not overridden. 
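
For example, one case such documentation could show (a sketch only, in Java; the 
class name is made up, and it relies on the `isDeterministic` hook added by this PR):

```java
import org.apache.flink.table.functions.ScalarFunction;

// Sketch only: a UDF whose result changes between calls. If isDeterministic()
// kept returning true (the default), Calcite could constant-fold the call away
// during expression reduction; overriding it to false prevents that.
public class CurrentMillis extends ScalarFunction {

    public long eval() {
        return System.currentTimeMillis();
    }

    @Override
    public boolean isDeterministic() {
        return false;
    }
}
```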


> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of implementations of `SqlFunction` 
> always returns true, which causes inappropriate optimization in Calcite, 
> such as treating a user's stateful UDF as a pure functional procedure. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065835#comment-16065835
 ] 

ASF GitHub Bot commented on FLINK-7014:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124440259
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
 ---
@@ -57,6 +57,8 @@ class AggSqlFunction(
   ) {
 
   def getFunction: AggregateFunction[_, _] = aggregateFunction
+
+  override def isDeterministic: Boolean = aggregateFunction.isDeterministic
--- End diff --

Can we add an abstract class for `AggSqlFunction`, `ScalarSqlFunction`, and 
`TableSqlFunction`? What do you think? @wuchong @Xpray 


> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of implementations of `SqlFunction` 
> always returns true, which causes inappropriate optimization in Calcite, 
> such as treating a user's stateful UDF as a pure functional procedure. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...

2017-06-27 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124439791
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
 ---
@@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+
+  @Test
+  def testScalarFunctionDeterministic(): Unit = {
--- End diff --

Yes, I agree with @wuchong. Adding a test in `ExpressionReductionTest` is 
correct, and I think it's better to add an exception test case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...

2017-06-27 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124439457
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
 ---
@@ -40,6 +40,12 @@ abstract class UserDefinedFunction extends Serializable {
   @throws(classOf[Exception])
   def close(): Unit = {}
 
+  /**
+* @return true iff a call to this function is guaranteed to always 
return
+* the same result given the same parameters; true is assumed 
by default
+*/
+  def isDeterministic: Boolean = true
--- End diff --

I suggest explaining when a user needs to override the method, and what the 
impact is if it is not overridden. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...

2017-06-27 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124440259
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
 ---
@@ -57,6 +57,8 @@ class AggSqlFunction(
   ) {
 
   def getFunction: AggregateFunction[_, _] = aggregateFunction
+
+  override def isDeterministic: Boolean = aggregateFunction.isDeterministic
--- End diff --

Can we add an abstract class for `AggSqlFunction`, `ScalarSqlFunction`, and 
`TableSqlFunction`? What do you think? @wuchong @Xpray 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4153: [FLINK-6927] [cep] Support pattern group in CEP

2017-06-27 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4153
  
I have rebased the code. @dawidwys @kl0u, could you help take a look at this 
PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065826#comment-16065826
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4153
  
I have rebased the code. @dawidwys @kl0u, could you help take a look at this 
PR?


> Support pattern group in CEP
> 
>
> Key: FLINK-6927
> URL: https://issues.apache.org/jira/browse/FLINK-6927
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> We should add support for pattern groups. This would enrich the set of 
> supported patterns. For example, with this feature available, users can write 
> patterns like this:
> {code}
>  A --> (B --> C.times(3)).optional() --> D
> {code}
> or
> {code}
> A --> (B --> C).times(3) --> D
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6936) Add multiple targets support for custom partitioner

2017-06-27 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065817#comment-16065817
 ] 

Xingcan Cui commented on FLINK-6936:


[~aljoscha], thanks for your attention. You are right that state management 
seems to be the bottleneck, since {{KeyedState}} cannot be used here and 
{{OperatorState}} is still incomplete (it only supports {{ListState}}). In my 
view, the main problem is that the current state mechanism (whether keyed or 
unkeyed) only supports "symmetrical" states, i.e., it is impossible to assign a 
designated portion of the global state to a dedicated instance. 

For the moment, I have implemented a simple record-to-window join according to 
the design document ([Inner Join in Flink|https://goo.gl/4AdR7h]). The body 
looks like this.
{code:java}
orderA.connect(orderB).process(new JoinMerge[Order, Order]())
  .multicast(new JoinPartitioner[Order, Order])
  .process(new CommonStreamJoin[Order, Order, Order2](
    new JoinFunction[Order, Order, Order2] {
      override def join(left: Order, right: Order): Order2 = {
        Order2(left.user, right.user, left.product, right.product,
          left.amount, right.amount)
      }
    }, 6, 1000))
{code}
The key idea is to randomly split the left stream and duplicate the right 
stream to all downstream instances.
Correspondingly, I use {{OperatorState}} ({{ListState}}) to store a portion of 
the cached left stream and a full copy of the cached right stream in each 
{{CommonStreamJoin}} instance. It seems to be quite costly and will not work 
correctly under some circumstances (e.g., state inconsistency occurs when the 
parallelism is increased), but I cannot think of other implementations unless a 
better state management mechanism is provided. What do you think?



Code for the three main classes {{JoinMerge}}, {{JoinPartitioner}} and 
{{CommonStreamJoin}} (it does not yet remove expired data) is as follows.
{code:java}
class JoinMerge[L, R] extends CoProcessFunction[L, R, Either[L, R]] {

  override def processElement1(value: L, ctx: CoProcessFunction[L, R, Either[L, R]]#Context,
      out: Collector[Either[L, R]]): Unit = {
    out.collect(Left(value))
  }

  override def processElement2(value: R, ctx: CoProcessFunction[L, R, Either[L, R]]#Context,
      out: Collector[Either[L, R]]): Unit = {
    out.collect(Right(value))
  }
}
{code}
{code:java}
class JoinPartitioner[L, R] extends MultiPartitioner[Either[L, R]] {

  var targets: Array[Int] = null

  override def partition(record: Either[L, R], numPartitions: Int): Array[Int] = {
    if (record.isLeft) {
      if (!(null != targets && targets.length == numPartitions)) {
        targets = Array.range(0, numPartitions)
      }
      return targets
    } else {
      Array(Random.nextInt(numPartitions))
    }
  }
}
{code}
{code:java}
class CommonStreamJoin[L, R, O](
    val joinFunction: JoinFunction[L, R, O], val cacheTime: Long, val rOffset: Long)
  extends ProcessFunction[Either[L, R], O] with CheckpointedFunction {

  val leftCache = new TreeMap[Long, JList[L]]
  val rightCache = new TreeMap[Long, JList[R]]

  var leftState: ListState[TMap[Long, JList[L]]] = null
  var rightState: ListState[MEntry[Long, JList[R]]] = null

  val leftDescriptor = new ListStateDescriptor[TMap[Long, JList[L]]]("leftCache",
    TypeInformation.of(new TypeHint[TMap[Long, JList[L]]]() {}))
  val rightDescriptor = new ListStateDescriptor[MEntry[Long, JList[R]]]("rightCache",
    TypeInformation.of(new TypeHint[MEntry[Long, JList[R]]]() {}))

  var result: O = _

  override def processElement(value: Either[L, R],
      ctx: ProcessFunction[Either[L, R], O]#Context,
      out: Collector[O]): Unit = {
    // TODO this should be replaced with timestamps contained in records
    val time = ctx.timerService().currentProcessingTime()
    if (value.isRight) {
      val right = value.right.get
      if (!rightCache.containsKey(time)) {
        rightCache.put(time, new util.LinkedList[R]())
      }
      rightCache.get(time).add(right)
      for (leftList <- leftCache.values()) {
        for (left <- leftList) {
          result = joinFunction.join(left, right)
          if (null != result) {
            out.collect(result)
          }
        }
      }
    } else {
      val left = value.left.get
      if (!leftCache.containsKey(time)) {
        leftCache.put(time, new util.LinkedList[L]())
      }
      leftCache.get(time).add(left)
      for (rightList <- rightCache.values()) {
        for (right <- rightList) {
          result = joinFunction.join(left, right)
          if (null != result) {
            out.collect(result)
          }

[jira] [Assigned] (FLINK-6407) Upgrade AVRO dependency version to 1.8.x

2017-06-27 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-6407:
---

Assignee: mingleizhang

> Upgrade AVRO dependency version to 1.8.x
> 
>
> Key: FLINK-6407
> URL: https://issues.apache.org/jira/browse/FLINK-6407
> Project: Flink
>  Issue Type: Wish
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.1
>Reporter: Miguel
>Assignee: mingleizhang
>Priority: Minor
>
> Avro 1.7.6 and 1.7.7 do not support a Map that uses Java enum keys (map keys 
> are limited to the String type). This was solved in Avro 1.8.0.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065798#comment-16065798
 ] 

ASF GitHub Bot commented on FLINK-7014:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124436213
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
 ---
@@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+
+  @Test
+  def testScalarFunctionDeterministic(): Unit = {
--- End diff --

I think an IT case can't cover this change. Whether the UDF is 
deterministic or not, the result should be the same. 


> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of implementations of `SqlFunction` 
> always returns true, which causes inappropriate optimization in Calcite, 
> such as treating a user's stateful UDF as a pure functional procedure. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...

2017-06-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4200#discussion_r124436213
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
 ---
@@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+
+  @Test
+  def testScalarFunctionDeterministic(): Unit = {
--- End diff --

I think an IT case can't cover this change. Whether the UDF is 
deterministic or not, the result should be the same. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6407) Upgrade AVRO dependency version to 1.8.x

2017-06-27 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065792#comment-16065792
 ] 

mingleizhang commented on FLINK-6407:
-

Avro now has a 1.8.2 release. I will upgrade it to 1.8.2 if there are no 
objections.

> Upgrade AVRO dependency version to 1.8.x
> 
>
> Key: FLINK-6407
> URL: https://issues.apache.org/jira/browse/FLINK-6407
> Project: Flink
>  Issue Type: Wish
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.1
>Reporter: Miguel
>Priority: Minor
>
> Avro 1.7.6 and 1.7.7 do not support a Map that uses Java enum keys (map keys 
> are limited to the String type). This was solved in Avro 1.8.0.
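
For reference, a minimal sketch of the kind of type the issue describes (all names 
are illustrative):
{code:java}
import java.util.HashMap;
import java.util.Map;

// A POJO with an enum-keyed map: the issue reports that Avro 1.7.x rejects this
// (map keys are limited to strings there), while Avro 1.8.x handles it.
public class Prices {

    public enum Currency { EUR, USD }

    public Map<Currency, Double> latest = new HashMap<>();
}
{code}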



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6422) Unreachable code in FileInputFormat#createInputSplits

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065744#comment-16065744
 ] 

ASF GitHub Bot commented on FLINK-6422:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4202
  
Thanks @tedyu for reporting this. cc @StephanEwen @tedyu, could you both 
take a look? I checked the first version you designed. 


> Unreachable code in FileInputFormat#createInputSplits
> -
>
> Key: FLINK-6422
> URL: https://issues.apache.org/jira/browse/FLINK-6422
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> Here is related code:
> {code}
> if (minNumSplits < 1) {
>   throw new IllegalArgumentException("Number of input splits has to be at 
> least 1.");
> }
> ...
> final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE : 
> (totalLength / minNumSplits +
>   (totalLength % minNumSplits == 0 ? 0 : 1));
> {code}
> minNumSplits cannot be less than 1 by the time the assignment of 
> maxSplitSize is reached.
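
One possible simplification, continuing the snippet quoted above (a sketch only; 
the actual change in the linked PR may differ):
{code:java}
// The guard above already throws for minNumSplits < 1, so the
// "(minNumSplits < 1) ? Long.MAX_VALUE : ..." ternary can never take its first
// branch and can be dropped:
final long maxSplitSize = totalLength / minNumSplits
    + (totalLength % minNumSplits == 0 ? 0 : 1);
{code}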



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4202: [FLINK-6422] [core] Unreachable code in FileInputFormat#c...

2017-06-27 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4202
  
Thanks @tedyu for reporting this. cc @StephanEwen @tedyu, could you both 
take a look? I checked the first version you designed. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6422) Unreachable code in FileInputFormat#createInputSplits

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065741#comment-16065741
 ] 

ASF GitHub Bot commented on FLINK-6422:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4202

[FLINK-6422] [core] Unreachable code in FileInputFormat#createInputSp…

Fix Unreachable code in FileInputFormat#createInputSplits.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6422

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4202.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4202


commit 4cc3e55621372eaf4a9661939f52656aeef4eeef
Author: zhangminglei 
Date:   2017-06-28T00:45:12Z

[FLINK-6422] [core] Unreachable code in FileInputFormat#createInputSplits




> Unreachable code in FileInputFormat#createInputSplits
> -
>
> Key: FLINK-6422
> URL: https://issues.apache.org/jira/browse/FLINK-6422
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> Here is related code:
> {code}
> if (minNumSplits < 1) {
>   throw new IllegalArgumentException("Number of input splits has to be at 
> least 1.");
> }
> ...
> final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE : 
> (totalLength / minNumSplits +
>   (totalLength % minNumSplits == 0 ? 0 : 1));
> {code}
> minNumSplits cannot be less than 1 by the time the assignment of 
> maxSplitSize is reached.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4202: [FLINK-6422] [core] Unreachable code in FileInputF...

2017-06-27 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4202

[FLINK-6422] [core] Unreachable code in FileInputFormat#createInputSp…

Fix Unreachable code in FileInputFormat#createInputSplits.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6422

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4202.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4202


commit 4cc3e55621372eaf4a9661939f52656aeef4eeef
Author: zhangminglei 
Date:   2017-06-28T00:45:12Z

[FLINK-6422] [core] Unreachable code in FileInputFormat#createInputSplits




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6843) ClientConnectionTest fails on travis

2017-06-27 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065709#comment-16065709
 ] 

mingleizhang commented on FLINK-6843:
-

[~aljoscha] I guess this is caused by a Java version mismatch; J2SE 8 
corresponds to class file version 52. The error message 
{code:java}
Unsupported major.minor version 52.0
{code}
 means the class was compiled with a higher JDK and is being run on a lower 
one; JDK 7 corresponds to version 51. I don't know how Travis works; it might 
compile the source code on a machine that uses JDK 8 and run the .class files 
on a JDK 7 machine.

> ClientConnectionTest fails on travis
> 
>
> Key: FLINK-6843
> URL: https://issues.apache.org/jira/browse/FLINK-6843
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.3.2
>
>
> jdk7, hadoop 2.4.1, scala 2.11
> {code}
> testJobManagerRetrievalWithHAServices(org.apache.flink.client.program.ClientConnectionTest)
>   Time elapsed: 0.013 sec  <<< ERROR!
> java.lang.UnsupportedClassVersionError: 
> org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices : 
> Unsupported major.minor version 52.0
>   at java.lang.ClassLoader.defineClass1(Native Method)
>   at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
>   at 
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>   at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
>   at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at 
> org.apache.flink.client.program.ClientConnectionTest.testJobManagerRetrievalWithHAServices(ClientConnectionTest.java:122)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7022) Flink Job Manager Scheduler & Web Frontend out of sync when Zookeeper is unavailable on startup

2017-06-27 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-7022:
---

 Summary: Flink Job Manager Scheduler & Web Frontend out of sync 
when Zookeeper is unavailable on startup
 Key: FLINK-7022
 URL: https://issues.apache.org/jira/browse/FLINK-7022
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.2.1, 1.3.0, 1.2.0
 Environment: Kubernetes cluster running:
* Flink 1.3.0 Job Manager & Task Manager on Java 8u131
* Zookeeper 3.4.10 cluster with 3 nodes
Reporter: Scott Kidder


h2. Problem
Flink Job Manager web frontend is permanently unavailable if one or more 
Zookeeper nodes are unresolvable during startup. The job scheduler eventually 
recovers and assigns jobs to task managers, but the web frontend continues to 
respond with an HTTP 503 and the following message:
{noformat}Service temporarily unavailable due to an ongoing leader election. 
Please refresh.{noformat}

h2. Expected Behavior
Once Flink is able to interact with Zookeeper successfully, all aspects of the 
Job Manager (job scheduling & the web frontend) should be available.

h2. Environment Details
We're running Flink and Zookeeper in Kubernetes on CoreOS. CoreOS can run in a 
configuration that automatically detects and applies operating system updates. 
We have a Zookeeper node running on the same CoreOS instance as Flink. It's 
possible that the Zookeeper node will not yet be started when the Flink 
components are started. This could cause hostname resolution of the Zookeeper 
nodes to fail.

h3. Flink Task Manager Logs
{noformat}
2017-06-27 15:38:47,161 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: metrics.reporter.statsd.host, localhost
2017-06-27 15:38:47,161 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: metrics.reporter.statsd.port, 8125
2017-06-27 15:38:47,162 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: metrics.reporter.statsd.interval, 10 SECONDS
2017-06-27 15:38:47,254 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: state.backend, filesystem
2017-06-27 15:38:47,254 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: state.backend.fs.checkpointdir, 
hdfs://hdfs:8020/flink/checkpoints
2017-06-27 15:38:47,255 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: state.savepoints.dir, hdfs://hdfs:8020/flink/savepoints
2017-06-27 15:38:47,255 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.mode, zookeeper
2017-06-27 15:38:47,256 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.zookeeper.quorum, 
zookeeper-0.zookeeper:2181,zookeeper-1.zookeeper:2181,zookeeper-2.zookeeper:2181
2017-06-27 15:38:47,256 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.zookeeper.storageDir, 
hdfs://hdfs:8020/flink/recovery
2017-06-27 15:38:47,256 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.jobmanager.port, 6123
2017-06-27 15:38:47,257 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: blob.server.port, 41479
2017-06-27 15:38:47,357 WARN  org.apache.flink.configuration.Configuration  
- Config uses deprecated configuration key 'recovery.mode' instead 
of proper key 'high-availability'
2017-06-27 15:38:47,366 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Starting JobManager with high-availability
2017-06-27 15:38:47,366 WARN  org.apache.flink.configuration.Configuration  
- Config uses deprecated configuration key 
'recovery.jobmanager.port' instead of proper key 
'high-availability.jobmanager.port'
2017-06-27 15:38:47,452 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Starting JobManager on flink:6123 with execution mode CLUSTER
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.address, flink
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.port, 6123
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.heap.mb, 1024
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.heap.mb, 1024
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration  

[jira] [Commented] (FLINK-7017) Remove netty usages in flink-tests

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065675#comment-16065675
 ] 

ASF GitHub Bot commented on FLINK-7017:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/4196#discussion_r124424384
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
 ---
@@ -83,16 +78,16 @@ public static void initialize() throws Exception {
assertTrue("Unable to create temp directory", logDir.mkdir());
File logFile = new File(logDir, "jobmanager.log");
File outFile = new File(logDir, "jobmanager.out");
-   
+
Files.createFile(logFile.toPath());
Files.createFile(outFile.toPath());
-   
+
config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.getAbsolutePath());
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
logFile.getAbsolutePath());
 
cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
-   
+
--- End diff --

Sometimes I also get this kind of thing, but it seems I didn't do anything 
relevant to this style.  


> Remove netty usages in flink-tests
> --
>
> Key: FLINK-7017
> URL: https://issues.apache.org/jira/browse/FLINK-7017
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4196: [FLINK-7017] Remove netty usages in flink-tests

2017-06-27 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/4196#discussion_r124424384
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
 ---
@@ -83,16 +78,16 @@ public static void initialize() throws Exception {
assertTrue("Unable to create temp directory", logDir.mkdir());
File logFile = new File(logDir, "jobmanager.log");
File outFile = new File(logDir, "jobmanager.out");
-   
+
Files.createFile(logFile.toPath());
Files.createFile(outFile.toPath());
-   
+
config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.getAbsolutePath());
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
logFile.getAbsolutePath());
 
cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
-   
+
--- End diff --

Sometimes I also get this kind of thing, but it seems I didn't do anything 
relevant to this style.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-7021) Flink Task Manager hangs on startup if one Zookeeper node is unresolvable

2017-06-27 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-7021:
---

 Summary: Flink Task Manager hangs on startup if one Zookeeper node 
is unresolvable
 Key: FLINK-7021
 URL: https://issues.apache.org/jira/browse/FLINK-7021
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.1, 1.3.0, 1.2.0
 Environment: Kubernetes cluster running:
* Flink 1.3.0 Job Manager & Task Manager on Java 8u131
* Zookeeper 3.4.10 cluster with 3 nodes
Reporter: Scott Kidder


h2. Problem
Flink Task Manager will hang during startup if one of the Zookeeper nodes in 
the Zookeeper connection string is unresolvable.

h2. Expected Behavior
Flink should retry name resolution & connection to Zookeeper nodes with 
exponential back-off.
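
For illustration, a generic retry loop with exponential back-off of the kind 
requested (not Flink code; all names are made up):
{code:java}
import java.util.concurrent.Callable;

/** Generic sketch of retry with exponential back-off; assumes maxAttempts >= 1. */
final class RetrySketch {

    static <T> T retryWithBackoff(Callable<T> action, int maxAttempts,
            long initialBackoffMs, long maxBackoffMs) throws Exception {
        long backoff = initialBackoffMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) { // e.g. UnknownHostException while resolving a ZK host
                last = e;
                Thread.sleep(backoff);
                backoff = Math.min(backoff * 2, maxBackoffMs); // double the wait, up to a cap
            }
        }
        throw last;
    }
}
{code}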

h2. Environment Details
We're running Flink and Zookeeper in Kubernetes on CoreOS. CoreOS can run in a 
configuration that automatically detects and applies operating system updates. 
We have a Zookeeper node running on the same CoreOS instance as Flink. It's 
possible that the Zookeeper node will not yet be started when the Flink 
components are started. This could cause hostname resolution of the Zookeeper 
nodes to fail.

h3. Flink Task Manager Logs
{noformat}
2017-06-27 15:38:51,713 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Using configured hostname/address for TaskManager: 10.2.45.11
2017-06-27 15:38:51,714 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Starting TaskManager
2017-06-27 15:38:51,714 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Starting TaskManager actor system at 10.2.45.11:6122.
2017-06-27 15:38:52,950 INFO  akka.event.slf4j.Slf4jLogger  
- Slf4jLogger started
2017-06-27 15:38:53,079 INFO  Remoting  
- Starting remoting
2017-06-27 15:38:53,573 INFO  Remoting  
- Remoting started; listening on addresses 
:[akka.tcp://flink@10.2.45.11:6122]
2017-06-27 15:38:53,576 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Starting TaskManager actor
2017-06-27 15:38:53,660 INFO  
org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig 
[server address: /10.2.45.11, server port: 6121, ssl enabled: false, memory 
segment size (bytes): 32768, transport type: NIO, number of server threads: 2 
(manual), number of client threads: 2 (manual), server connect backlog: 0 (use 
Netty's default), client connect timeout (sec): 120, send/receive buffer size 
(bytes): 0 (use Netty's default)]
2017-06-27 15:38:53,682 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have 
a max timeout of 1 ms
2017-06-27 15:38:53,688 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file 
directory '/tmp': total 49 GB, usable 42 GB (85.71% usable)
2017-06-27 15:38:54,071 INFO  
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 96 MB 
for network buffer pool (number of memory segments: 3095, bytes per segment: 
32768).
2017-06-27 15:38:54,564 INFO  
org.apache.flink.runtime.io.network.NetworkEnvironment- Starting the 
network environment and its components.
2017-06-27 15:38:54,576 INFO  
org.apache.flink.runtime.io.network.netty.NettyClient - Successful 
initialization (took 4 ms).
2017-06-27 15:38:54,677 INFO  
org.apache.flink.runtime.io.network.netty.NettyServer - Successful 
initialization (took 101 ms). Listening on SocketAddress /10.2.45.11:6121.
2017-06-27 15:38:54,981 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting 
managed memory to 0.7 of the currently free heap space (612 MB), memory will be 
allocated lazily.
2017-06-27 15:38:55,050 INFO  
org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager 
uses directory /tmp/flink-io-ca01554d-f25e-4c17-a828-96d82b43d4a7 for spill 
files.
2017-06-27 15:38:55,061 INFO  org.apache.flink.runtime.metrics.MetricRegistry   
- Configuring StatsDReporter with {interval=10 SECONDS, port=8125, 
host=localhost, class=org.apache.flink.metrics.statsd.StatsDReporter}.
2017-06-27 15:38:55,065 INFO  org.apache.flink.metrics.statsd.StatsDReporter
- Configured StatsDReporter with {host:localhost, port:8125}
2017-06-27 15:38:55,065 INFO  org.apache.flink.runtime.metrics.MetricRegistry   
- Periodically reporting metrics in intervals of 10 SECONDS for 
reporter statsd of type org.apache.flink.metrics.statsd.StatsDReporter.
2017-06-27 15:38:55,175 INFO  org.apache.flink.runtime.filecache.FileCache  
- User file cache uses directory 
/tmp/flink-dist-cache-e4c5bcc5-7513-40d9-a665-0d33c80a36ba
2017-06-27 15:38:55,187 INFO  org.apache.flink.runtime.filecache.FileCache  
- User file cache uses director

[jira] [Commented] (FLINK-6998) Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065620#comment-16065620
 ] 

ASF GitHub Bot commented on FLINK-6998:
---

Github user zhenzhongxu commented on the issue:

https://github.com/apache/flink/pull/4187
  
@tzulitai 

**Regarding the metric naming:**
Any suggestions on naming conventions for these Flink-specific metrics? How 
do you like 'kafkaconnector-commits-succeeded' (component-metric-name) as an 
example? I personally like hyphen separators better than camel case for metric 
names. 
I'll not include the other proposed metric in this PR, just for the sake of 
simplicity. I also have some opinions on the "offset lag" metric: I think this 
particular metric is more useful when some external entity performs the 
monitoring (difference of committed offset vs. log head), especially in failure 
situations.

**Regarding the implementation:**
Thanks for the feedback. I'll explore the more proper implementation 
suggested and get back to you with a solution or a question.


> Kafka connector needs to expose metrics for failed/successful offset commits 
> in the Kafka Consumer callback
> ---
>
> Key: FLINK-6998
> URL: https://issues.apache.org/jira/browse/FLINK-6998
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Zhenzhong Xu
>Assignee: Zhenzhong Xu
>
> Propose to add "kafkaCommitsSucceeded" and "kafkaCommitsFailed" metrics in 
> KafkaConsumerThread class.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

2017-06-27 Thread zhenzhongxu
Github user zhenzhongxu commented on the issue:

https://github.com/apache/flink/pull/4187
  
@tzulitai 

**Regarding the metric naming:**
Any suggestions on naming conventions for these Flink-specific metrics? How 
do you like 'kafkaconnector-commits-succeeded' (component-metric-name) as an 
example? I personally like hyphen separators better than camel case for metric 
names. 
I'll not include the other proposed metric in this PR, just for the sake of 
simplicity. I also have some opinions on the "offset lag" metric: I think that 
metric is more useful when some external entity performs the monitoring 
(the difference between the committed offset and the log head), especially in 
failure situations.

**Regarding the implementation:**
Thanks for the feedback. I'll explore the more proper implementation 
suggested, I'll get back to you with a solution or question.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6998) Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065597#comment-16065597
 ] 

ASF GitHub Bot commented on FLINK-6998:
---

Github user zhenzhongxu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4187#discussion_r124413756
  
--- Diff: docs/monitoring/metrics.md ---
@@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier:
   
 
 
+ Connector:
+
+  
+
+  Scope
+  Metrics
+  Description
+
+  
+  
+
+  Slot/Consumer
+  kafkaCommitsSucceeded
+  Kafka offset commit success count if Kafka commit is turned 
on.
+
+
+   Slot/Consumer
+   kafkaCommitsFailed
+   Kafka offset commit failure count if Kafka commit is turned 
on.
--- End diff --

Sure, thanks for the feedback. Will update.


> Kafka connector needs to expose metrics for failed/successful offset commits 
> in the Kafka Consumer callback
> ---
>
> Key: FLINK-6998
> URL: https://issues.apache.org/jira/browse/FLINK-6998
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Zhenzhong Xu
>Assignee: Zhenzhong Xu
>
> Propose to add "kafkaCommitsSucceeded" and "kafkaCommitsFailed" metrics in 
> KafkaConsumerThread class.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

2017-06-27 Thread zhenzhongxu
Github user zhenzhongxu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4187#discussion_r124413737
  
--- Diff: docs/monitoring/metrics.md ---
@@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier:
   
 
 
+ Connector:
+
+  
+
+  Scope
+  Metrics
+  Description
+
+  
+  
+
+  Slot/Consumer
+  kafkaCommitsSucceeded
+  Kafka offset commit success count if Kafka commit is turned 
on.
--- End diff --

Sure, thanks for the feedback. Will update.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6998) Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065595#comment-16065595
 ] 

ASF GitHub Bot commented on FLINK-6998:
---

Github user zhenzhongxu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4187#discussion_r124413737
  
--- Diff: docs/monitoring/metrics.md ---
@@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier:
   
 
 
+ Connector:
+
+  
+
+  Scope
+  Metrics
+  Description
+
+  
+  
+
+  Slot/Consumer
+  kafkaCommitsSucceeded
+  Kafka offset commit success count if Kafka commit is turned 
on.
--- End diff --

Sure, thanks for the feedback. Will update.


> Kafka connector needs to expose metrics for failed/successful offset commits 
> in the Kafka Consumer callback
> ---
>
> Key: FLINK-6998
> URL: https://issues.apache.org/jira/browse/FLINK-6998
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Zhenzhong Xu
>Assignee: Zhenzhong Xu
>
> Propose to add "kafkaCommitsSucceeded" and "kafkaCommitsFailed" metrics in 
> KafkaConsumerThread class.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

2017-06-27 Thread zhenzhongxu
Github user zhenzhongxu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4187#discussion_r124413756
  
--- Diff: docs/monitoring/metrics.md ---
@@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier:
   
 
 
+ Connector:
+
+  
+
+  Scope
+  Metrics
+  Description
+
+  
+  
+
+  Slot/Consumer
+  kafkaCommitsSucceeded
+  Kafka offset commit success count if Kafka commit is turned 
on.
+
+
+   Slot/Consumer
+   kafkaCommitsFailed
+   Kafka offset commit failure count if Kafka commit is turned 
on.
--- End diff --

Sure, thanks for the feedback. Will update.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7009) dogstatsd mode in statsd reporter

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065313#comment-16065313
 ] 

ASF GitHub Bot commented on FLINK-7009:
---

Github user dbrinegar commented on a diff in the pull request:

https://github.com/apache/flink/pull/4188#discussion_r124366697
  
--- Diff: 
flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 ---
@@ -179,41 +254,130 @@ private String prefix(String ... names) {
}
}
 
-   private void send(final String name, final String value) {
+   private String buildStatsdLine(final String name, final String value, 
final String tags) {
+   Double number;
try {
-   String formatted = String.format("%s:%s|g", name, 
value);
-   byte[] data = 
formatted.getBytes(StandardCharsets.UTF_8);
-   socket.send(new DatagramPacket(data, data.length, 
this.address));
+   number = Double.parseDouble(value);
+   }
+   catch (NumberFormatException e) {
+   // quietly skip values like "n/a"
+   return "";
}
-   catch (IOException e) {
-   LOG.error("unable to send packet to statsd at '{}:{}'", 
address.getHostName(), address.getPort());
+   if (number >= 0.) {
+   return String.format("%s:%s|g%s", name, value, tags != 
null ? tags : "");
+   } else {
+   // quietly skip "unknowns" like 
lowWaterMark:-9223372036854775808, or JVM.Memory.NonHeap.Max:-1, or NaN
+   return "";
}
}
 
-   @Override
-   public String filterCharacters(String input) {
+   private void send(final String name, final String value, final String 
tags) {
+   String formatted = buildStatsdLine(name, value, tags);
+   if (formatted.length() > 0) {
+   try {
+   byte[] data = 
formatted.getBytes(StandardCharsets.UTF_8);
+   socket.send(new DatagramPacket(data, 
data.length, this.address));
+   }
+   catch (IOException e) {
+   LOG.error("unable to send packet to statsd at 
'{}:{}'", address.getHostName(), address.getPort());
+   }
+   }
+   }
+
+   /**
+   * dogstatsd names should: start with letter, uses ascii alphanumerics 
and underscore, separated by periods.
+   * Collapse runs of invalid characters into an underscore. Discard 
invalid prefix and suffix.
+   * Eg: ":::metric:::name:::" ->  "metric_name"
+   */
+
+   private boolean isValidStatsdChar(char c) {
+   return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c 
>= '0' && c <= '9') || (c == '_');
--- End diff --

Yeah, that's a fair point. The thought here is to take an extremely limited 
set to maximize compatibility with collectors and time-series databases, and to 
avoid translation. I think if one were to make a compatibility table you might 
find a few extra punctuation-type characters, but I couldn't see how they 
changed the significance or meaning of the metrics, so I landed on picking one 
commonly accepted non-period delimiter. The underbar is also guidance from 
Datadog, as a best practice for their systems. The metrics from Flink are 
super clean this way. Flink metric names seem to always be alpha and dots, and 
the tag names are all alpha and underscore (e.g. `task_id`), so it looks quite 
natural for the tag values to be alphanumeric + underscore.

But yeah, it is an arbitrary choice.
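
A minimal sketch of that sanitization (illustrative only, names assumed): keep 
ASCII alphanumerics and underscore, collapse every run of other characters into 
a single underscore, and discard invalid leading and trailing runs, so 
":::metric:::name:::" becomes "metric_name":

{code:java}
/** Illustrative sketch of the character sanitization described above. */
public class StatsdNameSanitizer {

	// keep only ASCII alphanumerics and underscore
	static boolean isValidStatsdChar(char c) {
		return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
			|| (c >= '0' && c <= '9') || c == '_';
	}

	// collapse runs of invalid characters into one '_', dropping leading/trailing runs
	static String sanitize(String input) {
		StringBuilder out = new StringBuilder(input.length());
		boolean pendingSeparator = false;
		for (int i = 0; i < input.length(); i++) {
			char c = input.charAt(i);
			if (isValidStatsdChar(c)) {
				if (pendingSeparator && out.length() > 0) {
					out.append('_');
				}
				pendingSeparator = false;
				out.append(c);
			} else {
				pendingSeparator = true;
			}
		}
		return out.toString();
	}

	public static void main(String[] args) {
		System.out.println(sanitize(":::metric:::name:::")); // prints "metric_name"
	}
}
{code}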


> dogstatsd mode in statsd reporter
> -
>
> Key: FLINK-7009
> URL: https://issues.apache.org/jira/browse/FLINK-7009
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
> Environment: org.apache.flink.metrics.statsd.StatsDReporter
>Reporter: David Brinegar
> Fix For: 1.4.0
>
>
> The current statsd reporter can only report a subset of Flink metrics owing 
> to the manner in which Flink variables are handled, mainly around invalid 
> characters and metrics too long.  As an option, it would be quite useful to 
> have a stricter dogstatsd compliant output.  Dogstatsd metrics are tagged, 
> should be less than 200 characters including tag names and values, be 
> alphanumeric + underbar, delimited by periods.  As a further pragmatic 
> restriction, negative and other invalid values should be ignored rather than 
> sent to the backend.  These restrictions play well with a broad set of 
> collectors and time series d

[GitHub] flink pull request #4188: [FLINK-7009] dogstatsd mode in statds reporter

2017-06-27 Thread dbrinegar
Github user dbrinegar commented on a diff in the pull request:

https://github.com/apache/flink/pull/4188#discussion_r124366697
  
--- Diff: 
flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 ---
@@ -179,41 +254,130 @@ private String prefix(String ... names) {
}
}
 
-   private void send(final String name, final String value) {
+   private String buildStatsdLine(final String name, final String value, 
final String tags) {
+   Double number;
try {
-   String formatted = String.format("%s:%s|g", name, 
value);
-   byte[] data = 
formatted.getBytes(StandardCharsets.UTF_8);
-   socket.send(new DatagramPacket(data, data.length, 
this.address));
+   number = Double.parseDouble(value);
+   }
+   catch (NumberFormatException e) {
+   // quietly skip values like "n/a"
+   return "";
}
-   catch (IOException e) {
-   LOG.error("unable to send packet to statsd at '{}:{}'", 
address.getHostName(), address.getPort());
+   if (number >= 0.) {
+   return String.format("%s:%s|g%s", name, value, tags != 
null ? tags : "");
+   } else {
+   // quietly skip "unknowns" like 
lowWaterMark:-9223372036854775808, or JVM.Memory.NonHeap.Max:-1, or NaN
+   return "";
}
}
 
-   @Override
-   public String filterCharacters(String input) {
+   private void send(final String name, final String value, final String 
tags) {
+   String formatted = buildStatsdLine(name, value, tags);
+   if (formatted.length() > 0) {
+   try {
+   byte[] data = 
formatted.getBytes(StandardCharsets.UTF_8);
+   socket.send(new DatagramPacket(data, 
data.length, this.address));
+   }
+   catch (IOException e) {
+   LOG.error("unable to send packet to statsd at 
'{}:{}'", address.getHostName(), address.getPort());
+   }
+   }
+   }
+
+   /**
+   * dogstatsd names should: start with letter, uses ascii alphanumerics 
and underscore, separated by periods.
+   * Collapse runs of invalid characters into an underscore. Discard 
invalid prefix and suffix.
+   * Eg: ":::metric:::name:::" ->  "metric_name"
+   */
+
+   private boolean isValidStatsdChar(char c) {
+   return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c 
>= '0' && c <= '9') || (c == '_');
--- End diff --

Yeah, that's a fair point. The thought here is to take an extremely limited 
set to maximize compatibility with collectors and time-series databases, and to 
avoid translation. I think if one were to make a compatibility table you might 
find a few extra punctuation-type characters, but I couldn't see how they 
changed the significance or meaning of the metrics, so I landed on picking one 
commonly accepted non-period delimiter. The underbar is also guidance from 
Datadog, as a best practice for their systems. The metrics from Flink are 
super clean this way. Flink metric names seem to always be alpha and dots, and 
the tag names are all alpha and underscore (e.g. `task_id`), so it looks quite 
natural for the tag values to be alphanumeric + underscore.

But yeah, it is an arbitrary choice.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7009) dogstatsd mode in statsd reporter

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065293#comment-16065293
 ] 

ASF GitHub Bot commented on FLINK-7009:
---

Github user dbrinegar commented on a diff in the pull request:

https://github.com/apache/flink/pull/4188#discussion_r124364140
  
--- Diff: 
flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 ---
@@ -90,6 +109,45 @@ public void close() {
 
// 

 
+   /**
+* Removes leading and trailing angle brackets.
+*/
+   private String stripBrackets(String str) {
+   return str.substring(1, str.length() - 1);
+   }
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   if (dogstatsdMode) {
+   // memoize dogstatsd tag section: 
"|#tag:val,tag:val,tag:val"
+   StringBuilder statsdTagLine = new StringBuilder();
+   Map orderedTags = new 
TreeMap<>(group.getAllVariables());
+   for (Map.Entry entry: 
orderedTags.entrySet()) {
+   String k = stripBrackets(entry.getKey());
+   String v = filterCharacters(entry.getValue());
+   
statsdTagLine.append(",").append(k).append(":").append(v);
+   }
+   if (statsdTagLine.length() > 0) {
+   // remove first comma, prefix with "|#"
+   tagTable.put(metric, "|#" + 
statsdTagLine.substring(1));
--- End diff --

👍 


> dogstatsd mode in statsd reporter
> -
>
> Key: FLINK-7009
> URL: https://issues.apache.org/jira/browse/FLINK-7009
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
> Environment: org.apache.flink.metrics.statsd.StatsDReporter
>Reporter: David Brinegar
> Fix For: 1.4.0
>
>
> The current statsd reporter can only report a subset of Flink metrics owing 
> to the manner in which Flink variables are handled, mainly around invalid 
> characters and metrics too long.  As an option, it would be quite useful to 
> have a stricter dogstatsd compliant output.  Dogstatsd metrics are tagged, 
> should be less than 200 characters including tag names and values, be 
> alphanumeric + underbar, delimited by periods.  As a further pragmatic 
> restriction, negative and other invalid values should be ignored rather than 
> sent to the backend.  These restrictions play well with a broad set of 
> collectors and time series databases.
> This mode would:
> * convert output to ascii alphanumeric characters with underbar, delimited by 
> periods.  Runs of invalid characters within a metric segment would be 
> collapsed to a single underbar.
> * report all Flink variables as tags
> * compress overly long segments, say over 50 chars, to a symbolic 
> representation of the metric name, to preserve the unique metric time series 
> but avoid downstream truncation
> * compress 32 character Flink IDs like tm_id, task_id, job_id, 
> task_attempt_id, to the first 8 characters, again to preserve enough 
> distinction amongst metrics while trimming up to 96 characters from the metric
> * remove object references from names, such as the instance hash id of the 
> serializer
> * drop negative or invalid numeric values such as "n/a", "-1" which is used 
> for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is 
> used for unknowns like currentLowWaterMark
> With these in place, it becomes quite reasonable to support LatencyGauge 
> metrics as well.
> One idea for symbolic compression is to take the first 10 valid characters 
> plus a hash of the long name.  For example, a value like this operator_name:
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000), 
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa,
>  
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465},
>  ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> would first drop the instance references.  The stable version would be:
>  
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000), 
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer,
>  
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1},
>  ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> and then the compressed name would be the first ten valid characters pl

[GitHub] flink pull request #4188: [FLINK-7009] dogstatsd mode in statds reporter

2017-06-27 Thread dbrinegar
Github user dbrinegar commented on a diff in the pull request:

https://github.com/apache/flink/pull/4188#discussion_r124364140
  
--- Diff: 
flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 ---
@@ -90,6 +109,45 @@ public void close() {
 
// 

 
+   /**
+* Removes leading and trailing angle brackets.
+*/
+   private String stripBrackets(String str) {
+   return str.substring(1, str.length() - 1);
+   }
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   if (dogstatsdMode) {
+   // memoize dogstatsd tag section: 
"|#tag:val,tag:val,tag:val"
+   StringBuilder statsdTagLine = new StringBuilder();
+   Map orderedTags = new 
TreeMap<>(group.getAllVariables());
+   for (Map.Entry entry: 
orderedTags.entrySet()) {
+   String k = stripBrackets(entry.getKey());
+   String v = filterCharacters(entry.getValue());
+   
statsdTagLine.append(",").append(k).append(":").append(v);
+   }
+   if (statsdTagLine.length() > 0) {
+   // remove first comma, prefix with "|#"
+   tagTable.put(metric, "|#" + 
statsdTagLine.substring(1));
--- End diff --

👍 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7009) dogstatsd mode in statsd reporter

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065289#comment-16065289
 ] 

ASF GitHub Bot commented on FLINK-7009:
---

Github user dbrinegar commented on a diff in the pull request:

https://github.com/apache/flink/pull/4188#discussion_r124363952
  
--- Diff: 
flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 ---
@@ -90,6 +109,45 @@ public void close() {
 
// 

 
+   /**
+* Removes leading and trailing angle brackets.
+*/
+   private String stripBrackets(String str) {
+   return str.substring(1, str.length() - 1);
+   }
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   if (dogstatsdMode) {
+   // memoize dogstatsd tag section: 
"|#tag:val,tag:val,tag:val"
+   StringBuilder statsdTagLine = new StringBuilder();
+   Map orderedTags = new 
TreeMap<>(group.getAllVariables());
--- End diff --

Sorry! This was the beginning of looking at a more efficient tag table, as 
many entries are duplicates, but I'll take it out since we're going to put the 
tag line in the metric object.


> dogstatsd mode in statsd reporter
> -
>
> Key: FLINK-7009
> URL: https://issues.apache.org/jira/browse/FLINK-7009
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
> Environment: org.apache.flink.metrics.statsd.StatsDReporter
>Reporter: David Brinegar
> Fix For: 1.4.0
>
>
> The current statsd reporter can only report a subset of Flink metrics owing 
> to the manner in which Flink variables are handled, mainly around invalid 
> characters and metrics too long.  As an option, it would be quite useful to 
> have a stricter dogstatsd compliant output.  Dogstatsd metrics are tagged, 
> should be less than 200 characters including tag names and values, be 
> alphanumeric + underbar, delimited by periods.  As a further pragmatic 
> restriction, negative and other invalid values should be ignored rather than 
> sent to the backend.  These restrictions play well with a broad set of 
> collectors and time series databases.
> This mode would:
> * convert output to ascii alphanumeric characters with underbar, delimited by 
> periods.  Runs of invalid characters within a metric segment would be 
> collapsed to a single underbar.
> * report all Flink variables as tags
> * compress overly long segments, say over 50 chars, to a symbolic 
> representation of the metric name, to preserve the unique metric time series 
> but avoid downstream truncation
> * compress 32 character Flink IDs like tm_id, task_id, job_id, 
> task_attempt_id, to the first 8 characters, again to preserve enough 
> distinction amongst metrics while trimming up to 96 characters from the metric
> * remove object references from names, such as the instance hash id of the 
> serializer
> * drop negative or invalid numeric values such as "n/a", "-1" which is used 
> for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is 
> used for unknowns like currentLowWaterMark
> With these in place, it becomes quite reasonable to support LatencyGauge 
> metrics as well.
> One idea for symbolic compression is to take the first 10 valid characters 
> plus a hash of the long name.  For example, a value like this operator_name:
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000), 
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa,
>  
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465},
>  ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> would first drop the instance references.  The stable version would be:
>  
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000), 
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer,
>  
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1},
>  ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> and then the compressed name would be the first ten valid characters plus the 
> hash of the stable string:
> {code}
> TriggerWin_d8c007da
> {code}
> This is just one way of dealing with unruly default names, the main point 
> would be to preserve the metrics so they are valid, avoid truncation, and can 
> be aggregated along other dimensions even if this particular dimension is 
> hard to parse after the compression.
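
A minimal sketch of the symbolic compression idea above, assuming the scheme is 
"first N valid characters plus a hash of the stabilized (instance-free) name"; 
the prefix length and hash function here are illustrative stand-ins, not a 
proposed final choice:

{code:java}
/** Illustrative sketch of symbolic compression for overly long metric name segments. */
public class MetricNameCompressor {

	static String compress(String stableName, int prefixLength) {
		StringBuilder prefix = new StringBuilder();
		for (int i = 0; i < stableName.length() && prefix.length() < prefixLength; i++) {
			char c = stableName.charAt(i);
			if (Character.isLetterOrDigit(c) || c == '_') {
				prefix.append(c);
			}
		}
		// any stable hash of the instance-free name would do; hashCode() is a stand-in
		return prefix + "_" + Integer.toHexString(stableName.hashCode());
	}

	public static void main(String[] args) {
		// yields something of the shape "TriggerWin_<hash>"
		System.out.println(compress("TriggerWindow(TumblingProcessingTimeWindows(5000), ...)", 10));
	}
}
{code}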



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

[GitHub] flink pull request #4188: [FLINK-7009] dogstatsd mode in statds reporter

2017-06-27 Thread dbrinegar
Github user dbrinegar commented on a diff in the pull request:

https://github.com/apache/flink/pull/4188#discussion_r124363952
  
--- Diff: 
flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 ---
@@ -90,6 +109,45 @@ public void close() {
 
// 

 
+   /**
+* Removes leading and trailing angle brackets.
+*/
+   private String stripBrackets(String str) {
+   return str.substring(1, str.length() - 1);
+   }
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   if (dogstatsdMode) {
+   // memoize dogstatsd tag section: 
"|#tag:val,tag:val,tag:val"
+   StringBuilder statsdTagLine = new StringBuilder();
+   Map orderedTags = new 
TreeMap<>(group.getAllVariables());
--- End diff --

Sorry! This was the beginning of looking at a more efficient tag table, as 
many entries are duplicates, but I'll take it out since we're going to put the 
tag line in the metric object.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7009) dogstatsd mode in statsd reporter

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065283#comment-16065283
 ] 

ASF GitHub Bot commented on FLINK-7009:
---

Github user dbrinegar commented on a diff in the pull request:

https://github.com/apache/flink/pull/4188#discussion_r124363455
  
--- Diff: docs/monitoring/metrics.md ---
@@ -420,10 +420,30 @@ metrics.reporter.grph.protocol: TCP
 In order to use this reporter you must copy 
`/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/lib` folder
 of your Flink distribution.
 
+In `dogstatsd` mode, all variables in Flink metrics such as ``, 
``, ``, ``, ``,
+`` and others, will be included as tags.  It is recommended 
to define scopes for this reporter such that no
--- End diff --

good call!  I'll make these and other suggested changes, hopefully this week


> dogstatsd mode in statsd reporter
> -
>
> Key: FLINK-7009
> URL: https://issues.apache.org/jira/browse/FLINK-7009
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
> Environment: org.apache.flink.metrics.statsd.StatsDReporter
>Reporter: David Brinegar
> Fix For: 1.4.0
>
>
> The current statsd reporter can only report a subset of Flink metrics owing 
> to the manner in which Flink variables are handled, mainly around invalid 
> characters and metrics too long.  As an option, it would be quite useful to 
> have a stricter dogstatsd compliant output.  Dogstatsd metrics are tagged, 
> should be less than 200 characters including tag names and values, be 
> alphanumeric + underbar, delimited by periods.  As a further pragmatic 
> restriction, negative and other invalid values should be ignored rather than 
> sent to the backend.  These restrictions play well with a broad set of 
> collectors and time series databases.
> This mode would:
> * convert output to ascii alphanumeric characters with underbar, delimited by 
> periods.  Runs of invalid characters within a metric segment would be 
> collapsed to a single underbar.
> * report all Flink variables as tags
> * compress overly long segments, say over 50 chars, to a symbolic 
> representation of the metric name, to preserve the unique metric time series 
> but avoid downstream truncation
> * compress 32 character Flink IDs like tm_id, task_id, job_id, 
> task_attempt_id, to the first 8 characters, again to preserve enough 
> distinction amongst metrics while trimming up to 96 characters from the metric
> * remove object references from names, such as the instance hash id of the 
> serializer
> * drop negative or invalid numeric values such as "n/a", "-1" which is used 
> for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is 
> used for unknowns like currentLowWaterMark
> With these in place, it becomes quite reasonable to support LatencyGauge 
> metrics as well.
> One idea for symbolic compression is to take the first 10 valid characters 
> plus a hash of the long name.  For example, a value like this operator_name:
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000), 
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa,
>  
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465},
>  ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> would first drop the instance references.  The stable version would be:
>  
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000), 
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer,
>  
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1},
>  ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> and then the compressed name would be the first ten valid characters plus the 
> hash of the stable string:
> {code}
> TriggerWin_d8c007da
> {code}
> This is just one way of dealing with unruly default names, the main point 
> would be to preserve the metrics so they are valid, avoid truncation, and can 
> be aggregated along other dimensions even if this particular dimension is 
> hard to parse after the compression.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4188: [FLINK-7009] dogstatsd mode in statds reporter

2017-06-27 Thread dbrinegar
Github user dbrinegar commented on a diff in the pull request:

https://github.com/apache/flink/pull/4188#discussion_r124363455
  
--- Diff: docs/monitoring/metrics.md ---
@@ -420,10 +420,30 @@ metrics.reporter.grph.protocol: TCP
 In order to use this reporter you must copy 
`/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/lib` folder
 of your Flink distribution.
 
+In `dogstatsd` mode, all variables in Flink metrics such as ``, 
``, ``, ``, ``,
+`` and others, will be included as tags.  It is recommended 
to define scopes for this reporter such that no
--- End diff --

good call!  I'll make these and other suggested changes, hopefully this week


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6895) Add STR_TO_DATE supported in SQL

2017-06-27 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065195#comment-16065195
 ] 

sunjincheng commented on FLINK-6895:


Hi [~Aegeaner] thanks.

> Add STR_TO_DATE supported in SQL
> 
>
> Key: FLINK-6895
> URL: https://issues.apache.org/jira/browse/FLINK-6895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: Aegeaner
>
> STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It 
> takes a string str and a format string format. STR_TO_DATE() returns a 
> DATETIME value if the format string contains both date and time parts, or a 
> DATE or TIME value if the string contains only date or time parts. If the 
> date, time, or datetime value extracted from str is illegal, STR_TO_DATE() 
> returns NULL and produces a warning.
> * Syntax:
> STR_TO_DATE(str,format) 
> * Arguments
> **str: -
> **format: -
> * Return Types
>   DATETIME/DATE/TIME
> * Example:
>   STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01'
>   SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date]
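
For illustration only, a user-defined scalar function along these lines could 
be sketched as below; the class name, the MySQL-to-Java format translation, and 
the use of java.sql.Date are assumptions, not the actual FLINK-6895 
implementation:

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Date;
import java.text.ParseException;
import java.text.SimpleDateFormat;

/** Illustrative sketch of a STR_TO_DATE-style scalar function. */
public class StrToDate extends ScalarFunction {

	// translate a few MySQL-style specifiers into SimpleDateFormat patterns
	private static String toJavaPattern(String mysqlFormat) {
		return mysqlFormat
			.replace("%Y", "yyyy")
			.replace("%m", "MM")
			.replace("%d", "dd")
			.replace("%h", "hh")
			.replace("%i", "mm")
			.replace("%s", "ss");
	}

	public Date eval(String str, String format) {
		try {
			long millis = new SimpleDateFormat(toJavaPattern(format)).parse(str).getTime();
			return new Date(millis);
		} catch (ParseException e) {
			// mirror the documented behavior: illegal input yields NULL
			return null;
		}
	}
}
{code}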



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065187#comment-16065187
 ] 

ASF GitHub Bot commented on FLINK-6584:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4199#discussion_r124341520
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
 ---
@@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule
 val project = call.rel(0).asInstanceOf[LogicalProject]
 val innerProject = call.rel(1).asInstanceOf[LogicalProject]
 val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
+val window = agg.getWindow
 
-// Retrieve window start and end properties
+val isRowtime = isRowtimeAttribute(window.timeAttribute)
+val isProctime = isProctimeAttribute(window.timeAttribute)
+
+val startEndProperties = Seq(
+  NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)),
+  NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute)))
+
+// allow rowtime/proctime for rowtime windows and proctime for 
proctime windows
+val timeProperties = if (isRowtime) {
+  Seq(
+NamedWindowProperty("w$rowtime", 
RowtimeAttribute(window.aliasAttribute)),
+NamedWindowProperty("w$proctime", 
ProctimeAttribute(window.aliasAttribute)))
--- End diff --

Why do rowtime windows need the proctime property?


> Support multiple consecutive windows in SQL
> ---
>
> Key: FLINK-6584
> URL: https://issues.apache.org/jira/browse/FLINK-6584
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Right now, the Table API supports multiple consecutive windows as follows:
> {code}
> val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 
> 'bigdec, 'string)
> val t = table
>   .window(Tumble over 2.millis on 'rowtime as 'w)
>   .groupBy('w)
>   .select('w.rowtime as 'rowtime, 'int.count as 'int)
>   .window(Tumble over 4.millis on 'rowtime as 'w2)
>   .groupBy('w2)
>   .select('w2.rowtime, 'w2.end, 'int.count)
> {code}
> Similar behavior should be supported by the SQL API as well. We need to 
> introduce a new auxiliary group function, but this should happen in sync with 
> Apache Calcite.
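
A hedged sketch of the SQL-side equivalent, assuming the auxiliary 
TUMBLE_ROWTIME function forwards the inner window's rowtime so that a second 
window can be defined on it; the table and column names (and the exact final 
syntax) are made up for illustration:

{code:java}
/** Illustrative only: composing two consecutive tumbling windows in SQL. */
public class ConsecutiveWindowsSqlSketch {

	public static void main(String[] args) {
		// inner aggregation: 2 second tumbling window that forwards its rowtime
		String inner =
			"SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '2' SECOND) AS rowtime, COUNT(*) AS cnt "
				+ "FROM MyTable "
				+ "GROUP BY TUMBLE(rowtime, INTERVAL '2' SECOND)";

		// outer aggregation: 4 second tumbling window defined on the forwarded rowtime
		String outer =
			"SELECT TUMBLE_END(rowtime, INTERVAL '4' SECOND) AS w_end, SUM(cnt) AS total "
				+ "FROM (" + inner + ") "
				+ "GROUP BY TUMBLE(rowtime, INTERVAL '4' SECOND)";

		System.out.println(outer);
	}
}
{code}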



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065189#comment-16065189
 ] 

ASF GitHub Bot commented on FLINK-6584:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4199#discussion_r124336278
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
 ---
@@ -28,6 +28,7 @@ import org.apache.calcite.util.ImmutableBitSet
 import org.apache.flink.table.api._
--- End diff --

Please remove the unused import at line 26: `import 
org.apache.calcite.sql.fun.SqlStdOperatorTable`.


> Support multiple consecutive windows in SQL
> ---
>
> Key: FLINK-6584
> URL: https://issues.apache.org/jira/browse/FLINK-6584
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Right now, the Table API supports multiple consecutive windows as follows:
> {code}
> val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 
> 'bigdec, 'string)
> val t = table
>   .window(Tumble over 2.millis on 'rowtime as 'w)
>   .groupBy('w)
>   .select('w.rowtime as 'rowtime, 'int.count as 'int)
>   .window(Tumble over 4.millis on 'rowtime as 'w2)
>   .groupBy('w2)
>   .select('w2.rowtime, 'w2.end, 'int.count)
> {code}
> Similar behavior should be supported by the SQL API as well. We need to 
> introduce a new auxiliary group function, but this should happen in sync with 
> Apache Calcite.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065190#comment-16065190
 ] 

ASF GitHub Bot commented on FLINK-6584:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4199#discussion_r124344855
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
 ---
@@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule
 val project = call.rel(0).asInstanceOf[LogicalProject]
 val innerProject = call.rel(1).asInstanceOf[LogicalProject]
 val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
+val window = agg.getWindow
 
-// Retrieve window start and end properties
+val isRowtime = isRowtimeAttribute(window.timeAttribute)
+val isProctime = isProctimeAttribute(window.timeAttribute)
+
+val startEndProperties = Seq(
+  NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)),
+  NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute)))
+
+// allow rowtime/proctime for rowtime windows and proctime for 
proctime windows
+val timeProperties = if (isRowtime) {
+  Seq(
+NamedWindowProperty("w$rowtime", 
RowtimeAttribute(window.aliasAttribute)),
+NamedWindowProperty("w$proctime", 
ProctimeAttribute(window.aliasAttribute)))
+} else if (isProctime) {
+  Seq(NamedWindowProperty("w$proctime", 
ProctimeAttribute(window.aliasAttribute)))
+} else {
+  Seq()
+}
+
+val properties = startEndProperties ++ timeProperties
+
+// retrieve window start and end properties
 val transformed = call.builder()
 val rexBuilder = transformed.getRexBuilder
 transformed.push(LogicalWindowAggregate.create(
-  agg.getWindow,
-  Seq(
-NamedWindowProperty("w$start", 
WindowStart(agg.getWindow.aliasAttribute)),
-NamedWindowProperty("w$end", 
WindowEnd(agg.getWindow.aliasAttribute))
-  ), agg)
+  window,
+  properties,
+  agg)
 )
 
 // forward window start and end properties
 transformed.project(
-  innerProject.getProjects ++ Seq(transformed.field("w$start"), 
transformed.field("w$end")))
+  innerProject.getProjects ++ properties.map(np => 
transformed.field(np.name)))
 
 def replaceGroupAuxiliaries(node: RexNode): RexNode = {
   node match {
 case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
   // replace expression by access to window start
   rexBuilder.makeCast(c.getType, transformed.field("w$start"), 
false)
+
 case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
   // replace expression by access to window end
   rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
+
+case c: RexCall if WindowStartEndPropertiesRule.isWindowRowtime(c) 
=>
+  if (isProctime) {
+throw ValidationException("A proctime window cannot provide a 
rowtime attribute.")
+  } else if (isRowtime) {
+// replace expression by access to window rowtime
+transformed.field("w$rowtime")
+  } else {
+throw TableException("Accessing the rowtime attribute of a 
window is not yet " +
+  "supported in a batch environment.")
+  }
+
+case c: RexCall if 
WindowStartEndPropertiesRule.isWindowProctime(c) =>
+  if (isProctime) {
+// replace expression by access to window proctime
+transformed.field("w$proctime")
+  } else {
+throw ValidationException("Proctime is not supported in a 
batch environment.")
+  }
--- End diff --

This exception can also be thrown for a stream rowtime window if we query 
`TUMBLE_PROCTIME`, so I think this message should be improved, or an 
`isRowtime` check should be added.


> Support multiple consecutive windows in SQL
> ---
>
> Key: FLINK-6584
> URL: https://issues.apache.org/jira/browse/FLINK-6584
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Right now, the Table API supports multiple consecutive windows as follows:
> {code}
> val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 
> 'bigdec, 'string)
> val t = table
>   .window(Tumble over 2.millis on 'rowtime as 'w)
>   .groupBy('w)
>   .select('w.rowtime as 'rowtime, 'int.count as 'int)

[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065186#comment-16065186
 ] 

ASF GitHub Bot commented on FLINK-6584:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4199#discussion_r124335359
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/MathFunctions.scala
 ---
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.functions.utils
+package org.apache.flink.table.runtime.functions
--- End diff --

I suggest renaming `MathFunctions` to `ScalarFunctions`, so that we can add 
all scalar functions in one file. Then the `...runtime.functions` package would 
have three files, `ScalarFunctions`, `TableFunctions`, and `AggFunctions`, 
which is what FLINK-6810 plans to do. What do you think?


> Support multiple consecutive windows in SQL
> ---
>
> Key: FLINK-6584
> URL: https://issues.apache.org/jira/browse/FLINK-6584
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Right now, the Table API supports multiple consecutive windows as follows:
> {code}
> val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 
> 'bigdec, 'string)
> val t = table
>   .window(Tumble over 2.millis on 'rowtime as 'w)
>   .groupBy('w)
>   .select('w.rowtime as 'rowtime, 'int.count as 'int)
>   .window(Tumble over 4.millis on 'rowtime as 'w2)
>   .groupBy('w2)
>   .select('w2.rowtime, 'w2.end, 'int.count)
> {code}
> Similar behavior should be supported by the SQL API as well. We need to 
> introduce a new auxiliary group function, but this should happen in sync with 
> Apache Calcite.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065188#comment-16065188
 ] 

ASF GitHub Bot commented on FLINK-6584:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4199#discussion_r124329598
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
 ---
@@ -1900,7 +1900,7 @@ public void unparse(
new SqlGroupFunction(SqlKind.TUMBLE, null,
OperandTypes.or(OperandTypes.DATETIME_INTERVAL,
OperandTypes.DATETIME_INTERVAL_TIME)) {
-   @Override List 
getAuxiliaryFunctions() {
+   @Override public List 
getAuxiliaryFunctions() {
--- End diff --

Is this method only used in `SqlStdOperatorTable.java`? If so, why do we 
add `public`? I find that Calcite does not add this modifier.


> Support multiple consecutive windows in SQL
> ---
>
> Key: FLINK-6584
> URL: https://issues.apache.org/jira/browse/FLINK-6584
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Right now, the Table API supports multiple consecutive windows as follows:
> {code}
> val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 
> 'bigdec, 'string)
> val t = table
>   .window(Tumble over 2.millis on 'rowtime as 'w)
>   .groupBy('w)
>   .select('w.rowtime as 'rowtime, 'int.count as 'int)
>   .window(Tumble over 4.millis on 'rowtime as 'w2)
>   .groupBy('w2)
>   .select('w2.rowtime, 'w2.end, 'int.count)
> {code}
> Similar behavior should be supported by the SQL API as well. We need to 
> introduce a new auxiliary group function, but this should happen in sync with 
> Apache Calcite.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065191#comment-16065191
 ] 

ASF GitHub Bot commented on FLINK-6584:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4199#discussion_r124335785
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ---
@@ -349,6 +350,14 @@ class RelTimeIndicatorConverter(rexBuilder: 
RexBuilder) extends RelShuttle {
 isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
   updatedCall
 
+case BasicOperatorTable.TUMBLE_ROWTIME |
+BasicOperatorTable.TUMBLE_PROCTIME |
+BasicOperatorTable.HOP_ROWTIME |
+BasicOperatorTable.HOP_PROCTIME |
+BasicOperatorTable.SESSION_ROWTIME |
+BasicOperatorTable.SESSION_PROCTIME if 
isTimeIndicatorType(updatedCall.getType) =>
+  updatedCall
--- End diff --

Can we remove the condition of `if isTimeIndicatorType(updatedCall.getType) 
`?


> Support multiple consecutive windows in SQL
> ---
>
> Key: FLINK-6584
> URL: https://issues.apache.org/jira/browse/FLINK-6584
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Right now, the Table API supports multiple consecutive windows as follows:
> {code}
> val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 
> 'bigdec, 'string)
> val t = table
>   .window(Tumble over 2.millis on 'rowtime as 'w)
>   .groupBy('w)
>   .select('w.rowtime as 'rowtime, 'int.count as 'int)
>   .window(Tumble over 4.millis on 'rowtime as 'w2)
>   .groupBy('w2)
>   .select('w2.rowtime, 'w2.end, 'int.count)
> {code}
> Similar behavior should be supported by the SQL API as well. We need to 
> introduce a new auxiliary group function, but this should happen in sync with 
> Apache Calcite.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

2017-06-27 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4199#discussion_r124329598
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
 ---
@@ -1900,7 +1900,7 @@ public void unparse(
new SqlGroupFunction(SqlKind.TUMBLE, null,
OperandTypes.or(OperandTypes.DATETIME_INTERVAL,
OperandTypes.DATETIME_INTERVAL_TIME)) {
-   @Override List 
getAuxiliaryFunctions() {
+   @Override public List 
getAuxiliaryFunctions() {
--- End diff --

Is this method only used in `SqlStdOperatorTable.java`? If so, why do we 
add `public`? I find that Calcite does not add this modifier.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

2017-06-27 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4199#discussion_r124335785
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ---
@@ -349,6 +350,14 @@ class RelTimeIndicatorConverter(rexBuilder: 
RexBuilder) extends RelShuttle {
 isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
   updatedCall
 
+case BasicOperatorTable.TUMBLE_ROWTIME |
+BasicOperatorTable.TUMBLE_PROCTIME |
+BasicOperatorTable.HOP_ROWTIME |
+BasicOperatorTable.HOP_PROCTIME |
+BasicOperatorTable.SESSION_ROWTIME |
+BasicOperatorTable.SESSION_PROCTIME if 
isTimeIndicatorType(updatedCall.getType) =>
+  updatedCall
--- End diff --

Can we remove the condition of `if isTimeIndicatorType(updatedCall.getType) 
`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

2017-06-27 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4199#discussion_r124341520
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
 ---
@@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule
 val project = call.rel(0).asInstanceOf[LogicalProject]
 val innerProject = call.rel(1).asInstanceOf[LogicalProject]
 val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
+val window = agg.getWindow
 
-// Retrieve window start and end properties
+val isRowtime = isRowtimeAttribute(window.timeAttribute)
+val isProctime = isProctimeAttribute(window.timeAttribute)
+
+val startEndProperties = Seq(
+  NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)),
+  NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute)))
+
+// allow rowtime/proctime for rowtime windows and proctime for 
proctime windows
+val timeProperties = if (isRowtime) {
+  Seq(
+NamedWindowProperty("w$rowtime", 
RowtimeAttribute(window.aliasAttribute)),
+NamedWindowProperty("w$proctime", 
ProctimeAttribute(window.aliasAttribute)))
--- End diff --

Why do rowtime windows need the proctime property?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

2017-06-27 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4199#discussion_r124344855
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
 ---
@@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule
 val project = call.rel(0).asInstanceOf[LogicalProject]
 val innerProject = call.rel(1).asInstanceOf[LogicalProject]
 val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
+val window = agg.getWindow
 
-// Retrieve window start and end properties
+val isRowtime = isRowtimeAttribute(window.timeAttribute)
+val isProctime = isProctimeAttribute(window.timeAttribute)
+
+val startEndProperties = Seq(
+  NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)),
+  NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute)))
+
+// allow rowtime/proctime for rowtime windows and proctime for 
proctime windows
+val timeProperties = if (isRowtime) {
+  Seq(
+NamedWindowProperty("w$rowtime", 
RowtimeAttribute(window.aliasAttribute)),
+NamedWindowProperty("w$proctime", 
ProctimeAttribute(window.aliasAttribute)))
+} else if (isProctime) {
+  Seq(NamedWindowProperty("w$proctime", 
ProctimeAttribute(window.aliasAttribute)))
+} else {
+  Seq()
+}
+
+val properties = startEndProperties ++ timeProperties
+
+// retrieve window start and end properties
 val transformed = call.builder()
 val rexBuilder = transformed.getRexBuilder
 transformed.push(LogicalWindowAggregate.create(
-  agg.getWindow,
-  Seq(
-NamedWindowProperty("w$start", 
WindowStart(agg.getWindow.aliasAttribute)),
-NamedWindowProperty("w$end", 
WindowEnd(agg.getWindow.aliasAttribute))
-  ), agg)
+  window,
+  properties,
+  agg)
 )
 
 // forward window start and end properties
 transformed.project(
-  innerProject.getProjects ++ Seq(transformed.field("w$start"), 
transformed.field("w$end")))
+  innerProject.getProjects ++ properties.map(np => 
transformed.field(np.name)))
 
 def replaceGroupAuxiliaries(node: RexNode): RexNode = {
   node match {
 case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
   // replace expression by access to window start
   rexBuilder.makeCast(c.getType, transformed.field("w$start"), 
false)
+
 case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
   // replace expression by access to window end
   rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
+
+case c: RexCall if WindowStartEndPropertiesRule.isWindowRowtime(c) 
=>
+  if (isProctime) {
+throw ValidationException("A proctime window cannot provide a 
rowtime attribute.")
+  } else if (isRowtime) {
+// replace expression by access to window rowtime
+transformed.field("w$rowtime")
+  } else {
+throw TableException("Accessing the rowtime attribute of a 
window is not yet " +
+  "supported in a batch environment.")
+  }
+
+case c: RexCall if 
WindowStartEndPropertiesRule.isWindowProctime(c) =>
+  if (isProctime) {
+// replace expression by access to window proctime
+transformed.field("w$proctime")
+  } else {
+throw ValidationException("Proctime is not supported in a 
batch environment.")
+  }
--- End diff --

This exception can also be thrown for a stream rowtime window if we query 
`TUMBLE_PROCTIME`, so I think this message should be improved, or an 
`isRowtime` check should be added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

2017-06-27 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4199#discussion_r124336278
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
 ---
@@ -28,6 +28,7 @@ import org.apache.calcite.util.ImmutableBitSet
 import org.apache.flink.table.api._
--- End diff --

Please remove the unused import at line 26: `import 
org.apache.calcite.sql.fun.SqlStdOperatorTable`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

2017-06-27 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4199#discussion_r124335359
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/MathFunctions.scala ---
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.functions.utils
+package org.apache.flink.table.runtime.functions
--- End diff --

I suggest renaming `MathFunctions` to `ScalarFunctions`, so that we can add all scalar
functions in one file. Then the `...runtime.functions` package would have three files,
`ScalarFunctions`, `TableFunctions`, and `AggFunctions`, which is what FLINK-6810
plans to do. What do you think?
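
As a rough sketch of the suggested layout (the function shown and its signature are assumptions for illustration, not the actual contents of `MathFunctions`):

```
package org.apache.flink.table.runtime.functions

/**
 * Sketch of the proposed consolidation: built-in scalar function implementations
 * collected in a single object, with sibling objects such as TableFunctions and
 * AggFunctions to be added later.
 */
object ScalarFunctions {

  /** Example of a math function that would move here from MathFunctions. */
  def power(x: Double, y: java.math.BigDecimal): Double =
    Math.pow(x, y.doubleValue())
}
```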




[jira] [Commented] (FLINK-7006) Base class using POJOs for Gelly algorithms

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065146#comment-16065146
 ] 

ASF GitHub Bot commented on FLINK-7006:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/4201

[FLINK-7006] [gelly] Base class using POJOs for Gelly algorithms

Gelly algorithms commonly have a Result class extending a Tuple type and 
implementing one of the Unary/Binary/TertiaryResult interfaces.

Add a Unary/Binary/TertiaryResultBase class implementing each interface and 
convert the Result classes to POJOs extending the base result classes.

Note: The `TriangleListing` hashes changed because previously these 
algorithms did not have a `Result` class and simply used the `Tuple` `hashCode`.
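
For illustration, a minimal sketch of the base-class idea (written in Scala here, with hypothetical names; the actual Gelly classes are Java): the shared vertex-ID field lives in the base class, and a concrete result type becomes a plain class with its own fields instead of extending a Tuple type.

```
// Result interface for a single-vertex result, modeled on the description above.
trait UnaryResult[K] {
  def getVertexId0: K
  def setVertexId0(id: K): Unit
}

// Base class holding the shared vertex-ID field.
abstract class UnaryResultBase[K] extends UnaryResult[K] {
  private var vertexId0: K = _
  override def getVertexId0: K = vertexId0
  override def setVertexId0(id: K): Unit = { vertexId0 = id }
}

// A concrete algorithm result is now a simple class with its own fields (and its own
// hashCode, which is why the TriangleListing hashes mentioned above changed),
// rather than a subclass of a Tuple type.
class DegreeResult[K] extends UnaryResultBase[K] {
  var degree: Long = 0L
}
```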

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 7006_base_class_using_pojos_for_gelly_algorithms

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4201


commit 15ac62ba7d0f7982961aa4b631870a641510c707
Author: Greg Hogan 
Date:   2017-06-26T14:21:50Z

[FLINK-7006] [gelly] Base class using POJOs for Gelly algorithms

Gelly algorithms commonly have a Result class extending a Tuple type and
implementing one of the Unary/Binary/TertiaryResult interfaces.

Add a Unary/Binary/TertiaryResultBase class implementing each interface
and convert the Result classes to POJOs extending the base result
classes.




> Base class using POJOs for Gelly algorithms
> ---
>
> Key: FLINK-7006
> URL: https://issues.apache.org/jira/browse/FLINK-7006
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Affects Versions: 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.4.0
>
>
> Gelly algorithms commonly have a {{Result}} class extending a {{Tuple}} type 
> and implementing one of the {{Unary/Binary/TertiaryResult}} interfaces.
> Add a {{Unary/Binary/TertiaryResultBase}} class implementing each interface 
> and convert the {{Result}} classes to POJOs extending the base result classes.





[GitHub] flink pull request #4201: [FLINK-7006] [gelly] Base class using POJOs for Ge...

2017-06-27 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/4201

[FLINK-7006] [gelly] Base class using POJOs for Gelly algorithms

Gelly algorithms commonly have a Result class extending a Tuple type and 
implementing one of the Unary/Binary/TertiaryResult interfaces.

Add a Unary/Binary/TertiaryResultBase class implementing each interface and 
convert the Result classes to POJOs extending the base result classes.

Note: The `TriangleListing` hashes changed because previously these 
algorithms did not have a `Result` class and simply used the `Tuple` `hashCode`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 7006_base_class_using_pojos_for_gelly_algorithms

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4201


commit 15ac62ba7d0f7982961aa4b631870a641510c707
Author: Greg Hogan 
Date:   2017-06-26T14:21:50Z

[FLINK-7006] [gelly] Base class using POJOs for Gelly algorithms

Gelly algorithms commonly have a Result class extending a Tuple type and
implementing one of the Unary/Binary/TertiaryResult interfaces.

Add a Unary/Binary/TertiaryResultBase class implementing each interface
and convert the Result classes to POJOs extending the base result
classes.






[jira] [Commented] (FLINK-7007) Add README to "flink-shaded.git" repository

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065138#comment-16065138
 ] 

ASF GitHub Bot commented on FLINK-7007:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink-shaded/pull/3#discussion_r124335176
  
--- Diff: README.md ---
@@ -0,0 +1,28 @@
+
+
+# Apache Flink Shaded Dependencies
+
+This repository contains a number of shaded dependencies for the [Apache Flink](https://flink.apache.org/) project.
+
+The purpose of these dependencies is to provide a single instance of a shaded dependency in the Flink distribution, instead of each individual module shading the dependency.
+
+The shaded dependencies contained here do not expose any transitive dependencies. They may or may not be self-contained.
+
+When using these dependencies it is recommended to work directly against the shaded namespaces.
--- End diff --

Could you add the following
```
About

Apache Flink is an open source project of The Apache Software Foundation 
(ASF).
``` 
and link to the ASF?

Other than that +1 to merge


> Add README to "flink-shaded.git" repository
> ---
>
> Key: FLINK-7007
> URL: https://issues.apache.org/jira/browse/FLINK-7007
> Project: Flink
>  Issue Type: Sub-task
>  Components: flink-shaded.git
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> We should put a README file up there with a brief explanation of the purpose 
> of the repo + some links to the project.





[jira] [Assigned] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-27 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-7014:
-

Assignee: Ruidong Li

> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of implementations of `SqlFunction`
> always returns true, which causes inappropriate optimizations in Calcite,
> such as treating a user's stateful UDF as a pure functional procedure.





[jira] [Updated] (FLINK-7016) Move inputFormat to InputFormatVertex from TaskConfig

2017-06-27 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7016:
--
Component/s: DataStream API
 DataSet API

> Move inputFormat to InputFormatVertex from TaskConfig
> -
>
> Key: FLINK-7016
> URL: https://issues.apache.org/jira/browse/FLINK-7016
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: Xu Pingyong
>Assignee: Xu Pingyong
>
> In the batch case, the InputFormat is put into the TaskConfig; the batch task gets it from there
> to read data, and the job manager uses it to generate input splits.
> In the streaming case, all configs are put into the StreamConfig, but this
> InputFormat is still put into the TaskConfig.
> We can put the InputFormat into the InputFormatVertex, while the batch task still gets the
> InputFormat from the TaskConfig. That will be clearer.





[jira] [Updated] (FLINK-7015) Separate OperatorConfig from StreamConfig

2017-06-27 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7015:
--
Component/s: DataStream API

> Separate OperatorConfig from StreamConfig
> -
>
> Key: FLINK-7015
> URL: https://issues.apache.org/jira/browse/FLINK-7015
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Xu Pingyong
>Assignee: Xu Pingyong
>
> Currently the stream config contains not only the configs that the batch task needs, but also
> the configs that the operator needs, so the stream config can see the operator's configs, and
> the operator can see the batch task's configs.
> We need to separate the operator config from the stream config, so that each can only
> see its own configs.





[jira] [Updated] (FLINK-7018) Reconstruct streamgraph to clear interface

2017-06-27 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7018:
--
Component/s: DataStream API

>  Reconstruct streamgraph to clear interface
> ---
>
> Key: FLINK-7018
> URL: https://issues.apache.org/jira/browse/FLINK-7018
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Xu Pingyong
>Assignee: Xu Pingyong
>
> The StreamGraph not only contains streamNodes and streamEdges, but also contains
> virtual nodes which have nothing to do with the StreamGraph.
> Virtual nodes should be converted to streamNodes in
> StreamGraphGenerator.class





[jira] [Commented] (FLINK-6866) ClosureCleaner.clean fails for scala's JavaConverters wrapper classes

2017-06-27 Thread SmedbergM (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065091#comment-16065091
 ] 

SmedbergM commented on FLINK-6866:
--

A Map[K,V] is already Serializable, as is java.util.Map -- it's just that 
before 2.12 scala.collection.convert.Wrappers.MapWrapper didn't inherit from 
the Serializable interface. I'm not sure there's a non-hacky (i.e. explicitly 
checking whether the classname begins with scala.collection.convert) way to 
get around this, because ObjectOutputStream checks whether each field inherits 
from Serializable. Unfortunately, the scala library writers didn't give the 
MapWrapper public access to its underlying wrapped Map.

I don't think the details of the particular code that we encountered this in are 
all that relevant; in Scala, it's idiomatic to create immutable Maps and call 
`.asJava` on them when a library (like Flink) requires a java.util.Map. Of 
course, one can avoid having the wrapper by directly constructing the 
java.util.Map as in the MWE, but then one has all the mutability and 
concurrency worries that good idiomatic Scala lets me not think about.
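
For illustration, a small self-contained Scala snippet (hypothetical values, assuming Scala 2.11) contrasting the two options discussed: the idiomatic `.asJava` wrapper, whose `MapWrapper` does not implement `Serializable` before 2.12, versus copying into a plain `java.util.HashMap`.

```
import scala.collection.JavaConverters._

object AsJavaClosureExample {
  def main(args: Array[String]): Unit = {
    val config = Map("topic" -> "events", "group.id" -> "demo")

    // Idiomatic Scala: wrap the immutable map for a Java API. Before Scala 2.12 the
    // returned Wrappers.MapWrapper does not implement java.io.Serializable, so a Flink
    // closure capturing it fails the ClosureCleaner / serialization check.
    val wrapped: java.util.Map[String, String] = config.asJava
    println(wrapped.isInstanceOf[java.io.Serializable]) // false on 2.10/2.11, true on 2.12

    // Workaround from the MWE: build a java.util.HashMap directly (Serializable),
    // at the price of the mutability and concurrency concerns mentioned above.
    val copied = new java.util.HashMap[String, String]()
    config.foreach { case (k, v) => copied.put(k, v) }
  }
}
```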

> ClosureCleaner.clean fails for scala's JavaConverters wrapper classes
> -
>
> Key: FLINK-6866
> URL: https://issues.apache.org/jira/browse/FLINK-6866
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Scala API
>Affects Versions: 1.2.0, 1.3.0
> Environment: Scala 2.10.6, Scala 2.11.11
> Does not appear using Scala 2.12
>Reporter: SmedbergM
>
> MWE: https://github.com/SmedbergM/ClosureCleanerBug
> MWE console output: 
> https://gist.github.com/SmedbergM/ce969e6e8540da5b59c7dd921a496dc5





[jira] [Commented] (FLINK-6895) Add STR_TO_DATE supported in SQL

2017-06-27 Thread Aegeaner (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065086#comment-16065086
 ] 

Aegeaner commented on FLINK-6895:
-

I will try this.

> Add STR_TO_DATE supported in SQL
> 
>
> Key: FLINK-6895
> URL: https://issues.apache.org/jira/browse/FLINK-6895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: Aegeaner
>
> STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It 
> takes a string str and a format string format. STR_TO_DATE() returns a 
> DATETIME value if the format string contains both date and time parts, or a 
> DATE or TIME value if the string contains only date or time parts. If the 
> date, time, or datetime value extracted from str is illegal, STR_TO_DATE() 
> returns NULL and produces a warning.
> * Syntax:
> STR_TO_DATE(str,format) 
> * Arguments
> **str: -
> **format: -
> * Return Types
>   DATETIME/DATE/TIME
> * Example:
>   STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01'
>   SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date]
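
For illustration, a minimal sketch of such a function as a user-defined `ScalarFunction` (hypothetical class; it uses `java.text.SimpleDateFormat` patterns, whereas the built-in described above would have to handle MySQL-style `%d,%m,%Y` specifiers):

```
import java.text.{ParseException, SimpleDateFormat}
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical sketch, not the eventual built-in: parses `str` with `format`
// and returns a java.sql.Timestamp, or null for illegal values, mirroring the
// NULL-on-error behaviour described above.
class StrToDate extends ScalarFunction {
  def eval(str: String, format: String): java.sql.Timestamp = {
    try {
      new java.sql.Timestamp(new SimpleDateFormat(format).parse(str).getTime)
    } catch {
      case _: ParseException => null
    }
  }
}
```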





[jira] [Assigned] (FLINK-6895) Add STR_TO_DATE supported in SQL

2017-06-27 Thread Aegeaner (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aegeaner reassigned FLINK-6895:
---

Assignee: Aegeaner

> Add STR_TO_DATE supported in SQL
> 
>
> Key: FLINK-6895
> URL: https://issues.apache.org/jira/browse/FLINK-6895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: Aegeaner
>
> STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It 
> takes a string str and a format string format. STR_TO_DATE() returns a 
> DATETIME value if the format string contains both date and time parts, or a 
> DATE or TIME value if the string contains only date or time parts. If the 
> date, time, or datetime value extracted from str is illegal, STR_TO_DATE() 
> returns NULL and produces a warning.
> * Syntax:
> STR_TO_DATE(str,format) 
> * Arguments
> **str: -
> **format: -
> * Return Types
>   DATETIME/DATE/TIME
> * Example:
>   STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01'
>   SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date]





[jira] [Commented] (FLINK-6969) Add support for deferred computation for group window aggregates

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065057#comment-16065057
 ] 

ASF GitHub Bot commented on FLINK-6969:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4183#discussion_r124298519
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala ---
@@ -37,6 +37,14 @@ class BatchQueryConfig private[table] extends QueryConfig
 class StreamQueryConfig private[table] extends QueryConfig {
 
   /**
+   * The deferredComputationTime is a strategy config of deferred computation that used to deal
+   * with late arriving data. For example, instead of computing a tumbling window of 1 hour at each
+   * full hour, we can add a deferred computation interval of 15 minute to compute the result
+   * quarter past each full hour.
+   */
+  private var deferredComputationTime: Long = 0L
--- End diff --

Should we call this parameter rather `firstResultTimeOffset`? This would 
allow us to also use it to configure early results later. The value would be an 
offset from the original computation time. A positive value (> 0) would mean 
deferred computation (later than usual) and a negative value (<0) would mean an 
early computation / early result.


> Add support for deferred computation for group window aggregates
> 
>
> Key: FLINK-6969
> URL: https://issues.apache.org/jira/browse/FLINK-6969
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Deferred computation is a strategy to deal with late arriving data and avoid 
> updates of previous results. Instead of computing a result as soon as it is 
> possible (i.e., when a corresponding watermark was received), deferred 
> computation adds a configurable amount of slack time in which late data is 
> accepted before the result is compute. For example, instead of computing a 
> tumbling window of 1 hour at each full hour, we can add a deferred 
> computation interval of 15 minute to compute the result quarter past each 
> full hour.
> This approach adds latency but can reduce the number of update esp. in use 
> cases where the user cannot influence the generation of watermarks. It is 
> also useful if the data is emitted to a system that cannot update result 
> (files or Kafka). The deferred computation interval should be configured via 
> the {{QueryConfig}}.





[jira] [Commented] (FLINK-6969) Add support for deferred computation for group window aggregates

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065056#comment-16065056
 ] 

ASF GitHub Bot commented on FLINK-6969:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4183#discussion_r124319583
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala ---
@@ -92,6 +100,27 @@ class StreamQueryConfig private[table] extends QueryConfig {
 this
   }
 
+  /**
+   * Specifies a deferred computation time for deferred computation, i.e., fires the
--- End diff --

Specifies an offset for the point in time when the first result of a 
time-based computation is computed. For example, a tumbling window of one hour 
that ends at 13:00 would usually compute its first result at 13:00. With a 
firstResultTimeOffset of 15 minutes, the first result would be computed at 
13:15.

A positive firstResultTimeOffset parameter can be used to include late 
arriving records into the result of an event-time based computation. Negative 
offset values are not supported yet.

Later, a negative offset will allow computing early results, i.e., an 
offset of -45 minutes would compute the first and early result of the hourly 
tumbling window that ends at 13:00 at 12:15.
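
For illustration, a minimal self-contained sketch of the renamed parameter with the semantics described above (hypothetical class, not the actual `StreamQueryConfig` API; only positive offsets are accepted for now):

```
import org.apache.flink.api.common.time.Time

// Hypothetical sketch of the proposed setter; not the actual StreamQueryConfig.
class QueryConfigSketch {
  private var firstResultTimeOffset: Long = 0L

  /** Positive offsets defer the first result; negative (early) offsets are not supported yet. */
  def withFirstResultTimeOffset(offset: Time): QueryConfigSketch = {
    require(offset.toMilliseconds >= 0, "Early results (negative offsets) are not supported yet.")
    firstResultTimeOffset = offset.toMilliseconds
    this
  }

  def getFirstResultTimeOffset: Long = firstResultTimeOffset
}

object QueryConfigSketchExample {
  def main(args: Array[String]): Unit = {
    // A tumbling window ending at 13:00 would then emit its first result at 13:15.
    val config = new QueryConfigSketch().withFirstResultTimeOffset(Time.minutes(15))
    println(config.getFirstResultTimeOffset) // 900000
  }
}
```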



> Add support for deferred computation for group window aggregates
> 
>
> Key: FLINK-6969
> URL: https://issues.apache.org/jira/browse/FLINK-6969
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Deferred computation is a strategy to deal with late arriving data and avoid 
> updates of previous results. Instead of computing a result as soon as it is 
> possible (i.e., when a corresponding watermark was received), deferred 
> computation adds a configurable amount of slack time in which late data is 
> accepted before the result is compute. For example, instead of computing a 
> tumbling window of 1 hour at each full hour, we can add a deferred 
> computation interval of 15 minute to compute the result quarter past each 
> full hour.
> This approach adds latency but can reduce the number of update esp. in use 
> cases where the user cannot influence the generation of watermarks. It is 
> also useful if the data is emitted to a system that cannot update result 
> (files or Kafka). The deferred computation interval should be configured via 
> the {{QueryConfig}}.





[jira] [Commented] (FLINK-6969) Add support for deferred computation for group window aggregates

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065058#comment-16065058
 ] 

ASF GitHub Bot commented on FLINK-6969:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4183#discussion_r124298671
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala ---
@@ -92,6 +100,27 @@ class StreamQueryConfig private[table] extends QueryConfig {
 this
   }
 
+  /**
+   * Specifies a deferred computation time for deferred computation, i.e., fires the
+   * [[org.apache.flink.streaming.api.windowing.windows.TimeWindow]] on the time which not
+   * smaller than (window.maxTimestamp + deferredComputationTime). For example, instead of
+   * computing a tumbling window of 1 hour at each full hour, we can add a deferred computation
+   * interval of 15 minute to compute the result quarter past each full hour.
+   */
+  def withDeferredComputationTime(deferredComputationTime: Time): StreamQueryConfig = {
--- End diff --

rename to `withFirstResultTimeOffset`?


> Add support for deferred computation for group window aggregates
> 
>
> Key: FLINK-6969
> URL: https://issues.apache.org/jira/browse/FLINK-6969
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Deferred computation is a strategy to deal with late arriving data and avoid 
> updates of previous results. Instead of computing a result as soon as it is 
> possible (i.e., when a corresponding watermark was received), deferred 
> computation adds a configurable amount of slack time in which late data is 
> accepted before the result is compute. For example, instead of computing a 
> tumbling window of 1 hour at each full hour, we can add a deferred 
> computation interval of 15 minute to compute the result quarter past each 
> full hour.
> This approach adds latency but can reduce the number of update esp. in use 
> cases where the user cannot influence the generation of watermarks. It is 
> also useful if the data is emitted to a system that cannot update result 
> (files or Kafka). The deferred computation interval should be configured via 
> the {{QueryConfig}}.




