[jira] [Commented] (FLINK-7017) Remove netty usages in flink-tests
[ https://issues.apache.org/jira/browse/FLINK-7017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066016#comment-16066016 ] ASF GitHub Bot commented on FLINK-7017: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4196#discussion_r124464897 --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java --- @@ -83,16 +78,16 @@ public static void initialize() throws Exception { assertTrue("Unable to create temp directory", logDir.mkdir()); File logFile = new File(logDir, "jobmanager.log"); File outFile = new File(logDir, "jobmanager.out"); - + Files.createFile(logFile.toPath()); Files.createFile(outFile.toPath()); - + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath()); config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath()); cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); - + --- End diff -- this is the removal of leading tabs that is required by the checkstyle active in `flink-runtime-web`. > Remove netty usages in flink-tests > -- > > Key: FLINK-7017 > URL: https://issues.apache.org/jira/browse/FLINK-7017 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4196: [FLINK-7017] Remove netty usages in flink-tests
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4196#discussion_r124464897 --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java --- @@ -83,16 +78,16 @@ public static void initialize() throws Exception { assertTrue("Unable to create temp directory", logDir.mkdir()); File logFile = new File(logDir, "jobmanager.log"); File outFile = new File(logDir, "jobmanager.out"); - + Files.createFile(logFile.toPath()); Files.createFile(outFile.toPath()); - + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath()); config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath()); cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); - + --- End diff -- this is the removal of leading tabs that is required by the checkstyle active in `flink-runtime-web`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Assigned] (FLINK-6665) Pass a ScheduledExecutorService to the RestartStrategy
[ https://issues.apache.org/jira/browse/FLINK-6665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fang Yong reassigned FLINK-6665: Assignee: Fang Yong > Pass a ScheduledExecutorService to the RestartStrategy > -- > > Key: FLINK-6665 > URL: https://issues.apache.org/jira/browse/FLINK-6665 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen >Assignee: Fang Yong > Fix For: 1.4.0 > > > Currently, the {{RestartStrategy}} is called when the {{ExecutionGraph}} > should be restarted. > To facilitate delays before restarting, the strategy simply sleeps, blocking > the thread that runs the ExecutionGraph's recovery method. > I suggest passing a {{ScheduledExecutorService}} to the {{RestartStrategy}} > and letting it schedule the restart call that way, avoiding any sleeps. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
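For illustration, a minimal Scala sketch of the scheduling approach described in the issue, assuming a hypothetical restart callback; the actual `RestartStrategy` interface and restart hook in Flink may look different:

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

// Hypothetical illustration: instead of Thread.sleep(delay) on the recovery
// thread, the strategy hands the restart call to the scheduled executor.
class DelayedRestartSketch(executor: ScheduledExecutorService, delayMillis: Long) {

  def restart(doRestart: () => Unit): Unit = {
    // Schedules the restart without blocking the caller's thread.
    executor.schedule(new Runnable {
      override def run(): Unit = doRestart()
    }, delayMillis, TimeUnit.MILLISECONDS)
  }
}
```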
[jira] [Created] (FLINK-7025) Using NullByteKeySelector for Unbounded ProcTime NonPartitioned Over
sunjincheng created FLINK-7025: -- Summary: Using NullByteKeySelector for Unbounded ProcTime NonPartitioned Over Key: FLINK-7025 URL: https://issues.apache.org/jira/browse/FLINK-7025 Project: Flink Issue Type: Bug Reporter: sunjincheng Assignee: sunjincheng We recently added the `Cleanup State` feature, but it does not work correctly when state cleaning is enabled on an Unbounded ProcTime NonPartitioned Over window, because `ProcessFunctionWithCleanupState` uses keyed state. In this JIRA I will either change the `Unbounded ProcTime NonPartitioned Over` window into a `partitioned Over` window by using a NullByteKeySelector, or create a `NonKeyedProcessFunctionWithCleanupState`. I think the first way is simpler. What do you think? [~fhueske] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
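For illustration, a minimal Scala sketch of the first option: a key selector that maps every record to the same constant byte key, so the "non-partitioned" over window becomes a keyed stream and can reuse the keyed `ProcessFunctionWithCleanupState`. The class below is an illustrative stand-in, not Flink's actual implementation:

```scala
import org.apache.flink.api.java.functions.KeySelector

// Illustrative selector: every element maps to the same dummy key, turning a
// non-partitioned stream into a keyed stream with a single key partition.
class NullByteKeySelectorSketch[T] extends KeySelector[T, java.lang.Byte] {
  override def getKey(value: T): java.lang.Byte = java.lang.Byte.valueOf(0.toByte)
}
```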
[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction
[ https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065960#comment-16065960 ] ASF GitHub Bot commented on FLINK-7014: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124457316 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala --- @@ -422,4 +423,50 @@ class ExpressionReductionTest extends TableTestBase { util.verifyTable(result, expected) } + @Test(expected = classOf[NullPointerException]) + def testReduceDeterministicUDF(): Unit = { --- End diff -- The exception isn't caused by Flink, so asserting on an expected exception here is not a good approach. I suggest marking this test as `@Ignore` and adding a TODO comment at the top of the test. After CALCITE-1860 is fixed, we can re-enable this test, or check and remove it. > Expose isDeterministic interface to ScalarFunction and TableFunction > > > Key: FLINK-7014 > URL: https://issues.apache.org/jira/browse/FLINK-7014 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > Currently, the `isDeterministic` methods of implementations of `SqlFunction` > always return true, which causes inappropriate optimizations in Calcite, > such as treating a user's stateful UDF as a pure function. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction
[ https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065961#comment-16065961 ] ASF GitHub Bot commented on FLINK-7014: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124457542 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala --- @@ -422,4 +423,50 @@ class ExpressionReductionTest extends TableTestBase { util.verifyTable(result, expected) } + @Test(expected = classOf[NullPointerException]) + def testReduceDeterministicUDF(): Unit = { +val util = streamTestUtil() +val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + +// if isDeterministic = true, will cause a Calcite NPE, which will be fixed in [CALCITE-1860] +val result = table + .select('a, 'b, 'c, DeterministicNullFunc() as 'd) + .where("d.isNull") + .select('a, 'b, 'c) + +val expected: String = "" --- End diff -- We should set the expected value here to make sure the test can pass once we reopen it. > Expose isDeterministic interface to ScalarFunction and TableFunction > > > Key: FLINK-7014 > URL: https://issues.apache.org/jira/browse/FLINK-7014 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > Currently, the `isDeterministic` methods of implementations of `SqlFunction` > always return true, which causes inappropriate optimizations in Calcite, > such as treating a user's stateful UDF as a pure function. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124457542 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala --- @@ -422,4 +423,50 @@ class ExpressionReductionTest extends TableTestBase { util.verifyTable(result, expected) } + @Test(expected = classOf[NullPointerException]) + def testReduceDeterministicUDF(): Unit = { +val util = streamTestUtil() +val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + +// if isDeterministic = true, will cause a Calcite NPE, which will be fixed in [CALCITE-1860] +val result = table + .select('a, 'b, 'c, DeterministicNullFunc() as 'd) + .where("d.isNull") + .select('a, 'b, 'c) + +val expected: String = "" --- End diff -- We should set the expected value here to make sure the test can pass once we reopen it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124457316 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala --- @@ -422,4 +423,50 @@ class ExpressionReductionTest extends TableTestBase { util.verifyTable(result, expected) } + @Test(expected = classOf[NullPointerException]) + def testReduceDeterministicUDF(): Unit = { --- End diff -- The exception isn't caused by Flink, so asserting on an expected exception here is not a good approach. I suggest marking this test as `@Ignore` and adding a TODO comment at the top of the test. After CALCITE-1860 is fixed, we can re-enable this test, or check and remove it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
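For illustration, a minimal Scala sketch of the reviewer's suggestion, assuming JUnit 4's `@Ignore` as used in Flink's test code; the class name is hypothetical and the test body is elided:

```scala
import org.junit.{Ignore, Test}

class ExpressionReductionTestSketch {

  // TODO: re-enable once CALCITE-1860 is fixed, then assert on the reduced plan
  // instead of expecting a NullPointerException from Calcite.
  @Ignore
  @Test
  def testReduceDeterministicUDF(): Unit = {
    // original test body elided
  }
}
```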
[jira] [Commented] (FLINK-6925) Add CONCAT/CONCAT_WS supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065956#comment-16065956 ] ASF GitHub Bot commented on FLINK-6925: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4138#discussion_r124456032 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.functions + +import scala.annotation.varargs +import java.math.{BigDecimal => JBigDecimal} +import java.lang.{StringBuffer => JStringBuffer} + +/** + * Built-in scalar runtime functions. + */ +class ScalarFunctions {} + +object ScalarFunctions { + + def power(a: Double, b: JBigDecimal): Double = { +Math.pow(a, b.doubleValue()) + } + + /** +* Returns the string that results from concatenating the arguments. +* Returns NULL if any argument is NULL. +*/ + @varargs + def concat(args: String*): String = { +val sb = new JStringBuffer --- End diff -- Please use `StringBuilder` instead of `StringBuffer`. `StringBuffer` is a thread-safe with poorer performance. From StringBuffer JavaDoc: >As of release JDK 5, this class has been supplemented with an equivalent class designed for use by a single thread, StringBuilder. The StringBuilder class should generally be used in preference to this one, as it supports all of the same operations but it is faster, as it performs no synchronization. > Add CONCAT/CONCAT_WS supported in SQL > - > > Key: FLINK-6925 > URL: https://issues.apache.org/jira/browse/FLINK-6925 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > > CONCAT(str1,str2,...)Returns the string that results from concatenating the > arguments. May have one or more arguments. If all arguments are nonbinary > strings, the result is a nonbinary string. If the arguments include any > binary strings, the result is a binary string. A numeric argument is > converted to its equivalent nonbinary string form. > CONCAT() returns NULL if any argument is NULL. > * Syntax: > CONCAT(str1,str2,...) > * Arguments > ** str1,str2,... - > * Return Types > string > * Example: > CONCAT('F', 'lin', 'k') -> 'Flink' > CONCAT('M', NULL, 'L') -> NULL > CONCAT(14.3) -> '14.3' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat] > CONCAT_WS() stands for Concatenate With Separator and is a special form of > CONCAT(). The first argument is the separator for the rest of the arguments. > The separator is added between the strings to be concatenated. 
The separator > can be a string, as can the rest of the arguments. If the separator is NULL, > the result is NULL. > * Syntax: > CONCAT_WS(separator,str1,str2,...) > * Arguments > ** separator - > ** str1,str2,... - > * Return Types > string > * Example: > CONCAT_WS(',','First name','Second name','Last Name') -> 'First name,Second > name,Last Name' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat-ws] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4138: [FLINK-6925][table]Add CONCAT/CONCAT_WS supported ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4138#discussion_r124452756 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.functions + +import scala.annotation.varargs +import java.math.{BigDecimal => JBigDecimal} +import java.lang.{StringBuffer => JStringBuffer} + +/** + * Built-in scalar runtime functions. + */ +class ScalarFunctions {} + +object ScalarFunctions { + + def power(a: Double, b: JBigDecimal): Double = { +Math.pow(a, b.doubleValue()) + } + + /** +* Returns the string that results from concatenating the arguments. +* Returns NULL if any argument is NULL. +*/ + @varargs + def concat(args: String*): String = { +val sb = new JStringBuffer +var i = 0 +while (i < args.length) { + if (args(i) == null) { +return null + } + sb.append(args(i)) + i += 1 +} +sb.toString + } + + /** +* Returns the string that results from concatenating the arguments and separator. +* Returns NULL If the separator is NULL. +* +* Note: CONCAT_WS() does not skip empty strings. However, it does skip any NULL values after +* the separator argument. +* +* @param args The first element of argument is the separator for the rest of the arguments. +*/ + @varargs + def concat_ws(args: String*): String = { --- End diff -- I would suggest to change the signature to `sep: String, strs: String*` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4138: [FLINK-6925][table]Add CONCAT/CONCAT_WS supported ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4138#discussion_r123681972 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/scalarFunctions/ScalarFunctions.scala --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.scalarFunctions + +import scala.annotation.varargs +import java.math.{BigDecimal => JBigDecimal} +import java.lang.{StringBuffer => JStringBuffer} + +/** + * All build-in scalar scalar functions. + */ +class ScalarFunctions {} + +object ScalarFunctions { + + def power(a: Double, b: JBigDecimal): Double = { +Math.pow(a, b.doubleValue()) + } + + /** +* Returns the string that results from concatenating the arguments. +* Returns NULL if any argument is NULL. +*/ + @varargs + def concat(args: String*): String = { +val sb = new JStringBuffer +var i = 0 +while (i < args.length) { + if (args(i) == null) { +return null + } + sb.append(args(i)) + i += 1 +} +sb.toString + } + + /** +* Returns the string that results from concatenating the arguments and separator. +* Returns NULL If the separator is NULL. +* +* Note: CONCAT_WS() does not skip empty strings. However, it does skip any NULL values after +* the separator argument. +* +* @param args The first element of argument is the separator for the rest of the arguments. +*/ + @varargs + def concat_ws(args: String*): String = { --- End diff -- I would like to declare the signature like this `(sep: String, str1: String, others: String*)` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6925) Add CONCAT/CONCAT_WS supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065954#comment-16065954 ] ASF GitHub Bot commented on FLINK-6925: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4138#discussion_r124456117 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.functions + +import scala.annotation.varargs +import java.math.{BigDecimal => JBigDecimal} +import java.lang.{StringBuffer => JStringBuffer} + +/** + * Built-in scalar runtime functions. + */ +class ScalarFunctions {} + +object ScalarFunctions { + + def power(a: Double, b: JBigDecimal): Double = { +Math.pow(a, b.doubleValue()) + } + + /** +* Returns the string that results from concatenating the arguments. +* Returns NULL if any argument is NULL. +*/ + @varargs + def concat(args: String*): String = { +val sb = new JStringBuffer +var i = 0 +while (i < args.length) { + if (args(i) == null) { +return null + } + sb.append(args(i)) + i += 1 +} +sb.toString + } + + /** +* Returns the string that results from concatenating the arguments and separator. +* Returns NULL If the separator is NULL. +* +* Note: CONCAT_WS() does not skip empty strings. However, it does skip any NULL values after +* the separator argument. +* +* @param args The first element of argument is the separator for the rest of the arguments. +*/ + @varargs + def concat_ws(args: String*): String = { +val separator = args(0) +if (null == separator) { + return null +} + +val sb = new JStringBuffer --- End diff -- Please use `StringBuilder` instead of `StringBuffer`. > Add CONCAT/CONCAT_WS supported in SQL > - > > Key: FLINK-6925 > URL: https://issues.apache.org/jira/browse/FLINK-6925 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > > CONCAT(str1,str2,...)Returns the string that results from concatenating the > arguments. May have one or more arguments. If all arguments are nonbinary > strings, the result is a nonbinary string. If the arguments include any > binary strings, the result is a binary string. A numeric argument is > converted to its equivalent nonbinary string form. > CONCAT() returns NULL if any argument is NULL. > * Syntax: > CONCAT(str1,str2,...) > * Arguments > ** str1,str2,... 
- > * Return Types > string > * Example: > CONCAT('F', 'lin', 'k') -> 'Flink' > CONCAT('M', NULL, 'L') -> NULL > CONCAT(14.3) -> '14.3' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat] > CONCAT_WS() stands for Concatenate With Separator and is a special form of > CONCAT(). The first argument is the separator for the rest of the arguments. > The separator is added between the strings to be concatenated. The separator > can be a string, as can the rest of the arguments. If the separator is NULL, > the result is NULL. > * Syntax: > CONCAT_WS(separator,str1,str2,...) > * Arguments > ** separator - > ** str1,str2,... - > * Return Types > string > * Example: > CONCAT_WS(',','First name','Second name','Last Name') -> 'First name,Second > name,Last Name' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat-ws] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6925) Add CONCAT/CONCAT_WS supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065957#comment-16065957 ] ASF GitHub Bot commented on FLINK-6925: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4138#discussion_r123681972 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/scalarFunctions/ScalarFunctions.scala --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.scalarFunctions + +import scala.annotation.varargs +import java.math.{BigDecimal => JBigDecimal} +import java.lang.{StringBuffer => JStringBuffer} + +/** + * All build-in scalar scalar functions. + */ +class ScalarFunctions {} + +object ScalarFunctions { + + def power(a: Double, b: JBigDecimal): Double = { +Math.pow(a, b.doubleValue()) + } + + /** +* Returns the string that results from concatenating the arguments. +* Returns NULL if any argument is NULL. +*/ + @varargs + def concat(args: String*): String = { +val sb = new JStringBuffer +var i = 0 +while (i < args.length) { + if (args(i) == null) { +return null + } + sb.append(args(i)) + i += 1 +} +sb.toString + } + + /** +* Returns the string that results from concatenating the arguments and separator. +* Returns NULL If the separator is NULL. +* +* Note: CONCAT_WS() does not skip empty strings. However, it does skip any NULL values after +* the separator argument. +* +* @param args The first element of argument is the separator for the rest of the arguments. +*/ + @varargs + def concat_ws(args: String*): String = { --- End diff -- I would like to declare the signature like this `(sep: String, str1: String, others: String*)` > Add CONCAT/CONCAT_WS supported in SQL > - > > Key: FLINK-6925 > URL: https://issues.apache.org/jira/browse/FLINK-6925 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > > CONCAT(str1,str2,...)Returns the string that results from concatenating the > arguments. May have one or more arguments. If all arguments are nonbinary > strings, the result is a nonbinary string. If the arguments include any > binary strings, the result is a binary string. A numeric argument is > converted to its equivalent nonbinary string form. > CONCAT() returns NULL if any argument is NULL. > * Syntax: > CONCAT(str1,str2,...) > * Arguments > ** str1,str2,... 
- > * Return Types > string > * Example: > CONCAT('F', 'lin', 'k') -> 'Flink' > CONCAT('M', NULL, 'L') -> NULL > CONCAT(14.3) -> '14.3' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat] > CONCAT_WS() stands for Concatenate With Separator and is a special form of > CONCAT(). The first argument is the separator for the rest of the arguments. > The separator is added between the strings to be concatenated. The separator > can be a string, as can the rest of the arguments. If the separator is NULL, > the result is NULL. > * Syntax: > CONCAT_WS(separator,str1,str2,...) > * Arguments > ** separator - > ** str1,str2,... - > * Return Types > string > * Example: > CONCAT_WS(',','First name','Second name','Last Name') -> 'First name,Second > name,Last Name' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat-ws] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6925) Add CONCAT/CONCAT_WS supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065955#comment-16065955 ] ASF GitHub Bot commented on FLINK-6925: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4138#discussion_r124452756 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.functions + +import scala.annotation.varargs +import java.math.{BigDecimal => JBigDecimal} +import java.lang.{StringBuffer => JStringBuffer} + +/** + * Built-in scalar runtime functions. + */ +class ScalarFunctions {} + +object ScalarFunctions { + + def power(a: Double, b: JBigDecimal): Double = { +Math.pow(a, b.doubleValue()) + } + + /** +* Returns the string that results from concatenating the arguments. +* Returns NULL if any argument is NULL. +*/ + @varargs + def concat(args: String*): String = { +val sb = new JStringBuffer +var i = 0 +while (i < args.length) { + if (args(i) == null) { +return null + } + sb.append(args(i)) + i += 1 +} +sb.toString + } + + /** +* Returns the string that results from concatenating the arguments and separator. +* Returns NULL If the separator is NULL. +* +* Note: CONCAT_WS() does not skip empty strings. However, it does skip any NULL values after +* the separator argument. +* +* @param args The first element of argument is the separator for the rest of the arguments. +*/ + @varargs + def concat_ws(args: String*): String = { --- End diff -- I would suggest to change the signature to `sep: String, strs: String*` > Add CONCAT/CONCAT_WS supported in SQL > - > > Key: FLINK-6925 > URL: https://issues.apache.org/jira/browse/FLINK-6925 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > > CONCAT(str1,str2,...)Returns the string that results from concatenating the > arguments. May have one or more arguments. If all arguments are nonbinary > strings, the result is a nonbinary string. If the arguments include any > binary strings, the result is a binary string. A numeric argument is > converted to its equivalent nonbinary string form. > CONCAT() returns NULL if any argument is NULL. > * Syntax: > CONCAT(str1,str2,...) > * Arguments > ** str1,str2,... 
- > * Return Types > string > * Example: > CONCAT('F', 'lin', 'k') -> 'Flink' > CONCAT('M', NULL, 'L') -> NULL > CONCAT(14.3) -> '14.3' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat] > CONCAT_WS() stands for Concatenate With Separator and is a special form of > CONCAT(). The first argument is the separator for the rest of the arguments. > The separator is added between the strings to be concatenated. The separator > can be a string, as can the rest of the arguments. If the separator is NULL, > the result is NULL. > * Syntax: > CONCAT_WS(separator,str1,str2,...) > * Arguments > ** separator - > ** str1,str2,... - > * Return Types > string > * Example: > CONCAT_WS(',','First name','Second name','Last Name') -> 'First name,Second > name,Last Name' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat-ws] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4138: [FLINK-6925][table]Add CONCAT/CONCAT_WS supported ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4138#discussion_r124456117 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.functions + +import scala.annotation.varargs +import java.math.{BigDecimal => JBigDecimal} +import java.lang.{StringBuffer => JStringBuffer} + +/** + * Built-in scalar runtime functions. + */ +class ScalarFunctions {} + +object ScalarFunctions { + + def power(a: Double, b: JBigDecimal): Double = { +Math.pow(a, b.doubleValue()) + } + + /** +* Returns the string that results from concatenating the arguments. +* Returns NULL if any argument is NULL. +*/ + @varargs + def concat(args: String*): String = { +val sb = new JStringBuffer +var i = 0 +while (i < args.length) { + if (args(i) == null) { +return null + } + sb.append(args(i)) + i += 1 +} +sb.toString + } + + /** +* Returns the string that results from concatenating the arguments and separator. +* Returns NULL If the separator is NULL. +* +* Note: CONCAT_WS() does not skip empty strings. However, it does skip any NULL values after +* the separator argument. +* +* @param args The first element of argument is the separator for the rest of the arguments. +*/ + @varargs + def concat_ws(args: String*): String = { +val separator = args(0) +if (null == separator) { + return null +} + +val sb = new JStringBuffer --- End diff -- Please use `StringBuilder` instead of `StringBuffer`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4138: [FLINK-6925][table]Add CONCAT/CONCAT_WS supported ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4138#discussion_r124456032 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.functions + +import scala.annotation.varargs +import java.math.{BigDecimal => JBigDecimal} +import java.lang.{StringBuffer => JStringBuffer} + +/** + * Built-in scalar runtime functions. + */ +class ScalarFunctions {} + +object ScalarFunctions { + + def power(a: Double, b: JBigDecimal): Double = { +Math.pow(a, b.doubleValue()) + } + + /** +* Returns the string that results from concatenating the arguments. +* Returns NULL if any argument is NULL. +*/ + @varargs + def concat(args: String*): String = { +val sb = new JStringBuffer --- End diff -- Please use `StringBuilder` instead of `StringBuffer`. `StringBuffer` is a thread-safe with poorer performance. From StringBuffer JavaDoc: >As of release JDK 5, this class has been supplemented with an equivalent class designed for use by a single thread, StringBuilder. The StringBuilder class should generally be used in preference to this one, as it supports all of the same operations but it is faster, as it performs no synchronization. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
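For illustration, a minimal Scala sketch combining the two review suggestions above (use `StringBuilder` instead of `StringBuffer`, and split the separator out of the varargs as `sep: String, strs: String*`); this is a sketch of the proposed shape, not the code that was eventually merged:

```scala
import scala.annotation.varargs

object ScalarFunctionsSketch {

  /**
    * Concatenates the arguments with the given separator. NULL values after the
    * separator are skipped, empty strings are not; returns NULL if the separator
    * itself is NULL.
    */
  @varargs
  def concat_ws(sep: String, strs: String*): String = {
    if (sep == null) {
      return null
    }
    // Single-threaded use, so StringBuilder is sufficient and faster than StringBuffer.
    val sb = new StringBuilder
    var first = true
    for (s <- strs if s != null) {
      if (!first) {
        sb.append(sep)
      }
      sb.append(s)
      first = false
    }
    sb.toString
  }
}
```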
[jira] [Commented] (FLINK-7008) Update NFA state only when the NFA changes.
[ https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065948#comment-16065948 ] ASF GitHub Bot commented on FLINK-7008: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4195#discussion_r124455303 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java --- @@ -324,6 +327,83 @@ public boolean filter(Event value) throws Exception { } } + @Test + public void testNFAChange() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 1858562682635302605L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).notFollowedBy("not").where(new IterativeCondition() { + private static final long serialVersionUID = -6085237016591726715L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("b"); + } + }).oneOrMore().optional().allowCombinations().followedBy("middle2").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("d"); + } + }).followedBy("end").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("e"); + } + }).within(Time.milliseconds(10)); + + NFACompiler.NFAFactory nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true); + NFA nfa = nfaFactory.createNFA(); + nfa.process(new Event(1, "b", 1.0), 1L); + assertFalse(nfa.isNFAChanged()); + + nfa.nfaChanged = false; --- End diff -- @dawidwys thanks a lot for the review. Have added more comments. > Update NFA state only when the NFA changes. > --- > > Key: FLINK-7008 > URL: https://issues.apache.org/jira/browse/FLINK-7008 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.3.1 >Reporter: Kostas Kloudas >Assignee: Dian Fu > Fix For: 1.4.0 > > > Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we > update the NFA state every time the NFA is touched. This leads to redundant > puts/gets to the state when there are no changes to the NFA itself. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4195: [FLINK-7008] [cep] Update NFA state only when the ...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4195#discussion_r124455303 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java --- @@ -324,6 +327,83 @@ public boolean filter(Event value) throws Exception { } } + @Test + public void testNFAChange() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 1858562682635302605L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).notFollowedBy("not").where(new IterativeCondition() { + private static final long serialVersionUID = -6085237016591726715L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("b"); + } + }).oneOrMore().optional().allowCombinations().followedBy("middle2").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("d"); + } + }).followedBy("end").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("e"); + } + }).within(Time.milliseconds(10)); + + NFACompiler.NFAFactory nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true); + NFA nfa = nfaFactory.createNFA(); + nfa.process(new Event(1, "b", 1.0), 1L); + assertFalse(nfa.isNFAChanged()); + + nfa.nfaChanged = false; --- End diff -- @dawidwys thanks a lot for the review. Have added more comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6407) Upgrade AVRO dependency version to 1.8.x
[ https://issues.apache.org/jira/browse/FLINK-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065941#comment-16065941 ] ASF GitHub Bot commented on FLINK-6407: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4205 cc @zentol > Upgrade AVRO dependency version to 1.8.x > > > Key: FLINK-6407 > URL: https://issues.apache.org/jira/browse/FLINK-6407 > Project: Flink > Issue Type: Wish > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.2.1 >Reporter: Miguel >Assignee: mingleizhang >Priority: Minor > > Avro 1.7.6 and 1.7.7 does not support Map that uses java Enum keys (it is > limited to String type keys). It was solved in Avro 1.8.0. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4205: [FLINK-6407] [build] Upgrade AVRO to 1.8.2
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4205 cc @zentol --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6407) Upgrade AVRO dependency version to 1.8.x
[ https://issues.apache.org/jira/browse/FLINK-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065937#comment-16065937 ] ASF GitHub Bot commented on FLINK-6407: --- GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/4205 [FLINK-6407] [build] Upgrade AVRO to 1.8.2 Upgrade to the last maintenance releases of **AVRO** 1.8.2 You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-6407 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4205.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4205 commit b9790a643256bc95ba9612ed65f948719fbb08e2 Author: zhangminglei Date: 2017-06-28T03:41:52Z [FLINK-6407] [build] Upgrade AVRO to 1.8.2 > Upgrade AVRO dependency version to 1.8.x > > > Key: FLINK-6407 > URL: https://issues.apache.org/jira/browse/FLINK-6407 > Project: Flink > Issue Type: Wish > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.2.1 >Reporter: Miguel >Assignee: mingleizhang >Priority: Minor > > Avro 1.7.6 and 1.7.7 does not support Map that uses java Enum keys (it is > limited to String type keys). It was solved in Avro 1.8.0. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4205: [FLINK-6407] [build] Upgrade AVRO to 1.8.2
GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/4205 [FLINK-6407] [build] Upgrade AVRO to 1.8.2 Upgrade to the last maintenance releases of **AVRO** 1.8.2 You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-6407 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4205.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4205 commit b9790a643256bc95ba9612ed65f948719fbb08e2 Author: zhangminglei Date: 2017-06-28T03:41:52Z [FLINK-6407] [build] Upgrade AVRO to 1.8.2 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-7024) Add support for selecting window proctime/rowtime on row-based Tumble/Slide window
sunjincheng created FLINK-7024: -- Summary: Add support for selecting window proctime/rowtime on row-based Tumble/Slide window Key: FLINK-7024 URL: https://issues.apache.org/jira/browse/FLINK-7024 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.4.0 Reporter: sunjincheng Assignee: sunjincheng We get a validation exception when selecting window.proctime/rowtime on a row-based group window. {code} table .window(Tumble over 2.rows on 'proctime as 'w) .groupBy('w, 'string) .select('string, countFun('string) as 'cnt, 'w.rowtime as 'proctime) .window(Over partitionBy 'string orderBy 'proctime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w2) .select('string, 'cnt.sum over 'w2 as 'cnt) {code} Exception: {code} org.apache.flink.table.api.ValidationException: Window start and Window end cannot be selected for a row-count Tumbling window. at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143) at org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:660) {code} We should add a window.proctime/rowtime check in the `validate` method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6522) Add ZooKeeper cleanup logic to ZooKeeperHaServices
[ https://issues.apache.org/jira/browse/FLINK-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065924#comment-16065924 ] ASF GitHub Bot commented on FLINK-6522: --- Github user zjureel commented on the issue: https://github.com/apache/flink/pull/4204 Hi @tillrohrmann , I have created this PR for issue [FLINK-6522.](https://issues.apache.org/jira/browse/FLINK-6522) Could you please have a look when you're free, thanks > Add ZooKeeper cleanup logic to ZooKeeperHaServices > -- > > Key: FLINK-6522 > URL: https://issues.apache.org/jira/browse/FLINK-6522 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.3.0, 1.4.0 >Reporter: Till Rohrmann >Assignee: Fang Yong > > The {{ZooKeeperHaServices}} provide a {{CuratorFramework}} client to access > ZooKeeper data. Consequently, all data (also for different job) are stored > under the same root node. When > {{HighAvailabilityServices#closeAndCleanupAllData}} is called, then this data > should be cleaned up. This cleanup logic is currently missing. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4204: [FLINK-6522] Add ZooKeeper cleanup logic to ZooKeeperHaSe...
Github user zjureel commented on the issue: https://github.com/apache/flink/pull/4204 Hi @tillrohrmann , I have created this PR for issue [FLINK-6522.](https://issues.apache.org/jira/browse/FLINK-6522) Could you please have a look when you're free, thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6522) Add ZooKeeper cleanup logic to ZooKeeperHaServices
[ https://issues.apache.org/jira/browse/FLINK-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065922#comment-16065922 ] ASF GitHub Bot commented on FLINK-6522: --- GitHub user zjureel opened a pull request: https://github.com/apache/flink/pull/4204 [FLINK-6522] Add ZooKeeper cleanup logic to ZooKeeperHaServices Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/zjureel/flink FLINK-6522 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4204.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4204 commit b816aa3a2a3d4d1377caccdcbeba91073e89af75 Author: zjureel Date: 2017-06-28T02:12:27Z clean up all data which is stored by RetrievableStateStorageHelper commit 03a2f39e051594cb07017ef2613eb7873b56d118 Author: zjureel Date: 2017-06-28T04:42:30Z clean up all data which is stored in zookeeper > Add ZooKeeper cleanup logic to ZooKeeperHaServices > -- > > Key: FLINK-6522 > URL: https://issues.apache.org/jira/browse/FLINK-6522 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.3.0, 1.4.0 >Reporter: Till Rohrmann >Assignee: Fang Yong > > The {{ZooKeeperHaServices}} provide a {{CuratorFramework}} client to access > ZooKeeper data. Consequently, all data (also for different job) are stored > under the same root node. When > {{HighAvailabilityServices#closeAndCleanupAllData}} is called, then this data > should be cleaned up. This cleanup logic is currently missing. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
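For illustration, a minimal Scala sketch of what the missing cleanup could look like with Curator, assuming the HA root znode path is taken from configuration; the class and method layout is illustrative, not the actual `ZooKeeperHaServices` implementation:

```scala
import org.apache.curator.framework.CuratorFramework

// Illustrative cleanup: closeAndCleanupAllData() removes everything stored
// under the HA root path (including job-specific children), then closes the client.
class ZooKeeperCleanupSketch(client: CuratorFramework, haRootPath: String) {

  def closeAndCleanupAllData(): Unit = {
    try {
      // Recursively deletes the root node and all of its children.
      client.delete().deletingChildrenIfNeeded().forPath(haRootPath)
    } finally {
      client.close()
    }
  }
}
```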
[GitHub] flink pull request #4204: [FLINK-6522] Add ZooKeeper cleanup logic to ZooKee...
GitHub user zjureel opened a pull request: https://github.com/apache/flink/pull/4204 [FLINK-6522] Add ZooKeeper cleanup logic to ZooKeeperHaServices Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/zjureel/flink FLINK-6522 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4204.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4204 commit b816aa3a2a3d4d1377caccdcbeba91073e89af75 Author: zjureel Date: 2017-06-28T02:12:27Z clean up all data which is stored by RetrievableStateStorageHelper commit 03a2f39e051594cb07017ef2613eb7873b56d118 Author: zjureel Date: 2017-06-28T04:42:30Z clean up all data which is stored in zookeeper --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6969) Add support for deferred computation for group window aggregates
[ https://issues.apache.org/jira/browse/FLINK-6969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065911#comment-16065911 ] ASF GitHub Bot commented on FLINK-6969: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/4183 Hi @fhueske , I like the `firstResultTimeOffset`, a good design ! 👍 Regarding the watermark and timestamp, it makes sense to me. A simple approach comes to mind: assign a new `AssignerWithPunctuatedWatermarks` after the window aggregate. This will create an operator which ignores the upstream watermarks and assigns new watermarks based on the watermark function. The new watermark function can simply take the element's timestamp as the watermark. ```scala class TimestampAndWatermark[T] extends AssignerWithPunctuatedWatermarks[T] { override def checkAndGetNextWatermark(lastElement: T, extractedTimestamp: Long): Watermark = { new Watermark(extractedTimestamp) } override def extractTimestamp(element: T, originalTimestamp: Long): Long = originalTimestamp } ``` > Add support for deferred computation for group window aggregates > > > Key: FLINK-6969 > URL: https://issues.apache.org/jira/browse/FLINK-6969 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: sunjincheng > > Deferred computation is a strategy to deal with late arriving data and avoid > updates of previous results. Instead of computing a result as soon as it is > possible (i.e., when a corresponding watermark was received), deferred > computation adds a configurable amount of slack time in which late data is > accepted before the result is computed. For example, instead of computing a > tumbling window of 1 hour at each full hour, we can add a deferred > computation interval of 15 minutes and compute the result a quarter past each > full hour. > This approach adds latency but can reduce the number of updates, esp. in use > cases where the user cannot influence the generation of watermarks. It is > also useful if the data is emitted to a system that cannot update results > (files or Kafka). The deferred computation interval should be configured via > the {{QueryConfig}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4183: [FLINK-6969][table]Add support for deferred computation f...
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/4183 Hi @fhueske , I like the `firstResultTimeOffset`, a good design! 👍 Regarding the watermark and timestamp, it makes sense to me. A simple approach comes to my mind: assign a new `AssignerWithPunctuatedWatermarks` after the window aggregate. This will create an operator which ignores the upstream watermarks and assigns new watermarks depending on the watermark function. The new watermark function can simply take the element's timestamp as the watermark. ```scala class TimestampAndWatermark[T] extends AssignerWithPunctuatedWatermarks[T] { override def checkAndGetNextWatermark(lastElement: T, extractedTimestamp: Long): Watermark = { new Watermark(extractedTimestamp) } override def extractTimestamp(element: T, originalTimestamp: Long): Long = originalTimestamp } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
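A minimal sketch of how the assigner quoted above could be attached downstream of the window aggregate. The `windowedResult` stream and the helper method are hypothetical and only illustrate the idea from the comment; they are not code from the pull request.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Same assigner as in the comment above: emits each element's timestamp as the new watermark.
class TimestampAndWatermark[T] extends AssignerWithPunctuatedWatermarks[T] {
  override def checkAndGetNextWatermark(lastElement: T, extractedTimestamp: Long): Watermark =
    new Watermark(extractedTimestamp)

  override def extractTimestamp(element: T, originalTimestamp: Long): Long = originalTimestamp
}

// `windowedResult` stands in for the stream produced by the group window aggregate;
// attaching the assigner effectively ignores upstream watermarks and re-stamps new ones.
def reStampWatermarks[T](windowedResult: DataStream[T]): DataStream[T] =
  windowedResult.assignTimestampsAndWatermarks(new TimestampAndWatermark[T])
```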
[jira] [Commented] (FLINK-7023) Remaining types for Gelly ValueArrays
[ https://issues.apache.org/jira/browse/FLINK-7023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065883#comment-16065883 ] ASF GitHub Bot commented on FLINK-7023: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/4203 [FLINK-7023] [gelly] Remaining types for Gelly ValueArrays Add implementations of Byte/Char/Double/Float/ShortValueArray. Along with the existing implementations of Int/Long/Null/StringValueArray this covers all 10 CopyableValue types. Note: the best way to review these files is to diff against the existing `IntValueArray` and `LongValueArray` implementations as the deltas are very small. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 7023_remaining_tyeps_for_gelly_valuearrays Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4203.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4203 commit 1281058d1c4d1dd1debb0d3502d8d5c68ddbd01b Author: Greg Hogan Date: 2017-06-28T02:54:21Z [FLINK-7023] [gelly] Remaining types for Gelly ValueArrays Add implementations of Byte/Char/Double/Float/ShortValueArray. Along with the existing implementations of Int/Long/Null/StringValueArray this covers all 10 CopyableValue types. > Remaining types for Gelly ValueArrays > - > > Key: FLINK-7023 > URL: https://issues.apache.org/jira/browse/FLINK-7023 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Affects Versions: 1.4.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.4.0 > > > Add implementations of Byte/Char/Double/Float/ShortValueArray. Along with the > existing implementations of Int/Long/Null/StringValueArray this covers all 10 > CopyableValue types. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4203: [FLINK-7023] [gelly] Remaining types for Gelly Val...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/4203 [FLINK-7023] [gelly] Remaining types for Gelly ValueArrays Add implementations of Byte/Char/Double/Float/ShortValueArray. Along with the existing implementations of Int/Long/Null/StringValueArray this covers all 10 CopyableValue types. Note: the best way to review these files is to diff against the existing `IntValueArray` and `LongValueArray` implementations as the deltas are very small. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 7023_remaining_tyeps_for_gelly_valuearrays Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4203.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4203 commit 1281058d1c4d1dd1debb0d3502d8d5c68ddbd01b Author: Greg Hogan Date: 2017-06-28T02:54:21Z [FLINK-7023] [gelly] Remaining types for Gelly ValueArrays Add implementations of Byte/Char/Double/Float/ShortValueArray. Along with the existing implementations of Int/Long/Null/StringValueArray this covers all 10 CopyableValue types. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124443977 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala --- @@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testScalarFunctionDeterministic(): Unit = { --- End diff -- @Xpray @sunjincheng121 I mean we can remove this test, and add a test in `ExpressionReductionTest`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction
[ https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065866#comment-16065866 ] ASF GitHub Bot commented on FLINK-7014: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124443977 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala --- @@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testScalarFunctionDeterministic(): Unit = { --- End diff -- @Xpray @sunjincheng121 I mean we can remove this test, and add a test in `ExpressionReductionTest`. > Expose isDeterministic interface to ScalarFunction and TableFunction > > > Key: FLINK-7014 > URL: https://issues.apache.org/jira/browse/FLINK-7014 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > Currently, the `isDeterministic` method of implementations of `SqlFunction` > always returns true, which causes inappropriate optimization in Calcite, > such as treating a user's stateful UDF as a pure functional procedure. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction
[ https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065865#comment-16065865 ] ASF GitHub Bot commented on FLINK-7014: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124443845 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala --- @@ -57,6 +57,8 @@ class AggSqlFunction( ) { def getFunction: AggregateFunction[_, _] = aggregateFunction + + override def isDeterministic: Boolean = aggregateFunction.isDeterministic --- End diff -- `isDeterministic` is the base method of `SqlOperator` which is the base class of `AggSqlFunction`, `ScalarSqlFunction`, `TableSqlFunction`. So I think maybe it's not necessary to add an abstract class. > Expose isDeterministic interface to ScalarFunction and TableFunction > > > Key: FLINK-7014 > URL: https://issues.apache.org/jira/browse/FLINK-7014 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > Currently, the `isDeterministic` method of implementations of `SqlFunction` > always returns true, which causes inappropriate optimization in Calcite, > such as treating a user's stateful UDF as a pure functional procedure. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124443845 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala --- @@ -57,6 +57,8 @@ class AggSqlFunction( ) { def getFunction: AggregateFunction[_, _] = aggregateFunction + + override def isDeterministic: Boolean = aggregateFunction.isDeterministic --- End diff -- `isDeterministic` is the base method of `SqlOperator` which is the base class of `AggSqlFunction`, `ScalarSqlFunction`, `TableSqlFunction`. So I think maybe it's not necessary to add an abstract class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-7023) Remaining types for Gelly ValueArrays
Greg Hogan created FLINK-7023: - Summary: Remaining types for Gelly ValueArrays Key: FLINK-7023 URL: https://issues.apache.org/jira/browse/FLINK-7023 Project: Flink Issue Type: Sub-task Components: Gelly Affects Versions: 1.4.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial Fix For: 1.4.0 Add implementations of Byte/Char/Double/Float/ShortValueArray. Along with the existing implementations of Int/Long/Null/StringValueArray this covers all 10 CopyableValue types. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction
[ https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065854#comment-16065854 ] ASF GitHub Bot commented on FLINK-7014: --- Github user Xpray commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124442541 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala --- @@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testScalarFunctionDeterministic(): Unit = { --- End diff -- thanks for reviewing, I'll add more test cases in `ExpressionReductionTest ` > Expose isDeterministic interface to ScalarFunction and TableFunction > > > Key: FLINK-7014 > URL: https://issues.apache.org/jira/browse/FLINK-7014 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > Currently, the `isDeterministic` method of implementations of `SqlFunction` > always returns true, which causes inappropriate optimization in Calcite, > such as treating a user's stateful UDF as a pure functional procedure. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...
Github user Xpray commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124442541 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala --- @@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testScalarFunctionDeterministic(): Unit = { --- End diff -- thanks for reviewing, I'll add more test cases in `ExpressionReductionTest ` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction
[ https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065834#comment-16065834 ] ASF GitHub Bot commented on FLINK-7014: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124439791 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala --- @@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testScalarFunctionDeterministic(): Unit = { --- End diff -- Yes, I agree with @wuchong. Adding a test in `ExpressionReductionTest` is correct, and I think it's better to add an exception test case. > Expose isDeterministic interface to ScalarFunction and TableFunction > > > Key: FLINK-7014 > URL: https://issues.apache.org/jira/browse/FLINK-7014 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > Currently, the `isDeterministic` method of implementations of `SqlFunction` > always returns true, which causes inappropriate optimization in Calcite, > such as treating a user's stateful UDF as a pure functional procedure. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction
[ https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065836#comment-16065836 ] ASF GitHub Bot commented on FLINK-7014: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124439457 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala --- @@ -40,6 +40,12 @@ abstract class UserDefinedFunction extends Serializable { @throws(classOf[Exception]) def close(): Unit = {} + /** +* @return true iff a call to this function is guaranteed to always return +* the same result given the same parameters; true is assumed by default +*/ + def isDeterministic: Boolean = true --- End diff -- I suggest explaining when a user needs to override the method and what the impact is if it is not overridden. > Expose isDeterministic interface to ScalarFunction and TableFunction > > > Key: FLINK-7014 > URL: https://issues.apache.org/jira/browse/FLINK-7014 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > Currently, the `isDeterministic` method of implementations of `SqlFunction` > always returns true, which causes inappropriate optimization in Calcite, > such as treating a user's stateful UDF as a pure functional procedure. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction
[ https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065835#comment-16065835 ] ASF GitHub Bot commented on FLINK-7014: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124440259 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala --- @@ -57,6 +57,8 @@ class AggSqlFunction( ) { def getFunction: AggregateFunction[_, _] = aggregateFunction + + override def isDeterministic: Boolean = aggregateFunction.isDeterministic --- End diff -- Can we add an abstract class for `AggSqlFunction`, `ScalarSqlFunction`, and `TableSqlFunction`? What do you think? @wuchong @Xpray > Expose isDeterministic interface to ScalarFunction and TableFunction > > > Key: FLINK-7014 > URL: https://issues.apache.org/jira/browse/FLINK-7014 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > Currently, the `isDeterministic` method of implementations of `SqlFunction` > always returns true, which causes inappropriate optimization in Calcite, > such as treating a user's stateful UDF as a pure functional procedure. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124439791 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala --- @@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testScalarFunctionDeterministic(): Unit = { --- End diff -- Yes, I agree with @wuchong. Adding a test in `ExpressionReductionTest` is correct, and I think it's better to add an exception test case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124439457 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala --- @@ -40,6 +40,12 @@ abstract class UserDefinedFunction extends Serializable { @throws(classOf[Exception]) def close(): Unit = {} + /** +* @return true iff a call to this function is guaranteed to always return +* the same result given the same parameters; true is assumed by default +*/ + def isDeterministic: Boolean = true --- End diff -- I suggest explaining when a user needs to override the method and what the impact is if it is not overridden. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
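To illustrate the kind of guidance being asked for above, here is a hedged sketch of a stateful, non-deterministic scalar UDF that would need to override the `isDeterministic` method proposed in this PR so that Calcite's expression reduction does not constant-fold the call. The `RandomTag` class and its `eval` body are made up for illustration; only `ScalarFunction` and the reflective `eval` contract are existing Table API concepts.

```scala
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical non-deterministic UDF: without overriding the proposed isDeterministic,
// the planner could reduce a call like randomTag('a') to a constant at optimization time.
class RandomTag extends ScalarFunction {

  // eval is resolved reflectively by the Table API and returns a different value per call.
  def eval(prefix: String): String = prefix + scala.util.Random.nextInt(100)

  override def isDeterministic: Boolean = false
}
```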
[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124440259 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala --- @@ -57,6 +57,8 @@ class AggSqlFunction( ) { def getFunction: AggregateFunction[_, _] = aggregateFunction + + override def isDeterministic: Boolean = aggregateFunction.isDeterministic --- End diff -- Can we add an abstract class for `AggSqlFunction`, `ScalarSqlFunction`, and `TableSqlFunction`? What do you think? @wuchong @Xpray --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4153 rebase the code and @dawidwys @kl0u could you help to take a look at this PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065826#comment-16065826 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4153 rebase the code and @dawidwys @kl0u could you help to take a look at this PR? > Support pattern group in CEP > > > Key: FLINK-6927 > URL: https://issues.apache.org/jira/browse/FLINK-6927 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > We should add support for pattern group. This would enrich the set of > supported patterns. For example, users can write patterns like this with this > feature available: > {code} > A --> (B --> C.times(3)).optional() --> D > {code} > or > {code} > A --> (B --> C).times(3) --> D > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6936) Add multiple targets support for custom partitioner
[ https://issues.apache.org/jira/browse/FLINK-6936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065817#comment-16065817 ] Xingcan Cui commented on FLINK-6936: [~aljoscha], thanks for your attention. You are right that state management seems to be the bottleneck, since {{KeyedState}} cannot be used here and {{OperatorState}} is still incomplete (it only supports {{ListState}}). In my view, the main problem is that the current state mechanism (whether keyed or unkeyed) only supports "symmetrical" states, i.e., it is impossible to assign a designated portion of the global state to a dedicated instance. For the moment, I have implemented a simple record-to-window join according to the design document ([Inner Join in Flink|https://goo.gl/4AdR7h]). The body looks like this. {code:java} orderA.connect(orderB).process(new JoinMerge[Order, Order]()) .multicast(new JoinPartitioner[Order, Order]) .process(new CommonStreamJoin[Order, Order, Order2]( new JoinFunction[Order, Order, Order2] { override def join(left: Order, right: Order): Order2 = { Order2(left.user, right.user, left.product, right.product, left.amount, right.amount); } }, 6, 1000)) {code} The key idea is to randomly split the left stream and duplicate the right stream to all downstream instances. Correspondingly, I use {{OperatorState}} ({{ListState}}) to store a portion of the cached left stream and a full copy of the cached right stream in each {{CommonStreamJoin}} instance. It seems to be quite costly and will not work correctly under some circumstances (e.g., when the parallelism is increased, state inconsistency occurs), but I cannot imagine other implementations unless a better state management mechanism is provided. What do you think? Code for the three main classes {{JoinMerge}}, {{JoinPartitioner}} and {{CommonStreamJoin}} (it does not remove expired data for now) follows. 
{code:java} class JoinMerge[L, R] extends CoProcessFunction[L, R, Either[L, R]] { override def processElement1(value: L, ctx: CoProcessFunction[L, R, Either[L, R]]#Context, out: Collector[Either[L, R]]): Unit = { out.collect(Left(value)) } override def processElement2(value: R, ctx: CoProcessFunction[L, R, Either[L, R]]#Context, out: Collector[Either[L, R]]): Unit = { out.collect(Right(value)) } } {code} {code:java} class JoinPartitioner[L, R] extends MultiPartitioner[Either[L, R]] { var targets: Array[Int] = null override def partition(record: Either[L, R], numPartitions: Int): Array[Int] = { if (record.isLeft) { if (!(null != targets && targets.length == numPartitions)) { targets = Array.range(0, numPartitions) } return targets } else { Array(Random.nextInt(numPartitions)) } } } {code} {code:java} class CommonStreamJoin[L, R, O](val joinFunction: JoinFunction[L, R, O], val cacheTime: Long, val rOffset: Long) extends ProcessFunction[Either[L, R], O] with CheckpointedFunction { val leftCache = new TreeMap[Long, JList[L]] val rightCache = new TreeMap[Long, JList[R]] var leftState: ListState[TMap[Long, JList[L]]] = null var rightState: ListState[MEntry[Long, JList[R]]] = null val leftDescriptor = new ListStateDescriptor[TMap[Long, JList[L]]]("leftCache", TypeInformation.of(new TypeHint[TMap[Long, JList[L]]]() {})) val rightDescriptor = new ListStateDescriptor[MEntry[Long, JList[R]]]("rightCache", TypeInformation.of(new TypeHint[MEntry[Long, JList[R]]]() {})) var result: O = _ override def processElement(value: Either[L, R], ctx: ProcessFunction[Either[L, R], O]#Context, out: Collector[O]): Unit = { //TODO this should be replaced with timestamps contained in records val time = ctx.timerService().currentProcessingTime() if (value.isRight) { val right = value.right.get if (!rightCache.containsKey(time)) { rightCache.put(time, new util.LinkedList[R]()) } rightCache.get(time).add(right) for (leftList <- leftCache.values()) { for (left <- leftList) { result = joinFunction.join(left, right) if (null != result) { out.collect(result) } } } } else { val left = value.left.get if (!leftCache.containsKey(time)) { leftCache.put(time, new util.LinkedList[L]()) } leftCache.get(time).add(left) for (rightList <- rightCache.values()) { for (right <- rightList) { result = joinFunction.join(left, right) if (null != result) { out.collect(result) }
[jira] [Assigned] (FLINK-6407) Upgrade AVRO dependency version to 1.8.x
[ https://issues.apache.org/jira/browse/FLINK-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-6407: --- Assignee: mingleizhang > Upgrade AVRO dependency version to 1.8.x > > > Key: FLINK-6407 > URL: https://issues.apache.org/jira/browse/FLINK-6407 > Project: Flink > Issue Type: Wish > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.2.1 >Reporter: Miguel >Assignee: mingleizhang >Priority: Minor > > Avro 1.7.6 and 1.7.7 do not support Maps that use Java enum keys (they are > limited to String keys). This was solved in Avro 1.8.0. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction
[ https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065798#comment-16065798 ] ASF GitHub Bot commented on FLINK-7014: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124436213 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala --- @@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testScalarFunctionDeterministic(): Unit = { --- End diff -- I think an IT case can't cover this change. Whether the UDF is deterministic or not, the result should be the same. > Expose isDeterministic interface to ScalarFunction and TableFunction > > > Key: FLINK-7014 > URL: https://issues.apache.org/jira/browse/FLINK-7014 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > Currently, the `isDeterministic` method of implementations of `SqlFunction` > always returns true, which causes inappropriate optimization in Calcite, > such as treating a user's stateful UDF as a pure functional procedure. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4200: [FLINK-7014] Expose isDeterministic interface to S...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4200#discussion_r124436213 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala --- @@ -230,6 +231,26 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testScalarFunctionDeterministic(): Unit = { --- End diff -- I think an IT case can't cover this change. Whether the UDF is deterministic or not, the result should be the same. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6407) Upgrade AVRO dependency version to 1.8.x
[ https://issues.apache.org/jira/browse/FLINK-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065792#comment-16065792 ] mingleizhang commented on FLINK-6407: - Avro now has release version 1.8.2. I would upgrade it to 1.8.2 if there is no objection. > Upgrade AVRO dependency version to 1.8.x > > > Key: FLINK-6407 > URL: https://issues.apache.org/jira/browse/FLINK-6407 > Project: Flink > Issue Type: Wish > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.2.1 >Reporter: Miguel >Priority: Minor > > Avro 1.7.6 and 1.7.7 do not support Maps that use Java enum keys (they are > limited to String keys). This was solved in Avro 1.8.0. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6422) Unreachable code in FileInputFormat#createInputSplits
[ https://issues.apache.org/jira/browse/FLINK-6422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065744#comment-16065744 ] ASF GitHub Bot commented on FLINK-6422: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4202 Thanks @tedyu for reporting this. cc @StephanEwen @tedyu Could you both take a look? I checked the first version you designed. > Unreachable code in FileInputFormat#createInputSplits > - > > Key: FLINK-6422 > URL: https://issues.apache.org/jira/browse/FLINK-6422 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > Here is the related code: > {code} > if (minNumSplits < 1) { > throw new IllegalArgumentException("Number of input splits has to be at > least 1."); > } > ... > final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE : > (totalLength / minNumSplits + > (totalLength % minNumSplits == 0 ? 0 : 1)); > {code} > minNumSplits cannot be less than 1 by the time the assignment of > maxSplitSize is reached. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4202: [FLINK-6422] [core] Unreachable code in FileInputFormat#c...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4202 Thanks @tedyu for reporting this. cc @StephanEwen @tedyu Could you both take a look? I checked the first version you designed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6422) Unreachable code in FileInputFormat#createInputSplits
[ https://issues.apache.org/jira/browse/FLINK-6422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065741#comment-16065741 ] ASF GitHub Bot commented on FLINK-6422: --- GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/4202 [FLINK-6422] [core] Unreachable code in FileInputFormat#createInputSp… Fix Unreachable code in FileInputFormat#createInputSplits. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-6422 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4202.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4202 commit 4cc3e55621372eaf4a9661939f52656aeef4eeef Author: zhangminglei Date: 2017-06-28T00:45:12Z [FLINK-6422] [core] Unreachable code in FileInputFormat#createInputSplits > Unreachable code in FileInputFormat#createInputSplits > - > > Key: FLINK-6422 > URL: https://issues.apache.org/jira/browse/FLINK-6422 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > Here is the related code: > {code} > if (minNumSplits < 1) { > throw new IllegalArgumentException("Number of input splits has to be at > least 1."); > } > ... > final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE : > (totalLength / minNumSplits + > (totalLength % minNumSplits == 0 ? 0 : 1)); > {code} > minNumSplits cannot be less than 1 by the time the assignment of > maxSplitSize is reached. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4202: [FLINK-6422] [core] Unreachable code in FileInputF...
GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/4202 [FLINK-6422] [core] Unreachable code in FileInputFormat#createInputSp… Fix Unreachable code in FileInputFormat#createInputSplits. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-6422 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4202.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4202 commit 4cc3e55621372eaf4a9661939f52656aeef4eeef Author: zhangminglei Date: 2017-06-28T00:45:12Z [FLINK-6422] [core] Unreachable code in FileInputFormat#createInputSplits --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6843) ClientConnectionTest fails on travis
[ https://issues.apache.org/jira/browse/FLINK-6843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065709#comment-16065709 ] mingleizhang commented on FLINK-6843: - [~aljoscha] I guess this is caused by a Java version mismatch: J2SE 8 corresponds to class file version 52. The error message {code:java} Unsupported major.minor version 52.0 {code} appears when code compiled with a higher JDK is run on a lower JDK; here JDK 7 corresponds to version 51. I don't know how Travis works, but it might compile the source code on a machine that uses JDK 8 and run the .class files on a JDK 7 machine. > ClientConnectionTest fails on travis > > > Key: FLINK-6843 > URL: https://issues.apache.org/jira/browse/FLINK-6843 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.3.2 > > > jdk7, hadoop 2.4.1, scala 2.11 > {code} > testJobManagerRetrievalWithHAServices(org.apache.flink.client.program.ClientConnectionTest) > Time elapsed: 0.013 sec <<< ERROR! > java.lang.UnsupportedClassVersionError: > org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices : > Unsupported major.minor version 52.0 > at java.lang.ClassLoader.defineClass1(Native Method) > at java.lang.ClassLoader.defineClass(ClassLoader.java:800) > at > java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) > at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) > at java.net.URLClassLoader.access$100(URLClassLoader.java:71) > at java.net.URLClassLoader$1.run(URLClassLoader.java:361) > at java.net.URLClassLoader$1.run(URLClassLoader.java:355) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:354) > at java.lang.ClassLoader.loadClass(ClassLoader.java:425) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) > at java.lang.ClassLoader.loadClass(ClassLoader.java:358) > at > org.apache.flink.client.program.ClientConnectionTest.testJobManagerRetrievalWithHAServices(ClientConnectionTest.java:122) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
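The 51-vs-52 mismatch discussed above can be verified directly from the class file header. The sketch below is only an illustration (the file path in the usage comment is hypothetical) and is not part of the Flink code base.

```scala
import java.io.DataInputStream
import java.nio.file.{Files, Paths}

// Reads the class file header: magic (4 bytes), minor version (2 bytes), major version (2 bytes).
// A major version of 52 means the class was compiled for Java 8, 51 for Java 7.
def classFileMajorVersion(classFilePath: String): Int = {
  val in = new DataInputStream(Files.newInputStream(Paths.get(classFilePath)))
  try {
    in.readInt()           // magic number 0xCAFEBABE
    in.readUnsignedShort() // minor version
    in.readUnsignedShort() // major version (returned)
  } finally {
    in.close()
  }
}

// e.g. classFileMajorVersion("TestingHighAvailabilityServices.class")
```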
[jira] [Created] (FLINK-7022) Flink Job Manager Scheduler & Web Frontend out of sync when Zookeeper is unavailable on startup
Scott Kidder created FLINK-7022: --- Summary: Flink Job Manager Scheduler & Web Frontend out of sync when Zookeeper is unavailable on startup Key: FLINK-7022 URL: https://issues.apache.org/jira/browse/FLINK-7022 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.2.1, 1.3.0, 1.2.0 Environment: Kubernetes cluster running: * Flink 1.3.0 Job Manager & Task Manager on Java 8u131 * Zookeeper 3.4.10 cluster with 3 nodes Reporter: Scott Kidder h2. Problem Flink Job Manager web frontend is permanently unavailable if one or more Zookeeper nodes are unresolvable during startup. The job scheduler eventually recovers and assigns jobs to task managers, but the web frontend continues to respond with an HTTP 503 and the following message: {noformat}Service temporarily unavailable due to an ongoing leader election. Please refresh.{noformat} h2. Expected Behavior Once Flink is able to interact with Zookeeper successfully, all aspects of the Job Manager (job scheduling & the web frontend) should be available. h2. Environment Details We're running Flink and Zookeeper in Kubernetes on CoreOS. CoreOS can run in a configuration that automatically detects and applies operating system updates. We have a Zookeeper node running on the same CoreOS instance as Flink. It's possible that the Zookeeper node will not yet be started when the Flink components are started. This could cause hostname resolution of the Zookeeper nodes to fail. h3. Flink Task Manager Logs {noformat} 2017-06-27 15:38:47,161 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: metrics.reporter.statsd.host, localhost 2017-06-27 15:38:47,161 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: metrics.reporter.statsd.port, 8125 2017-06-27 15:38:47,162 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: metrics.reporter.statsd.interval, 10 SECONDS 2017-06-27 15:38:47,254 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: state.backend, filesystem 2017-06-27 15:38:47,254 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: state.backend.fs.checkpointdir, hdfs://hdfs:8020/flink/checkpoints 2017-06-27 15:38:47,255 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: state.savepoints.dir, hdfs://hdfs:8020/flink/savepoints 2017-06-27 15:38:47,255 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: recovery.mode, zookeeper 2017-06-27 15:38:47,256 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: recovery.zookeeper.quorum, zookeeper-0.zookeeper:2181,zookeeper-1.zookeeper:2181,zookeeper-2.zookeeper:2181 2017-06-27 15:38:47,256 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: recovery.zookeeper.storageDir, hdfs://hdfs:8020/flink/recovery 2017-06-27 15:38:47,256 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: recovery.jobmanager.port, 6123 2017-06-27 15:38:47,257 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: blob.server.port, 41479 2017-06-27 15:38:47,357 WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'recovery.mode' instead of proper key 'high-availability' 2017-06-27 15:38:47,366 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager with high-availability 
2017-06-27 15:38:47,366 WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'recovery.jobmanager.port' instead of proper key 'high-availability.jobmanager.port' 2017-06-27 15:38:47,452 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager on flink:6123 with execution mode CLUSTER 2017-06-27 15:38:47,549 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.address, flink 2017-06-27 15:38:47,549 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.port, 6123 2017-06-27 15:38:47,549 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.heap.mb, 1024 2017-06-27 15:38:47,549 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.heap.mb, 1024 2017-06-27 15:38:47,549 INFO org.apache.flink.configuration.GlobalConfiguration
[jira] [Commented] (FLINK-7017) Remove netty usages in flink-tests
[ https://issues.apache.org/jira/browse/FLINK-7017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065675#comment-16065675 ] ASF GitHub Bot commented on FLINK-7017: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/4196#discussion_r124424384 --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java --- @@ -83,16 +78,16 @@ public static void initialize() throws Exception { assertTrue("Unable to create temp directory", logDir.mkdir()); File logFile = new File(logDir, "jobmanager.log"); File outFile = new File(logDir, "jobmanager.out"); - + Files.createFile(logFile.toPath()); Files.createFile(outFile.toPath()); - + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath()); config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath()); cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); - + --- End diff -- Sometimes, I also get this kinda stuff. But seems i didnt do anything relevant to this style. > Remove netty usages in flink-tests > -- > > Key: FLINK-7017 > URL: https://issues.apache.org/jira/browse/FLINK-7017 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4196: [FLINK-7017] Remove netty usages in flink-tests
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/4196#discussion_r124424384 --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java --- @@ -83,16 +78,16 @@ public static void initialize() throws Exception { assertTrue("Unable to create temp directory", logDir.mkdir()); File logFile = new File(logDir, "jobmanager.log"); File outFile = new File(logDir, "jobmanager.out"); - + Files.createFile(logFile.toPath()); Files.createFile(outFile.toPath()); - + config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath()); config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath()); cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); - + --- End diff -- Sometimes, I also get this kinda stuff. But seems i didnt do anything relevant to this style. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-7021) Flink Task Manager hangs on startup if one Zookeeper node is unresolvable
Scott Kidder created FLINK-7021: --- Summary: Flink Task Manager hangs on startup if one Zookeeper node is unresolvable Key: FLINK-7021 URL: https://issues.apache.org/jira/browse/FLINK-7021 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.1, 1.3.0, 1.2.0 Environment: Kubernetes cluster running: * Flink 1.3.0 Job Manager & Task Manager on Java 8u131 * Zookeeper 3.4.10 cluster with 3 nodes Reporter: Scott Kidder h2. Problem Flink Task Manager will hang during startup if one of the Zookeeper nodes in the Zookeeper connection string is unresolvable. h2. Expected Behavior Flink should retry name resolution & connection to Zookeeper nodes with exponential back-off. h2. Environment Details We're running Flink and Zookeeper in Kubernetes on CoreOS. CoreOS can run in a configuration that automatically detects and applies operating system updates. We have a Zookeeper node running on the same CoreOS instance as Flink. It's possible that the Zookeeper node will not yet be started when the Flink components are started. This could cause hostname resolution of the Zookeeper nodes to fail. h3. Flink Task Manager Logs {noformat} 2017-06-27 15:38:51,713 INFO org.apache.flink.runtime.taskmanager.TaskManager - Using configured hostname/address for TaskManager: 10.2.45.11 2017-06-27 15:38:51,714 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager 2017-06-27 15:38:51,714 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor system at 10.2.45.11:6122. 2017-06-27 15:38:52,950 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 2017-06-27 15:38:53,079 INFO Remoting - Starting remoting 2017-06-27 15:38:53,573 INFO Remoting - Remoting started; listening on addresses :[akka.tcp://flink@10.2.45.11:6122] 2017-06-27 15:38:53,576 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor 2017-06-27 15:38:53,660 INFO org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig [server address: /10.2.45.11, server port: 6121, ssl enabled: false, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 2 (manual), number of client threads: 2 (manual), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)] 2017-06-27 15:38:53,682 INFO org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration - Messages have a max timeout of 1 ms 2017-06-27 15:38:53,688 INFO org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file directory '/tmp': total 49 GB, usable 42 GB (85.71% usable) 2017-06-27 15:38:54,071 INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 96 MB for network buffer pool (number of memory segments: 3095, bytes per segment: 32768). 2017-06-27 15:38:54,564 INFO org.apache.flink.runtime.io.network.NetworkEnvironment- Starting the network environment and its components. 2017-06-27 15:38:54,576 INFO org.apache.flink.runtime.io.network.netty.NettyClient - Successful initialization (took 4 ms). 2017-06-27 15:38:54,677 INFO org.apache.flink.runtime.io.network.netty.NettyServer - Successful initialization (took 101 ms). Listening on SocketAddress /10.2.45.11:6121. 2017-06-27 15:38:54,981 INFO org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting managed memory to 0.7 of the currently free heap space (612 MB), memory will be allocated lazily. 
2017-06-27 15:38:55,050 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager uses directory /tmp/flink-io-ca01554d-f25e-4c17-a828-96d82b43d4a7 for spill files. 2017-06-27 15:38:55,061 INFO org.apache.flink.runtime.metrics.MetricRegistry - Configuring StatsDReporter with {interval=10 SECONDS, port=8125, host=localhost, class=org.apache.flink.metrics.statsd.StatsDReporter}. 2017-06-27 15:38:55,065 INFO org.apache.flink.metrics.statsd.StatsDReporter - Configured StatsDReporter with {host:localhost, port:8125} 2017-06-27 15:38:55,065 INFO org.apache.flink.runtime.metrics.MetricRegistry - Periodically reporting metrics in intervals of 10 SECONDS for reporter statsd of type org.apache.flink.metrics.statsd.StatsDReporter. 2017-06-27 15:38:55,175 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-e4c5bcc5-7513-40d9-a665-0d33c80a36ba 2017-06-27 15:38:55,187 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses director
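As a rough illustration of the retry-with-exponential-back-off behaviour requested in the issue above, here is a hedged, generic sketch; the attempt count, the base delay, and the wrapped operation are all illustrative and do not reflect how Flink actually bootstraps its ZooKeeper client.

```scala
// Generic retry helper: waits baseDelayMs, 2*baseDelayMs, 4*baseDelayMs, ... between attempts
// and rethrows the last failure once maxAttempts is exhausted.
def retryWithBackoff[T](maxAttempts: Int, baseDelayMs: Long)(op: () => T): T = {
  def attempt(n: Int): T =
    try op() catch {
      case e: Exception if n < maxAttempts - 1 =>
        Thread.sleep(baseDelayMs << n)
        attempt(n + 1)
    }
  attempt(0)
}

// Hypothetical usage: keep retrying name resolution instead of failing once and hanging, e.g.
//   retryWithBackoff(6, 1000L)(() => java.net.InetAddress.getByName("zookeeper-0.zookeeper"))
```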
[jira] [Commented] (FLINK-6998) Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback
[ https://issues.apache.org/jira/browse/FLINK-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065620#comment-16065620 ] ASF GitHub Bot commented on FLINK-6998: --- Github user zhenzhongxu commented on the issue: https://github.com/apache/flink/pull/4187 @tzulitai **Regarding the metric naming:** Any suggestions on naming conventions for these Flink-specific metrics? How do you like 'kafkaconnector-commits-succeeded' (component-metric-name) as an example? I personally like hyphen separators better than camel case for metric names. I'll not include the other proposed metric in this PR just for the sake of simplicity. I also have some opinions on the "offset lag" metric: I think this particular metric is more useful when some external entity performs the monitoring (difference of committed offset vs log head), especially in failure situations. **Regarding the implementation:** Thanks for the feedback. I'll explore the more proper implementation suggested, and I'll get back to you with a solution or a question. > Kafka connector needs to expose metrics for failed/successful offset commits > in the Kafka Consumer callback > --- > > Key: FLINK-6998 > URL: https://issues.apache.org/jira/browse/FLINK-6998 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Zhenzhong Xu >Assignee: Zhenzhong Xu > > Propose to add "kafkaCommitsSucceeded" and "kafkaCommitsFailed" metrics in > KafkaConsumerThread class. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...
Github user zhenzhongxu commented on the issue: https://github.com/apache/flink/pull/4187 @tzulitai **Regarding the metric naming:** Any suggestions on naming conventions for these Flink-specific metrics? How do you like 'kafkaconnector-commits-succeeded' (component-metric-name) as an example? I personally like hyphen separators better than camel case for metric names. I'll not include the other proposed metric in this PR just for the sake of simplicity. I also have some opinions on the "offset lag" metric: I think this particular metric is more useful when some external entity performs the monitoring (difference of committed offset vs log head), especially in failure situations. **Regarding the implementation:** Thanks for the feedback. I'll explore the more proper implementation suggested, and I'll get back to you with a solution or a question. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
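For readers unfamiliar with Flink's metric API, here is a hedged sketch of how the two proposed commit counters could be registered and updated from the asynchronous offset-commit callback. The wrapper class and the exact metric names are taken from the discussion above and are not final; only `MetricGroup.counter` and `Counter.inc` are existing Flink APIs.

```scala
import org.apache.flink.metrics.{Counter, MetricGroup}

// Illustrative wrapper: registers the two proposed counters on the consumer's metric group
// and bumps the matching one when an asynchronous offset commit completes.
class OffsetCommitMetrics(group: MetricGroup) {
  private val commitsSucceeded: Counter = group.counter("kafkaCommitsSucceeded")
  private val commitsFailed: Counter = group.counter("kafkaCommitsFailed")

  def onCommitComplete(succeeded: Boolean): Unit =
    if (succeeded) commitsSucceeded.inc() else commitsFailed.inc()
}
```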
[jira] [Commented] (FLINK-6998) Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback
[ https://issues.apache.org/jira/browse/FLINK-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065597#comment-16065597 ] ASF GitHub Bot commented on FLINK-6998: --- Github user zhenzhongxu commented on a diff in the pull request: https://github.com/apache/flink/pull/4187#discussion_r124413756 --- Diff: docs/monitoring/metrics.md --- @@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier: + Connector: + + + + Scope + Metrics + Description + + + + + Slot/Consumer + kafkaCommitsSucceeded + Kafka offset commit success count if Kafka commit is turned on. + + + Slot/Consumer + kafkaCommitsFailed + Kafka offset commit failure count if Kafka commit is turned on. --- End diff -- Sure, thanks for the feedback. Will update. > Kafka connector needs to expose metrics for failed/successful offset commits > in the Kafka Consumer callback > --- > > Key: FLINK-6998 > URL: https://issues.apache.org/jira/browse/FLINK-6998 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Zhenzhong Xu >Assignee: Zhenzhong Xu > > Propose to add "kafkaCommitsSucceeded" and "kafkaCommitsFailed" metrics in > KafkaConsumerThread class. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...
Github user zhenzhongxu commented on a diff in the pull request: https://github.com/apache/flink/pull/4187#discussion_r124413737 --- Diff: docs/monitoring/metrics.md --- @@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier: + Connector: + + + + Scope + Metrics + Description + + + + + Slot/Consumer + kafkaCommitsSucceeded + Kafka offset commit success count if Kafka commit is turned on. --- End diff -- Sure, thanks for the feedback. Will update. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6998) Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback
[ https://issues.apache.org/jira/browse/FLINK-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065595#comment-16065595 ] ASF GitHub Bot commented on FLINK-6998: --- Github user zhenzhongxu commented on a diff in the pull request: https://github.com/apache/flink/pull/4187#discussion_r124413737 --- Diff: docs/monitoring/metrics.md --- @@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier: + Connector: + + + + Scope + Metrics + Description + + + + + Slot/Consumer + kafkaCommitsSucceeded + Kafka offset commit success count if Kafka commit is turned on. --- End diff -- Sure, thanks for the feedback. Will update. > Kafka connector needs to expose metrics for failed/successful offset commits > in the Kafka Consumer callback > --- > > Key: FLINK-6998 > URL: https://issues.apache.org/jira/browse/FLINK-6998 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Zhenzhong Xu >Assignee: Zhenzhong Xu > > Propose to add "kafkaCommitsSucceeded" and "kafkaCommitsFailed" metrics in > KafkaConsumerThread class. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...
Github user zhenzhongxu commented on a diff in the pull request: https://github.com/apache/flink/pull/4187#discussion_r124413756 --- Diff: docs/monitoring/metrics.md --- @@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier: + Connector: + + + + Scope + Metrics + Description + + + + + Slot/Consumer + kafkaCommitsSucceeded + Kafka offset commit success count if Kafka commit is turned on. + + + Slot/Consumer + kafkaCommitsFailed + Kafka offset commit failure count if Kafka commit is turned on. --- End diff -- Sure, thanks for the feedback. Will update. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7009) dogstatsd mode in statsd reporter
[ https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065313#comment-16065313 ] ASF GitHub Bot commented on FLINK-7009: --- Github user dbrinegar commented on a diff in the pull request: https://github.com/apache/flink/pull/4188#discussion_r124366697 --- Diff: flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java --- @@ -179,41 +254,130 @@ private String prefix(String ... names) { } } - private void send(final String name, final String value) { + private String buildStatsdLine(final String name, final String value, final String tags) { + Double number; try { - String formatted = String.format("%s:%s|g", name, value); - byte[] data = formatted.getBytes(StandardCharsets.UTF_8); - socket.send(new DatagramPacket(data, data.length, this.address)); + number = Double.parseDouble(value); + } + catch (NumberFormatException e) { + // quietly skip values like "n/a" + return ""; } - catch (IOException e) { - LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort()); + if (number >= 0.) { + return String.format("%s:%s|g%s", name, value, tags != null ? tags : ""); + } else { + // quietly skip "unknowns" like lowWaterMark:-9223372036854775808, or JVM.Memory.NonHeap.Max:-1, or NaN + return ""; } } - @Override - public String filterCharacters(String input) { + private void send(final String name, final String value, final String tags) { + String formatted = buildStatsdLine(name, value, tags); + if (formatted.length() > 0) { + try { + byte[] data = formatted.getBytes(StandardCharsets.UTF_8); + socket.send(new DatagramPacket(data, data.length, this.address)); + } + catch (IOException e) { + LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort()); + } + } + } + + /** + * dogstatsd names should: start with letter, uses ascii alphanumerics and underscore, separated by periods. + * Collapse runs of invalid characters into an underscore. Discard invalid prefix and suffix. + * Eg: ":::metric:::name:::" -> "metric_name" + */ + + private boolean isValidStatsdChar(char c) { + return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || (c == '_'); --- End diff -- Yeah that's a fair point. The thought here is to take an extremely limited set to maximize compatibility with collectors and timeseries databases, to avoid translation. I think if one were to make a compatibliity table you might find a few extra punctuation type characters, but I couldn't see how they changed the significance or meaning of the metrics, so landed on picking one commonly accepted non-period delimiter. The underbar is also guidance from datadog, for best practice with their systems. The metrics from Flink are super clean this way. Flink metric names seem to always be alpha and dots, the tag names are all alpha and underscore (eg `task_id`), so looks quite natural for the tag values to be alphanumeric + underscore. But yeah, is an arbitrary choice. 
> dogstatsd mode in statsd reporter > - > > Key: FLINK-7009 > URL: https://issues.apache.org/jira/browse/FLINK-7009 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 > Environment: org.apache.flink.metrics.statsd.StatsDReporter >Reporter: David Brinegar > Fix For: 1.4.0 > > > The current statsd reporter can only report a subset of Flink metrics owing > to the manner in which Flink variables are handled, mainly around invalid > characters and metrics too long. As an option, it would be quite useful to > have a stricter dogstatsd compliant output. Dogstatsd metrics are tagged, > should be less than 200 characters including tag names and values, be > alphanumeric + underbar, delimited by periods. As a further pragmatic > restriction, negative and other invalid values should be ignored rather than > sent to the backend. These restrictions play well with a broad set of > collectors and time series d
[GitHub] flink pull request #4188: [FLINK-7009] dogstatsd mode in statds reporter
Github user dbrinegar commented on a diff in the pull request: https://github.com/apache/flink/pull/4188#discussion_r124366697 --- Diff: flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java --- @@ -179,41 +254,130 @@ private String prefix(String ... names) { } } - private void send(final String name, final String value) { + private String buildStatsdLine(final String name, final String value, final String tags) { + Double number; try { - String formatted = String.format("%s:%s|g", name, value); - byte[] data = formatted.getBytes(StandardCharsets.UTF_8); - socket.send(new DatagramPacket(data, data.length, this.address)); + number = Double.parseDouble(value); + } + catch (NumberFormatException e) { + // quietly skip values like "n/a" + return ""; } - catch (IOException e) { - LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort()); + if (number >= 0.) { + return String.format("%s:%s|g%s", name, value, tags != null ? tags : ""); + } else { + // quietly skip "unknowns" like lowWaterMark:-9223372036854775808, or JVM.Memory.NonHeap.Max:-1, or NaN + return ""; } } - @Override - public String filterCharacters(String input) { + private void send(final String name, final String value, final String tags) { + String formatted = buildStatsdLine(name, value, tags); + if (formatted.length() > 0) { + try { + byte[] data = formatted.getBytes(StandardCharsets.UTF_8); + socket.send(new DatagramPacket(data, data.length, this.address)); + } + catch (IOException e) { + LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort()); + } + } + } + + /** + * dogstatsd names should: start with letter, uses ascii alphanumerics and underscore, separated by periods. + * Collapse runs of invalid characters into an underscore. Discard invalid prefix and suffix. + * Eg: ":::metric:::name:::" -> "metric_name" + */ + + private boolean isValidStatsdChar(char c) { + return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || (c == '_'); --- End diff -- Yeah that's a fair point. The thought here is to take an extremely limited set to maximize compatibility with collectors and timeseries databases, to avoid translation. I think if one were to make a compatibliity table you might find a few extra punctuation type characters, but I couldn't see how they changed the significance or meaning of the metrics, so landed on picking one commonly accepted non-period delimiter. The underbar is also guidance from datadog, for best practice with their systems. The metrics from Flink are super clean this way. Flink metric names seem to always be alpha and dots, the tag names are all alpha and underscore (eg `task_id`), so looks quite natural for the tag values to be alphanumeric + underscore. But yeah, is an arbitrary choice. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
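As a rough illustration of the naming rules discussed in the review comment above (ASCII alphanumerics plus underscore, runs of invalid characters collapsed into a single underscore, invalid prefix and suffix dropped), the following is a minimal sketch. The class and method names are hypothetical and not taken from the pull request:

{code:java}
public class StatsdNameFilter {

	// Valid characters per the rules above: ASCII letters, digits and underscore.
	private static boolean isValidStatsdChar(char c) {
		return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
			|| (c >= '0' && c <= '9') || c == '_';
	}

	// Collapses each run of invalid characters into a single underscore and
	// drops invalid leading/trailing runs, e.g. ":::metric:::name:::" -> "metric_name".
	public static String filter(String input) {
		StringBuilder sb = new StringBuilder(input.length());
		boolean pendingSeparator = false;
		for (int i = 0; i < input.length(); i++) {
			char c = input.charAt(i);
			if (isValidStatsdChar(c)) {
				if (pendingSeparator && sb.length() > 0) {
					sb.append('_'); // one underscore for the whole invalid run
				}
				pendingSeparator = false;
				sb.append(c);
			} else {
				pendingSeparator = true; // remember that something was skipped
			}
		}
		return sb.toString(); // a trailing invalid run is simply discarded
	}

	public static void main(String[] args) {
		System.out.println(filter(":::metric:::name:::")); // prints "metric_name"
	}
}
{code}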
[jira] [Commented] (FLINK-7009) dogstatsd mode in statsd reporter
[ https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065293#comment-16065293 ] ASF GitHub Bot commented on FLINK-7009: --- Github user dbrinegar commented on a diff in the pull request: https://github.com/apache/flink/pull/4188#discussion_r124364140 --- Diff: flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java --- @@ -90,6 +109,45 @@ public void close() { // + /** +* Removes leading and trailing angle brackets. +*/ + private String stripBrackets(String str) { + return str.substring(1, str.length() - 1); + } + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + if (dogstatsdMode) { + // memoize dogstatsd tag section: "|#tag:val,tag:val,tag:val" + StringBuilder statsdTagLine = new StringBuilder(); + Map orderedTags = new TreeMap<>(group.getAllVariables()); + for (Map.Entry entry: orderedTags.entrySet()) { + String k = stripBrackets(entry.getKey()); + String v = filterCharacters(entry.getValue()); + statsdTagLine.append(",").append(k).append(":").append(v); + } + if (statsdTagLine.length() > 0) { + // remove first comma, prefix with "|#" + tagTable.put(metric, "|#" + statsdTagLine.substring(1)); --- End diff -- 👍 > dogstatsd mode in statsd reporter > - > > Key: FLINK-7009 > URL: https://issues.apache.org/jira/browse/FLINK-7009 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 > Environment: org.apache.flink.metrics.statsd.StatsDReporter >Reporter: David Brinegar > Fix For: 1.4.0 > > > The current statsd reporter can only report a subset of Flink metrics owing > to the manner in which Flink variables are handled, mainly around invalid > characters and metrics too long. As an option, it would be quite useful to > have a stricter dogstatsd compliant output. Dogstatsd metrics are tagged, > should be less than 200 characters including tag names and values, be > alphanumeric + underbar, delimited by periods. As a further pragmatic > restriction, negative and other invalid values should be ignored rather than > sent to the backend. These restrictions play well with a broad set of > collectors and time series databases. > This mode would: > * convert output to ascii alphanumeric characters with underbar, delimited by > periods. Runs of invalid characters within a metric segment would be > collapsed to a single underbar. > * report all Flink variables as tags > * compress overly long segments, say over 50 chars, to a symbolic > representation of the metric name, to preserve the unique metric time series > but avoid downstream truncation > * compress 32 character Flink IDs like tm_id, task_id, job_id, > task_attempt_id, to the first 8 characters, again to preserve enough > distinction amongst metrics while trimming up to 96 characters from the metric > * remove object references from names, such as the instance hash id of the > serializer > * drop negative or invalid numeric values such as "n/a", "-1" which is used > for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is > used for unknowns like currentLowWaterMark > With these in place, it becomes quite reasonable to support LatencyGauge > metrics as well. > One idea for symbolic compression is to take the first 10 valid characters > plus a hash of the long name. 
For example, a value like this operator_name: > {code:java} > TriggerWindow(TumblingProcessingTimeWindows(5000), > ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa, > > reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465}, > ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301)) > {code} > would first drop the instance references. The stable version would be: > > {code:java} > TriggerWindow(TumblingProcessingTimeWindows(5000), > ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer, > > reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1}, > ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301)) > {code} > and then the compressed name would be the first ten valid characters pl
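The symbolic compression idea quoted above (first ten valid characters plus a hash of the stable name) could look roughly like the sketch below. The choice of MD5 and the 8-hex-character suffix are assumptions made for illustration only; the issue does not prescribe a hash function, and the suffix produced here is not claimed to match the `d8c007da` value in the example.

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class MetricNameCompressor {

	// Keeps only the first ten characters that are valid in a statsd name segment.
	private static String firstTenValidChars(String name) {
		StringBuilder sb = new StringBuilder(10);
		for (int i = 0; i < name.length() && sb.length() < 10; i++) {
			char c = name.charAt(i);
			boolean valid = (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
				|| (c >= '0' && c <= '9') || c == '_';
			if (valid) {
				sb.append(c);
			}
		}
		return sb.toString();
	}

	// Compresses an overly long, stable segment to "<first 10 valid chars>_<8 hex chars of a hash>".
	public static String compress(String stableName) {
		try {
			MessageDigest md = MessageDigest.getInstance("MD5");
			byte[] digest = md.digest(stableName.getBytes(StandardCharsets.UTF_8));
			StringBuilder hex = new StringBuilder(8);
			for (int i = 0; i < 4; i++) { // 4 bytes -> 8 hex characters
				hex.append(String.format("%02x", digest[i]));
			}
			return firstTenValidChars(stableName) + "_" + hex;
		} catch (NoSuchAlgorithmException e) {
			throw new IllegalStateException("MD5 not available", e);
		}
	}

	public static void main(String[] args) {
		// For the stable operator name in the example this yields something like "TriggerWin_xxxxxxxx".
		System.out.println(compress("TriggerWindow(TumblingProcessingTimeWindows(5000), ...)"));
	}
}
{code}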
[GitHub] flink pull request #4188: [FLINK-7009] dogstatsd mode in statds reporter
Github user dbrinegar commented on a diff in the pull request: https://github.com/apache/flink/pull/4188#discussion_r124364140 --- Diff: flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java --- @@ -90,6 +109,45 @@ public void close() { // + /** +* Removes leading and trailing angle brackets. +*/ + private String stripBrackets(String str) { + return str.substring(1, str.length() - 1); + } + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + if (dogstatsdMode) { + // memoize dogstatsd tag section: "|#tag:val,tag:val,tag:val" + StringBuilder statsdTagLine = new StringBuilder(); + Map orderedTags = new TreeMap<>(group.getAllVariables()); + for (Map.Entry entry: orderedTags.entrySet()) { + String k = stripBrackets(entry.getKey()); + String v = filterCharacters(entry.getValue()); + statsdTagLine.append(",").append(k).append(":").append(v); + } + if (statsdTagLine.length() > 0) { + // remove first comma, prefix with "|#" + tagTable.put(metric, "|#" + statsdTagLine.substring(1)); --- End diff -- 👍 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7009) dogstatsd mode in statsd reporter
[ https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065289#comment-16065289 ] ASF GitHub Bot commented on FLINK-7009: --- Github user dbrinegar commented on a diff in the pull request: https://github.com/apache/flink/pull/4188#discussion_r124363952 --- Diff: flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java --- @@ -90,6 +109,45 @@ public void close() { // + /** +* Removes leading and trailing angle brackets. +*/ + private String stripBrackets(String str) { + return str.substring(1, str.length() - 1); + } + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + if (dogstatsdMode) { + // memoize dogstatsd tag section: "|#tag:val,tag:val,tag:val" + StringBuilder statsdTagLine = new StringBuilder(); + Map orderedTags = new TreeMap<>(group.getAllVariables()); --- End diff -- sorry! this was the beginning of looking at a more efficient tag table, as many entries are duplicates, but I'll take out since we're going to put the tag line in the metric object > dogstatsd mode in statsd reporter > - > > Key: FLINK-7009 > URL: https://issues.apache.org/jira/browse/FLINK-7009 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 > Environment: org.apache.flink.metrics.statsd.StatsDReporter >Reporter: David Brinegar > Fix For: 1.4.0 > > > The current statsd reporter can only report a subset of Flink metrics owing > to the manner in which Flink variables are handled, mainly around invalid > characters and metrics too long. As an option, it would be quite useful to > have a stricter dogstatsd compliant output. Dogstatsd metrics are tagged, > should be less than 200 characters including tag names and values, be > alphanumeric + underbar, delimited by periods. As a further pragmatic > restriction, negative and other invalid values should be ignored rather than > sent to the backend. These restrictions play well with a broad set of > collectors and time series databases. > This mode would: > * convert output to ascii alphanumeric characters with underbar, delimited by > periods. Runs of invalid characters within a metric segment would be > collapsed to a single underbar. > * report all Flink variables as tags > * compress overly long segments, say over 50 chars, to a symbolic > representation of the metric name, to preserve the unique metric time series > but avoid downstream truncation > * compress 32 character Flink IDs like tm_id, task_id, job_id, > task_attempt_id, to the first 8 characters, again to preserve enough > distinction amongst metrics while trimming up to 96 characters from the metric > * remove object references from names, such as the instance hash id of the > serializer > * drop negative or invalid numeric values such as "n/a", "-1" which is used > for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is > used for unknowns like currentLowWaterMark > With these in place, it becomes quite reasonable to support LatencyGauge > metrics as well. > One idea for symbolic compression is to take the first 10 valid characters > plus a hash of the long name. 
For example, a value like this operator_name: > {code:java} > TriggerWindow(TumblingProcessingTimeWindows(5000), > ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa, > > reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465}, > ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301)) > {code} > would first drop the instance references. The stable version would be: > > {code:java} > TriggerWindow(TumblingProcessingTimeWindows(5000), > ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer, > > reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1}, > ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301)) > {code} > and then the compressed name would be the first ten valid characters plus the > hash of the stable string: > {code} > TriggerWin_d8c007da > {code} > This is just one way of dealing with unruly default names, the main point > would be to preserve the metrics so they are valid, avoid truncation, and can > be aggregated along other dimensions even if this particular dimension is > hard to parse after the compression. -- This message was sent by A
[GitHub] flink pull request #4188: [FLINK-7009] dogstatsd mode in statds reporter
Github user dbrinegar commented on a diff in the pull request: https://github.com/apache/flink/pull/4188#discussion_r124363952 --- Diff: flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java --- @@ -90,6 +109,45 @@ public void close() { // + /** +* Removes leading and trailing angle brackets. +*/ + private String stripBrackets(String str) { + return str.substring(1, str.length() - 1); + } + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + if (dogstatsdMode) { + // memoize dogstatsd tag section: "|#tag:val,tag:val,tag:val" + StringBuilder statsdTagLine = new StringBuilder(); + Map orderedTags = new TreeMap<>(group.getAllVariables()); --- End diff -- sorry! this was the beginning of looking at a more efficient tag table, as many entries are duplicates, but I'll take out since we're going to put the tag line in the metric object --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7009) dogstatsd mode in statsd reporter
[ https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065283#comment-16065283 ] ASF GitHub Bot commented on FLINK-7009: --- Github user dbrinegar commented on a diff in the pull request: https://github.com/apache/flink/pull/4188#discussion_r124363455 --- Diff: docs/monitoring/metrics.md --- @@ -420,10 +420,30 @@ metrics.reporter.grph.protocol: TCP In order to use this reporter you must copy `/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/lib` folder of your Flink distribution. +In `dogstatsd` mode, all variables in Flink metrics such as ``, ``, ``, ``, ``, +`` and others, will be included as tags. It is recommended to define scopes for this reporter such that no --- End diff -- good call! I'll make these and other suggested changes, hopefully this week > dogstatsd mode in statsd reporter > - > > Key: FLINK-7009 > URL: https://issues.apache.org/jira/browse/FLINK-7009 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 > Environment: org.apache.flink.metrics.statsd.StatsDReporter >Reporter: David Brinegar > Fix For: 1.4.0 > > > The current statsd reporter can only report a subset of Flink metrics owing > to the manner in which Flink variables are handled, mainly around invalid > characters and metrics too long. As an option, it would be quite useful to > have a stricter dogstatsd compliant output. Dogstatsd metrics are tagged, > should be less than 200 characters including tag names and values, be > alphanumeric + underbar, delimited by periods. As a further pragmatic > restriction, negative and other invalid values should be ignored rather than > sent to the backend. These restrictions play well with a broad set of > collectors and time series databases. > This mode would: > * convert output to ascii alphanumeric characters with underbar, delimited by > periods. Runs of invalid characters within a metric segment would be > collapsed to a single underbar. > * report all Flink variables as tags > * compress overly long segments, say over 50 chars, to a symbolic > representation of the metric name, to preserve the unique metric time series > but avoid downstream truncation > * compress 32 character Flink IDs like tm_id, task_id, job_id, > task_attempt_id, to the first 8 characters, again to preserve enough > distinction amongst metrics while trimming up to 96 characters from the metric > * remove object references from names, such as the instance hash id of the > serializer > * drop negative or invalid numeric values such as "n/a", "-1" which is used > for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is > used for unknowns like currentLowWaterMark > With these in place, it becomes quite reasonable to support LatencyGauge > metrics as well. > One idea for symbolic compression is to take the first 10 valid characters > plus a hash of the long name. For example, a value like this operator_name: > {code:java} > TriggerWindow(TumblingProcessingTimeWindows(5000), > ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa, > > reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465}, > ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301)) > {code} > would first drop the instance references. 
The stable version would be: > > {code:java} > TriggerWindow(TumblingProcessingTimeWindows(5000), > ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer, > > reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1}, > ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301)) > {code} > and then the compressed name would be the first ten valid characters plus the > hash of the stable string: > {code} > TriggerWin_d8c007da > {code} > This is just one way of dealing with unruly default names, the main point > would be to preserve the metrics so they are valid, avoid truncation, and can > be aggregated along other dimensions even if this particular dimension is > hard to parse after the compression. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4188: [FLINK-7009] dogstatsd mode in statds reporter
Github user dbrinegar commented on a diff in the pull request: https://github.com/apache/flink/pull/4188#discussion_r124363455 --- Diff: docs/monitoring/metrics.md --- @@ -420,10 +420,30 @@ metrics.reporter.grph.protocol: TCP In order to use this reporter you must copy `/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/lib` folder of your Flink distribution. +In `dogstatsd` mode, all variables in Flink metrics such as ``, ``, ``, ``, ``, +`` and others, will be included as tags. It is recommended to define scopes for this reporter such that no --- End diff -- good call! I'll make these and other suggested changes, hopefully this week --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6895) Add STR_TO_DATE supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065195#comment-16065195 ] sunjincheng commented on FLINK-6895: Hi [~Aegeaner] thanks. > Add STR_TO_DATE supported in SQL > > > Key: FLINK-6895 > URL: https://issues.apache.org/jira/browse/FLINK-6895 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: Aegeaner > > STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It > takes a string str and a format string format. STR_TO_DATE() returns a > DATETIME value if the format string contains both date and time parts, or a > DATE or TIME value if the string contains only date or time parts. If the > date, time, or datetime value extracted from str is illegal, STR_TO_DATE() > returns NULL and produces a warning. > * Syntax: > STR_TO_DATE(str,format) > * Arguments > **str: - > **format: - > * Return Types > DATETIME/DATE/TIME > * Example: > STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01' > SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
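To make the semantics described in the issue concrete, here is a small, hypothetical Java sketch of the parsing behaviour (returning null for illegal input). It only translates a handful of the MySQL format specifiers, does not handle literal text in the format string, and is not the implementation that will land in Flink.

{code:java}
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class StrToDateSketch {

	// Translates a few MySQL-style specifiers to SimpleDateFormat patterns (incomplete on purpose).
	private static String toJavaPattern(String mysqlFormat) {
		return mysqlFormat
			.replace("%Y", "yyyy")  // 4-digit year
			.replace("%m", "MM")    // month
			.replace("%d", "dd")    // day of month
			.replace("%h", "hh")    // hour (01..12)
			.replace("%i", "mm")    // minutes
			.replace("%s", "ss");   // seconds
	}

	// Returns null for illegal input, mirroring the NULL-with-warning behaviour described above.
	public static Date strToDate(String str, String format) {
		SimpleDateFormat sdf = new SimpleDateFormat(toJavaPattern(format));
		sdf.setLenient(false);
		try {
			return sdf.parse(str);
		} catch (ParseException e) {
			return null;
		}
	}

	public static void main(String[] args) {
		// Parses to May 1, 2013, matching the first example from the issue.
		System.out.println(strToDate("01,5,2013", "%d,%m,%Y"));
	}
}
{code}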
[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL
[ https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065187#comment-16065187 ] ASF GitHub Bot commented on FLINK-6584: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4199#discussion_r124341520 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala --- @@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule val project = call.rel(0).asInstanceOf[LogicalProject] val innerProject = call.rel(1).asInstanceOf[LogicalProject] val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] +val window = agg.getWindow -// Retrieve window start and end properties +val isRowtime = isRowtimeAttribute(window.timeAttribute) +val isProctime = isProctimeAttribute(window.timeAttribute) + +val startEndProperties = Seq( + NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)), + NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute))) + +// allow rowtime/proctime for rowtime windows and proctime for proctime windows +val timeProperties = if (isRowtime) { + Seq( +NamedWindowProperty("w$rowtime", RowtimeAttribute(window.aliasAttribute)), +NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute))) --- End diff -- Why rowtime windows need the proctime property? > Support multiple consecutive windows in SQL > --- > > Key: FLINK-6584 > URL: https://issues.apache.org/jira/browse/FLINK-6584 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Right now, the Table API supports multiple consecutive windows as follows: > {code} > val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, > 'bigdec, 'string) > val t = table > .window(Tumble over 2.millis on 'rowtime as 'w) > .groupBy('w) > .select('w.rowtime as 'rowtime, 'int.count as 'int) > .window(Tumble over 4.millis on 'rowtime as 'w2) > .groupBy('w2) > .select('w2.rowtime, 'w2.end, 'int.count) > {code} > Similar behavior should be supported by the SQL API as well. We need to > introduce a new auxiliary group function, but this should happen in sync with > Apache Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
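For comparison with the Table API example quoted above, a SQL version of two consecutive tumbling windows might look like the query below once the auxiliary functions from this PR are available. This is a hypothetical sketch: the `TUMBLE_ROWTIME` call, the table name and the column names are illustrative, and the final syntax depends on the Calcite alignment mentioned in the issue.

{code:java}
public class ConsecutiveWindowsSqlSketch {

	// Hypothetical sketch: the inner query exposes the window's rowtime via TUMBLE_ROWTIME,
	// and the outer query uses that attribute as the time attribute of a second, larger window.
	public static final String CONSECUTIVE_WINDOWS =
		"SELECT TUMBLE_END(rowtime, INTERVAL '4' SECOND), SUM(cnt) " +
		"FROM ( " +
		"   SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '2' SECOND) AS rowtime, COUNT(*) AS cnt " +
		"   FROM MyTable " +
		"   GROUP BY TUMBLE(rowtime, INTERVAL '2' SECOND) " +
		") " +
		"GROUP BY TUMBLE(rowtime, INTERVAL '4' SECOND)";
	// The string would then be handed to the TableEnvironment's SQL entry point.
}
{code}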
[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL
[ https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065189#comment-16065189 ] ASF GitHub Bot commented on FLINK-6584: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4199#discussion_r124336278 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala --- @@ -28,6 +28,7 @@ import org.apache.calcite.util.ImmutableBitSet import org.apache.flink.table.api._ --- End diff -- Please remove the unused import at line 26, `import org.apache.calcite.sql.fun.SqlStdOperatorTable`. > Support multiple consecutive windows in SQL > --- > > Key: FLINK-6584 > URL: https://issues.apache.org/jira/browse/FLINK-6584 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Right now, the Table API supports multiple consecutive windows as follows: > {code} > val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, > 'bigdec, 'string) > val t = table > .window(Tumble over 2.millis on 'rowtime as 'w) > .groupBy('w) > .select('w.rowtime as 'rowtime, 'int.count as 'int) > .window(Tumble over 4.millis on 'rowtime as 'w2) > .groupBy('w2) > .select('w2.rowtime, 'w2.end, 'int.count) > {code} > Similar behavior should be supported by the SQL API as well. We need to > introduce a new auxiliary group function, but this should happen in sync with > Apache Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL
[ https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065190#comment-16065190 ] ASF GitHub Bot commented on FLINK-6584: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4199#discussion_r124344855 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala --- @@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule val project = call.rel(0).asInstanceOf[LogicalProject] val innerProject = call.rel(1).asInstanceOf[LogicalProject] val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] +val window = agg.getWindow -// Retrieve window start and end properties +val isRowtime = isRowtimeAttribute(window.timeAttribute) +val isProctime = isProctimeAttribute(window.timeAttribute) + +val startEndProperties = Seq( + NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)), + NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute))) + +// allow rowtime/proctime for rowtime windows and proctime for proctime windows +val timeProperties = if (isRowtime) { + Seq( +NamedWindowProperty("w$rowtime", RowtimeAttribute(window.aliasAttribute)), +NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute))) +} else if (isProctime) { + Seq(NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute))) +} else { + Seq() +} + +val properties = startEndProperties ++ timeProperties + +// retrieve window start and end properties val transformed = call.builder() val rexBuilder = transformed.getRexBuilder transformed.push(LogicalWindowAggregate.create( - agg.getWindow, - Seq( -NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)), -NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute)) - ), agg) + window, + properties, + agg) ) // forward window start and end properties transformed.project( - innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end"))) + innerProject.getProjects ++ properties.map(np => transformed.field(np.name))) def replaceGroupAuxiliaries(node: RexNode): RexNode = { node match { case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) => // replace expression by access to window start rexBuilder.makeCast(c.getType, transformed.field("w$start"), false) + case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) => // replace expression by access to window end rexBuilder.makeCast(c.getType, transformed.field("w$end"), false) + +case c: RexCall if WindowStartEndPropertiesRule.isWindowRowtime(c) => + if (isProctime) { +throw ValidationException("A proctime window cannot provide a rowtime attribute.") + } else if (isRowtime) { +// replace expression by access to window rowtime +transformed.field("w$rowtime") + } else { +throw TableException("Accessing the rowtime attribute of a window is not yet " + + "supported in a batch environment.") + } + +case c: RexCall if WindowStartEndPropertiesRule.isWindowProctime(c) => + if (isProctime) { +// replace expression by access to window proctime +transformed.field("w$proctime") + } else { +throw ValidationException("Proctime is not supported in a batch environment.") + } --- End diff -- We can throw this exception in a Stream rowtime window if we want query `TUMBLE_PROCTIME`, So I thinks this message should be improve or add a `isRowtime` process. 
> Support multiple consecutive windows in SQL > --- > > Key: FLINK-6584 > URL: https://issues.apache.org/jira/browse/FLINK-6584 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Right now, the Table API supports multiple consecutive windows as follows: > {code} > val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, > 'bigdec, 'string) > val t = table > .window(Tumble over 2.millis on 'rowtime as 'w) > .groupBy('w) > .select('w.rowtime as 'rowtime, 'int.count as 'int)
[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL
[ https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065186#comment-16065186 ] ASF GitHub Bot commented on FLINK-6584: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4199#discussion_r124335359 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/MathFunctions.scala --- @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.functions.utils +package org.apache.flink.table.runtime.functions --- End diff -- I suggest renaming `MathFunctions` -> `ScalarFunctions`, so that we can add all scalar functions in one file. In the `...runtime.functions` package we would then have three files, `ScalarFunctions`, `TableFunctions` and `AggFunctions`, which FLINK-6810 plans to do. What do you think? > Support multiple consecutive windows in SQL > --- > > Key: FLINK-6584 > URL: https://issues.apache.org/jira/browse/FLINK-6584 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Right now, the Table API supports multiple consecutive windows as follows: > {code} > val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, > 'bigdec, 'string) > val t = table > .window(Tumble over 2.millis on 'rowtime as 'w) > .groupBy('w) > .select('w.rowtime as 'rowtime, 'int.count as 'int) > .window(Tumble over 4.millis on 'rowtime as 'w2) > .groupBy('w2) > .select('w2.rowtime, 'w2.end, 'int.count) > {code} > Similar behavior should be supported by the SQL API as well. We need to > introduce a new auxiliary group function, but this should happen in sync with > Apache Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL
[ https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065188#comment-16065188 ] ASF GitHub Bot commented on FLINK-6584: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4199#discussion_r124329598 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java --- @@ -1900,7 +1900,7 @@ public void unparse( new SqlGroupFunction(SqlKind.TUMBLE, null, OperandTypes.or(OperandTypes.DATETIME_INTERVAL, OperandTypes.DATETIME_INTERVAL_TIME)) { - @Override List getAuxiliaryFunctions() { + @Override public List getAuxiliaryFunctions() { --- End diff -- Is this method only used in `SqlStdOperatorTable.java`? If so, why do we add `public`? I find that Calcite has not added this modifier. > Support multiple consecutive windows in SQL > --- > > Key: FLINK-6584 > URL: https://issues.apache.org/jira/browse/FLINK-6584 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Right now, the Table API supports multiple consecutive windows as follows: > {code} > val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, > 'bigdec, 'string) > val t = table > .window(Tumble over 2.millis on 'rowtime as 'w) > .groupBy('w) > .select('w.rowtime as 'rowtime, 'int.count as 'int) > .window(Tumble over 4.millis on 'rowtime as 'w2) > .groupBy('w2) > .select('w2.rowtime, 'w2.end, 'int.count) > {code} > Similar behavior should be supported by the SQL API as well. We need to > introduce a new auxiliary group function, but this should happen in sync with > Apache Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL
[ https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065191#comment-16065191 ] ASF GitHub Bot commented on FLINK-6584: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4199#discussion_r124335785 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala --- @@ -349,6 +350,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { isTimeIndicatorType(updatedCall.getOperands.get(0).getType) => updatedCall +case BasicOperatorTable.TUMBLE_ROWTIME | +BasicOperatorTable.TUMBLE_PROCTIME | +BasicOperatorTable.HOP_ROWTIME | +BasicOperatorTable.HOP_PROCTIME | +BasicOperatorTable.SESSION_ROWTIME | +BasicOperatorTable.SESSION_PROCTIME if isTimeIndicatorType(updatedCall.getType) => + updatedCall --- End diff -- Can we remove the condition of `if isTimeIndicatorType(updatedCall.getType) `? > Support multiple consecutive windows in SQL > --- > > Key: FLINK-6584 > URL: https://issues.apache.org/jira/browse/FLINK-6584 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Right now, the Table API supports multiple consecutive windows as follows: > {code} > val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, > 'bigdec, 'string) > val t = table > .window(Tumble over 2.millis on 'rowtime as 'w) > .groupBy('w) > .select('w.rowtime as 'rowtime, 'int.count as 'int) > .window(Tumble over 4.millis on 'rowtime as 'w2) > .groupBy('w2) > .select('w2.rowtime, 'w2.end, 'int.count) > {code} > Similar behavior should be supported by the SQL API as well. We need to > introduce a new auxiliary group function, but this should happen in sync with > Apache Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4199#discussion_r124329598 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java --- @@ -1900,7 +1900,7 @@ public void unparse( new SqlGroupFunction(SqlKind.TUMBLE, null, OperandTypes.or(OperandTypes.DATETIME_INTERVAL, OperandTypes.DATETIME_INTERVAL_TIME)) { - @Override List getAuxiliaryFunctions() { + @Override public List getAuxiliaryFunctions() { --- End diff -- Is this method only used in `SqlStdOperatorTable.java`? If so, why do we add `public`? I find that Calcite has not added this modifier. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4199#discussion_r124335785 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala --- @@ -349,6 +350,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { isTimeIndicatorType(updatedCall.getOperands.get(0).getType) => updatedCall +case BasicOperatorTable.TUMBLE_ROWTIME | +BasicOperatorTable.TUMBLE_PROCTIME | +BasicOperatorTable.HOP_ROWTIME | +BasicOperatorTable.HOP_PROCTIME | +BasicOperatorTable.SESSION_ROWTIME | +BasicOperatorTable.SESSION_PROCTIME if isTimeIndicatorType(updatedCall.getType) => + updatedCall --- End diff -- Can we remove the condition of `if isTimeIndicatorType(updatedCall.getType) `? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4199#discussion_r124341520 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala --- @@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule val project = call.rel(0).asInstanceOf[LogicalProject] val innerProject = call.rel(1).asInstanceOf[LogicalProject] val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] +val window = agg.getWindow -// Retrieve window start and end properties +val isRowtime = isRowtimeAttribute(window.timeAttribute) +val isProctime = isProctimeAttribute(window.timeAttribute) + +val startEndProperties = Seq( + NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)), + NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute))) + +// allow rowtime/proctime for rowtime windows and proctime for proctime windows +val timeProperties = if (isRowtime) { + Seq( +NamedWindowProperty("w$rowtime", RowtimeAttribute(window.aliasAttribute)), +NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute))) --- End diff -- Why rowtime windows need the proctime property? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4199#discussion_r124344855 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala --- @@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule val project = call.rel(0).asInstanceOf[LogicalProject] val innerProject = call.rel(1).asInstanceOf[LogicalProject] val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] +val window = agg.getWindow -// Retrieve window start and end properties +val isRowtime = isRowtimeAttribute(window.timeAttribute) +val isProctime = isProctimeAttribute(window.timeAttribute) + +val startEndProperties = Seq( + NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)), + NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute))) + +// allow rowtime/proctime for rowtime windows and proctime for proctime windows +val timeProperties = if (isRowtime) { + Seq( +NamedWindowProperty("w$rowtime", RowtimeAttribute(window.aliasAttribute)), +NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute))) +} else if (isProctime) { + Seq(NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute))) +} else { + Seq() +} + +val properties = startEndProperties ++ timeProperties + +// retrieve window start and end properties val transformed = call.builder() val rexBuilder = transformed.getRexBuilder transformed.push(LogicalWindowAggregate.create( - agg.getWindow, - Seq( -NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)), -NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute)) - ), agg) + window, + properties, + agg) ) // forward window start and end properties transformed.project( - innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end"))) + innerProject.getProjects ++ properties.map(np => transformed.field(np.name))) def replaceGroupAuxiliaries(node: RexNode): RexNode = { node match { case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) => // replace expression by access to window start rexBuilder.makeCast(c.getType, transformed.field("w$start"), false) + case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) => // replace expression by access to window end rexBuilder.makeCast(c.getType, transformed.field("w$end"), false) + +case c: RexCall if WindowStartEndPropertiesRule.isWindowRowtime(c) => + if (isProctime) { +throw ValidationException("A proctime window cannot provide a rowtime attribute.") + } else if (isRowtime) { +// replace expression by access to window rowtime +transformed.field("w$rowtime") + } else { +throw TableException("Accessing the rowtime attribute of a window is not yet " + + "supported in a batch environment.") + } + +case c: RexCall if WindowStartEndPropertiesRule.isWindowProctime(c) => + if (isProctime) { +// replace expression by access to window proctime +transformed.field("w$proctime") + } else { +throw ValidationException("Proctime is not supported in a batch environment.") + } --- End diff -- We can throw this exception in a Stream rowtime window if we want query `TUMBLE_PROCTIME`, So I thinks this message should be improve or add a `isRowtime` process. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4199#discussion_r124336278 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala --- @@ -28,6 +28,7 @@ import org.apache.calcite.util.ImmutableBitSet import org.apache.flink.table.api._ --- End diff -- Please remove the unused import at line 26, `import org.apache.calcite.sql.fun.SqlStdOperatorTable`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4199#discussion_r124335359 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/MathFunctions.scala --- @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.functions.utils +package org.apache.flink.table.runtime.functions --- End diff -- I suggest renaming `MathFunctions` -> `ScalarFunctions`, so that we can add all scalar functions in one file. In the `...runtime.functions` package we would then have three files, `ScalarFunctions`, `TableFunctions` and `AggFunctions`, which FLINK-6810 plans to do. What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7006) Base class using POJOs for Gelly algorithms
[ https://issues.apache.org/jira/browse/FLINK-7006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065146#comment-16065146 ] ASF GitHub Bot commented on FLINK-7006: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/4201 [FLINK-7006] [gelly] Base class using POJOs for Gelly algorithms Gelly algorithms commonly have a Result class extending a Tuple type and implementing one of the Unary/Binary/TertiaryResult interfaces. Add a Unary/Binary/TertiaryResultBase class implementing each interface and convert the Result classes to POJOs extending the base result classes. Note: The `TriangleListing` hashes changed because previously these algorithms did not have a `Result` class and simply used the `Tuple` `hashCode`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 7006_base_class_using_pojos_for_gelly_algorithms Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4201.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4201 commit 15ac62ba7d0f7982961aa4b631870a641510c707 Author: Greg Hogan Date: 2017-06-26T14:21:50Z [FLINK-7006] [gelly] Base class using POJOs for Gelly algorithms Gelly algorithms commonly have a Result class extending a Tuple type and implementing one of the Unary/Binary/TertiaryResult interfaces. Add a Unary/Binary/TertiaryResultBase class implementing each interface and convert the Result classes to POJOs extending the base result classes. > Base class using POJOs for Gelly algorithms > --- > > Key: FLINK-7006 > URL: https://issues.apache.org/jira/browse/FLINK-7006 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Affects Versions: 1.4.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > Fix For: 1.4.0 > > > Gelly algorithms commonly have a {{Result}} class extending a {{Tuple}} type > and implementing one of the {{Unary/Binary/TertiaryResult}} interfaces. > Add a {{Unary/Binary/TertiaryResultBase}} class implementing each interface > and convert the {{Result}} classes to POJOs extending the base result classes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4201: [FLINK-7006] [gelly] Base class using POJOs for Ge...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/4201 [FLINK-7006] [gelly] Base class using POJOs for Gelly algorithms Gelly algorithms commonly have a Result class extending a Tuple type and implementing one of the Unary/Binary/TertiaryResult interfaces. Add a Unary/Binary/TertiaryResultBase class implementing each interface and convert the Result classes to POJOs extending the base result classes. Note: The `TriangleListing` hashes changed because previously these algorithms did not have a `Result` class and simply used the `Tuple` `hashCode`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 7006_base_class_using_pojos_for_gelly_algorithms Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4201.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4201 commit 15ac62ba7d0f7982961aa4b631870a641510c707 Author: Greg Hogan Date: 2017-06-26T14:21:50Z [FLINK-7006] [gelly] Base class using POJOs for Gelly algorithms Gelly algorithms commonly have a Result class extending a Tuple type and implementing one of the Unary/Binary/TertiaryResult interfaces. Add a Unary/Binary/TertiaryResultBase class implementing each interface and convert the Result classes to POJOs extending the base result classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
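As a rough sketch of the pattern described in this PR, a POJO-based base result class can carry the shared vertex field while a concrete algorithm result only adds its own fields. The class and field names below are illustrative and are not claimed to match the PR's actual code; each class would live in its own file.

{code:java}
// Hypothetical base class for results with a single vertex (illustrative names).
public class UnaryResultBase<K> {

	private K vertexId0;

	// POJO-style getters/setters instead of Tuple field positions.
	public K getVertexId0() {
		return vertexId0;
	}

	public void setVertexId0(K vertexId0) {
		this.vertexId0 = vertexId0;
	}
}

// A concrete algorithm result extends the base class and only adds its own fields.
public class DegreeResult<K> extends UnaryResultBase<K> {

	private long degree;

	public long getDegree() {
		return degree;
	}

	public void setDegree(long degree) {
		this.degree = degree;
	}

	@Override
	public String toString() {
		return "(" + getVertexId0() + "," + degree + ")";
	}
}
{code}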
[jira] [Commented] (FLINK-7007) Add README to "flink-shaded.git" repository
[ https://issues.apache.org/jira/browse/FLINK-7007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065138#comment-16065138 ] ASF GitHub Bot commented on FLINK-7007: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink-shaded/pull/3#discussion_r124335176 --- Diff: README.md --- @@ -0,0 +1,28 @@ + + +# Apache Flink Shaded Dependencies + +This repository contains a number of shaded dependencies for the [Apache Flink](https://flink.apache.org/) project. + +The purpose of these dependencies is to provide a single instance of a shaded dependency in the Flink distribution, instead of each individual module shading the dependency. + +The shaded dependencies contained here do not expose any transitive dependencies. They may or may not be self-contained. + +When using these dependencies it is recommended to work directly against the shaded namespaces. --- End diff -- Could you add the following ``` About Apache Flink is an open source project of The Apache Software Foundation (ASF). ``` and link to the ASF? Other than that +1 to merge > Add README to "flink-shaded.git" repository > --- > > Key: FLINK-7007 > URL: https://issues.apache.org/jira/browse/FLINK-7007 > Project: Flink > Issue Type: Sub-task > Components: flink-shaded.git >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > We should put a README file up there with a brief explanation of the purpose > of the repo + some links to the project. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction
[ https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li reassigned FLINK-7014: - Assignee: Ruidong Li > Expose isDeterministic interface to ScalarFunction and TableFunction > > > Key: FLINK-7014 > URL: https://issues.apache.org/jira/browse/FLINK-7014 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > Currently, the `isDeterministic` method of implementations of `SqlFunction` > always returns true, which causes inappropriate optimizations in Calcite, > such as treating a user's stateful UDF as a pure function. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
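A minimal sketch of how a user-defined scalar function might use such a hook once it is exposed, which is exactly what this ticket proposes; the class name and the counter logic are made up for illustration and this is not an existing Flink API guarantee:

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical sketch: a scalar function whose result depends on internal state.
// Declaring it non-deterministic should keep the planner from treating it as a
// pure function and, for example, reusing or reordering its calls.
public class StatefulCounter extends ScalarFunction {

	private long invocations = 0L;

	public long eval(String ignored) {
		return ++invocations; // a different result on every call -> not deterministic
	}

	// The hook this issue asks to expose for ScalarFunction/TableFunction.
	public boolean isDeterministic() {
		return false;
	}
}
{code}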
[jira] [Updated] (FLINK-7016) Move inputFormat to InputFormatVertex from TaskConfig
[ https://issues.apache.org/jira/browse/FLINK-7016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7016: -- Component/s: DataStream API DataSet API > Move inputFormat to InputFormatVertex from TaskConfig > - > > Key: FLINK-7016 > URL: https://issues.apache.org/jira/browse/FLINK-7016 > Project: Flink > Issue Type: Improvement > Components: DataSet API, DataStream API >Reporter: Xu Pingyong >Assignee: Xu Pingyong > > In the batch case, the InputFormat is put into the TaskConfig; the batch task reads > data with it and the job manager takes it from the TaskConfig to generate input splits. > In the streaming case, all configs are put into the StreamConfig, but this > inputFormat is put into the TaskConfig. > We can put the InputFormat into the InputFormatVertex, while the batch task still gets the > InputFormat from the TaskConfig. This will be clearer. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7015) Separate OperatorConfig from StreamConfig
[ https://issues.apache.org/jira/browse/FLINK-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7015: -- Component/s: DataStream API > Separate OperatorConfig from StreamConfig > - > > Key: FLINK-7015 > URL: https://issues.apache.org/jira/browse/FLINK-7015 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Xu Pingyong >Assignee: Xu Pingyong > > Currently, the stream config contains not only the configs the task needs but also > those the operator needs, so the stream config can see the operator's configs and the > operator can see the task's configs. > We need to separate the operator config from the stream config so that each can only > see its own configs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7018) Reconstruct streamgraph to clear interface
[ https://issues.apache.org/jira/browse/FLINK-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7018: -- Component/s: DataStream API > Reconstruct streamgraph to clear interface > --- > > Key: FLINK-7018 > URL: https://issues.apache.org/jira/browse/FLINK-7018 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Xu Pingyong >Assignee: Xu Pingyong > > The StreamGraph not only contains streamNodes and streamEdges, but also contains > virtual nodes which have nothing to do with the StreamGraph itself. > Virtual nodes should be converted to streamNodes in > the StreamGraphGenerator class. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6866) ClosureCleaner.clean fails for scala's JavaConverters wrapper classes
[ https://issues.apache.org/jira/browse/FLINK-6866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065091#comment-16065091 ] SmedbergM commented on FLINK-6866: -- A Map[K,V] is already Serializable, as is java.util.Map -- it's just that before 2.12 scala.collection.convert.Wrappers.MapWrapper didn't inherit from the Serializable interface. I'm not sure there's a non-hacky (i.e. explicitly checking whether the classname begins with scala.collection.convert) way to get around this, because ObjectOutputStream checks whether each field inherits from Serializable. Unfortunately, the scala library writers didn't give the MapWrapper public access to its underlying wrapped Map. I don't think the details of the particular code that we encountered this in are all that relevant; in Scala, it's idiomatic to create immutable Maps and call `.asJava` on them when a library (like Flink) requires a java.util.Map. Of course, one can avoid having the wrapper by directly constructing the java.util.Map as in the MWE, but then one has all the mutability and concurrency worries that good idiomatic Scala lets me not think about. > ClosureCleaner.clean fails for scala's JavaConverters wrapper classes > - > > Key: FLINK-6866 > URL: https://issues.apache.org/jira/browse/FLINK-6866 > Project: Flink > Issue Type: Bug > Components: DataStream API, Scala API >Affects Versions: 1.2.0, 1.3.0 > Environment: Scala 2.10.6, Scala 2.11.11 > Does not appear using Scala 2.12 >Reporter: SmedbergM > > MWE: https://github.com/SmedbergM/ClosureCleanerBug > MWE console output: > https://gist.github.com/SmedbergM/ce969e6e8540da5b59c7dd921a496dc5 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
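The serialization mechanics described above are not Scala-specific; here is a minimal plain-Java illustration of why a single non-`Serializable` field (standing in for the pre-2.12 `MapWrapper`) makes the whole closure fail to serialize:

```java
// Minimal demo: ObjectOutputStream walks every field of the object graph and
// throws NotSerializableException as soon as it hits a field whose class does
// not implement Serializable — the same failure triggered by pre-2.12 MapWrapper.
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NonSerializableFieldDemo {

    // Stands in for scala.collection.convert.Wrappers.MapWrapper before Scala 2.12.
    static class NotSerializableWrapper {
        final int value = 42;
    }

    // Stands in for a user function / closure that Flink needs to ship.
    static class MyFunction implements Serializable {
        final NotSerializableWrapper wrapped = new NotSerializableWrapper();
    }

    public static void main(String[] args) throws Exception {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        out.writeObject(new MyFunction()); // throws java.io.NotSerializableException
    }
}
```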
[jira] [Commented] (FLINK-6895) Add STR_TO_DATE supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065086#comment-16065086 ] Aegeaner commented on FLINK-6895: - I will try this. > Add STR_TO_DATE supported in SQL > > > Key: FLINK-6895 > URL: https://issues.apache.org/jira/browse/FLINK-6895 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: Aegeaner > > STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It > takes a string str and a format string format. STR_TO_DATE() returns a > DATETIME value if the format string contains both date and time parts, or a > DATE or TIME value if the string contains only date or time parts. If the > date, time, or datetime value extracted from str is illegal, STR_TO_DATE() > returns NULL and produces a warning. > * Syntax: > STR_TO_DATE(str,format) > * Arguments > **str: - > **format: - > * Return Types > DATETIME/DATE/TIME > * Example: > STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01' > SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-6895) Add STR_TO_DATE supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aegeaner reassigned FLINK-6895: --- Assignee: Aegeaner > Add STR_TO_DATE supported in SQL > > > Key: FLINK-6895 > URL: https://issues.apache.org/jira/browse/FLINK-6895 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: Aegeaner > > STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It > takes a string str and a format string format. STR_TO_DATE() returns a > DATETIME value if the format string contains both date and time parts, or a > DATE or TIME value if the string contains only date or time parts. If the > date, time, or datetime value extracted from str is illegal, STR_TO_DATE() > returns NULL and produces a warning. > * Syntax: > STR_TO_DATE(str,format) > * Arguments > **str: - > **format: - > * Return Types > DATETIME/DATE/TIME > * Example: > STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01' > SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
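As a rough illustration of the parsing semantics described in the issue (this is not the Flink implementation; it uses `java.text.SimpleDateFormat` with a Java-style pattern, and translating MySQL's `%d,%m,%Y` specifiers into Java patterns is assumed to happen elsewhere):

```java
// Sketch of the behaviour STR_TO_DATE is described to have: parse a string with
// an explicit format and return NULL (null) when the extracted value is illegal.
import java.sql.Date;
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class StrToDateSketch {

    public static Date strToDate(String str, String javaFormat) {
        SimpleDateFormat fmt = new SimpleDateFormat(javaFormat);
        fmt.setLenient(false); // reject impossible dates instead of rolling them over
        try {
            return new Date(fmt.parse(str).getTime());
        } catch (ParseException e) {
            return null; // STR_TO_DATE returns NULL for illegal input
        }
    }

    public static void main(String[] args) {
        // "dd,M,yyyy" plays the role of MySQL's '%d,%m,%Y'
        System.out.println(strToDate("01,5,2013", "dd,M,yyyy")); // 2013-05-01
        System.out.println(strToDate("32,5,2013", "dd,M,yyyy")); // null
    }
}
```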
[jira] [Commented] (FLINK-6969) Add support for deferred computation for group window aggregates
[ https://issues.apache.org/jira/browse/FLINK-6969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065057#comment-16065057 ] ASF GitHub Bot commented on FLINK-6969: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4183#discussion_r124298519 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala --- @@ -37,6 +37,14 @@ class BatchQueryConfig private[table] extends QueryConfig class StreamQueryConfig private[table] extends QueryConfig { /** +* The deferredComputationTime is a strategy config of deferred computation that used to deal +* with late arriving data. For example, instead of computing a tumbling window of 1 hour at each +* full hour, we can add a deferred computation interval of 15 minute to compute the result +* quarter past each full hour. +*/ + private var deferredComputationTime: Long = 0L --- End diff -- Should we rather call this parameter `firstResultTimeOffset`? This would allow us to also use it to configure early results later. The value would be an offset from the original computation time. A positive value (> 0) would mean deferred computation (later than usual) and a negative value (<0) would mean an early computation / early result. > Add support for deferred computation for group window aggregates > > > Key: FLINK-6969 > URL: https://issues.apache.org/jira/browse/FLINK-6969 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: sunjincheng > > Deferred computation is a strategy to deal with late arriving data and avoid > updates of previous results. Instead of computing a result as soon as it is > possible (i.e., when a corresponding watermark was received), deferred > computation adds a configurable amount of slack time in which late data is > accepted before the result is computed. For example, instead of computing a > tumbling window of 1 hour at each full hour, we can add a deferred > computation interval of 15 minutes to compute the result a quarter past each > full hour. > This approach adds latency but can reduce the number of updates, esp. in use > cases where the user cannot influence the generation of watermarks. It is > also useful if the data is emitted to a system that cannot update results > (files or Kafka). The deferred computation interval should be configured via > the {{QueryConfig}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6969) Add support for deferred computation for group window aggregates
[ https://issues.apache.org/jira/browse/FLINK-6969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065056#comment-16065056 ] ASF GitHub Bot commented on FLINK-6969: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4183#discussion_r124319583 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala --- @@ -92,6 +100,27 @@ class StreamQueryConfig private[table] extends QueryConfig { this } + /** +* Specifies a deferred computation time for deferred computation, i.e., fires the --- End diff -- Specifies an offset for the point in time when the first result of a time-based computation is computed. For example, a tumbling window of one hour that ends at 13:00 would usually compute its first result at 13:00. With a firstResultTimeOffset of 15 minutes, the first result would be computed at 13:15. A positive firstResultTimeOffset parameter can be used to include late arriving records into the result of an event-time based computation. Negative offset values are not supported yet. Later, a negative offset will allow computing early results, i.e., an offset of -45 minutes would compute the first and early result of the hourly tumbling window that ends at 13:00 at 12:15. > Add support for deferred computation for group window aggregates > > > Key: FLINK-6969 > URL: https://issues.apache.org/jira/browse/FLINK-6969 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: sunjincheng > > Deferred computation is a strategy to deal with late arriving data and avoid > updates of previous results. Instead of computing a result as soon as it is > possible (i.e., when a corresponding watermark was received), deferred > computation adds a configurable amount of slack time in which late data is > accepted before the result is computed. For example, instead of computing a > tumbling window of 1 hour at each full hour, we can add a deferred > computation interval of 15 minutes to compute the result a quarter past each > full hour. > This approach adds latency but can reduce the number of updates, esp. in use > cases where the user cannot influence the generation of watermarks. It is > also useful if the data is emitted to a system that cannot update results > (files or Kafka). The deferred computation interval should be configured via > the {{QueryConfig}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6969) Add support for deferred computation for group window aggregates
[ https://issues.apache.org/jira/browse/FLINK-6969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065058#comment-16065058 ] ASF GitHub Bot commented on FLINK-6969: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4183#discussion_r124298671 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala --- @@ -92,6 +100,27 @@ class StreamQueryConfig private[table] extends QueryConfig { this } + /** +* Specifies a deferred computation time for deferred computation, i.e., fires the +* [[org.apache.flink.streaming.api.windowing.windows.TimeWindow]] on the time which not +* smaller than (window.maxTimestamp + deferredComputationTime). For example, instead of +* computing a tumbling window of 1 hour at each full hour, we can add a deferred computation +* interval of 15 minute to compute the result quarter past each full hour. +*/ + def withDeferredComputationTime(deferredComputationTime: Time): StreamQueryConfig = { --- End diff -- rename to `withFirstResultTimeOffset`? > Add support for deferred computation for group window aggregates > > > Key: FLINK-6969 > URL: https://issues.apache.org/jira/browse/FLINK-6969 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: sunjincheng > > Deferred computation is a strategy to deal with late arriving data and avoid > updates of previous results. Instead of computing a result as soon as it is > possible (i.e., when a corresponding watermark was received), deferred > computation adds a configurable amount of slack time in which late data is > accepted before the result is computed. For example, instead of computing a > tumbling window of 1 hour at each full hour, we can add a deferred > computation interval of 15 minutes to compute the result a quarter past each > full hour. > This approach adds latency but can reduce the number of updates, esp. in use > cases where the user cannot influence the generation of watermarks. It is > also useful if the data is emitted to a system that cannot update results > (files or Kafka). The deferred computation interval should be configured via > the {{QueryConfig}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
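To ground the naming discussion, a usage sketch of the option as proposed in this PR (the `withDeferredComputationTime(Time)` method comes from the diff above and may still be renamed, e.g. to `withFirstResultTimeOffset`; the `clicks` table, its `rowtime` attribute, and the surrounding setup are assumptions for illustration):

```java
// Sketch only: how the deferred-computation / first-result-offset option might be
// used from the Table API. The method name follows the PR under review here.
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class DeferredWindowExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        StreamQueryConfig qConf = tEnv.queryConfig();
        // Fire each 1-hour tumbling window 15 minutes after it would normally fire,
        // so records arriving up to 15 minutes late are still included in the result.
        qConf.withDeferredComputationTime(Time.minutes(15));

        // A registered table "clicks" with an event-time attribute "rowtime" is assumed.
        Table result = tEnv.sql(
            "SELECT userId, COUNT(*) FROM clicks " +
            "GROUP BY userId, TUMBLE(rowtime, INTERVAL '1' HOUR)");

        tEnv.toAppendStream(result, Row.class, qConf).print();
        env.execute();
    }
}
```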