[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529428#comment-16529428 ] ASF GitHub Bot commented on FLINK-9377: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/6235 [FLINK-9377] [core] Remove serializers from checkpointed state meta infos ## What is the purpose of the change This PR is the first step towards a smoother state evolution experience. It removes the behavior of writing serializers in checkpointed state meta infos (using Java serialization) and relying on them to be deserializable at restore time. Instead, the configuration snapshot of a serializer now doubles as a factory for creating the restore serializer, solidifying it as the single source of truth of information about the previous serializer of the state. With this change: - Checkpoints / savepoints move towards being Java serialization-free - The availability of the restore serializer is essentially determined at compile time - Potentially resolves caveats with macro-generated Scala serializers, whose anonymous class names are susceptible to change and, due to how Java serialization works, block successful savepoint restores - In conclusion: the written configuration snapshot is now the single point of entry for obtaining a serializer for previous state. The user is only required to guarantee that the configuration snapshot's classname remains constant for the restore to proceed. This PR is a WIP that only includes extending the `TypeSerializerConfigSnapshot` interface with a `restoreSerializer` method, as well as that method's interplay with the state backends after removing serializers from checkpointed state meta infos. This PR does **NOT** include: - Proper implementation of the new `restoreSerializer` method on all serializers. - Tests for snapshotting, restoring, and migrating serializers and their interplay in the state backends. Because of this, existing tests are expected to fail. Follow-up PRs will be opened for the above-mentioned missing parts. ## Brief change log - 5fc4a36 Add a `restoreSerializer` method to the `TypeSerializerConfigSnapshot` interface The method still has a dummy base implementation, because this PR doesn't yet properly implement the method for all serializers. Once that is accomplished, the base implementation should be removed. - 661eb6d Remove the "fallback" serializer option from `CompatibilityResult` That option used to give users a safety path for state conversion in case their previous serializer could not be deserialized for any reason related to Java serialization. Since the config snapshot is now used as the restore serializer factory, the restore serializer is guaranteed to be available whenever conversion is required, which removes the need for the "fallback" serializer option. - c91d045 Deprecate any utility methods that still have the behaviour of writing serializers in checkpoints - e09f914 Introduce the `BackwardsCompatibleConfigSnapshot` class The BackwardsCompatibleConfigSnapshot is a dummy wrapper config snapshot which wraps an actual config snapshot as well as a pre-existing serializer instance.
In previous versions, since the config snapshot wasn't a serializer factory but simply a container for serializer parameters, config snapshots written by previous serializers are not necessarily capable of creating a correct corresponding restore serializer. In this case, since checkpoints from previous versions still contain the serializers themselves, the backwards compatible solution is to wrap the written serializer and the config snapshot within the BackwardsCompatibleConfigSnapshot dummy. When attempting to restore the serializer, the wrapped serializer instance is returned instead of actually calling the restoreSerializer method of the wrapped config snapshot. - da84665 The actual removal of serializers from checkpointed state meta info ## Verifying this change This PR is a WIP preview, and tests are expected to fail for the reasons mentioned in the description. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (**yes** / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**)
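The core idea — the config snapshot acting as the factory for the restore serializer — can be sketched as follows (Scala; `MySerializer`, `MyConfigSnapshot`, and `IntSerializer` are simplified, hypothetical stand-ins rather than Flink's actual `TypeSerializer`/`TypeSerializerConfigSnapshot` interfaces):

```scala
// Simplified, hypothetical sketch of "config snapshot doubles as restore-serializer factory".
// MySerializer / MyConfigSnapshot are illustrative stand-ins, not Flink's real interfaces.

trait MySerializer[T] {
  def serialize(value: T): Array[Byte]
  def deserialize(bytes: Array[Byte]): T
}

trait MyConfigSnapshot[T] {
  // Instead of Java-serializing the old serializer into the checkpoint, the snapshot
  // itself knows how to re-create a serializer that can read the previously written state.
  def restoreSerializer(): MySerializer[T]
}

final class IntSerializer extends MySerializer[Int] {
  override def serialize(value: Int): Array[Byte] = BigInt(value).toByteArray
  override def deserialize(bytes: Array[Byte]): Int = BigInt(bytes).toInt
}

// The snapshot carries only the configuration needed to rebuild the serializer, so a restore
// never depends on Java-deserializing the (possibly renamed or changed) serializer class.
final class IntSerializerSnapshot extends MyConfigSnapshot[Int] {
  override def restoreSerializer(): MySerializer[Int] = new IntSerializer
}
```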
[jira] [Updated] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9377: -- Labels: pull-request-available (was: ) > Remove writing serializers as part of the checkpoint meta information > - > > Key: FLINK-9377 > URL: https://issues.apache.org/jira/browse/FLINK-9377 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0 > > > When writing meta information of a state in savepoints, we currently write > both the state serializer as well as the state serializer's configuration > snapshot. > Writing both is actually redundant, as most of the time they have identical > information. > Moreover, the fact that we use Java serialization to write the serializer > and rely on it to be re-readable on the restore run, already poses problems > for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) > to perform even a compatible upgrade. > The proposal here is to leave only the config snapshot as meta information, > and use that as the single source of truth of information about the schema of > serialized state. > The config snapshot should be treated as a factory (or provided to a > factory) to re-create serializers capable of reading old, serialized state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6235: [FLINK-9377] [core] Remove serializers from checkp...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/6235 [FLINK-9377] [core] Remove serializers from checkpointed state meta infos ## What is the purpose of the change This PR is the first step towards a smoother state evolution experience. It removes the behavior of writing serializers in checkpointed state meta infos (using Java serialization) and relying on them to be deserializable at restore time. Instead, the configuration snapshot of a serializer now doubles as a factory for creating the restore serializer, solidifying it as the single source of truth of information about the previous serializer of the state. With this change: - Checkpoints / savepoints move towards being Java serialization-free - The availability of the restore serializer is essentially determined at compile time - Potentially resolves caveats with macro-generated Scala serializers, whose anonymous class names are susceptible to change and, due to how Java serialization works, block successful savepoint restores - In conclusion: the written configuration snapshot is now the single point of entry for obtaining a serializer for previous state. The user is only required to guarantee that the configuration snapshot's classname remains constant for the restore to proceed. This PR is a WIP that only includes extending the `TypeSerializerConfigSnapshot` interface with a `restoreSerializer` method, as well as that method's interplay with the state backends after removing serializers from checkpointed state meta infos. This PR does **NOT** include: - Proper implementation of the new `restoreSerializer` method on all serializers. - Tests for snapshotting, restoring, and migrating serializers and their interplay in the state backends. Because of this, existing tests are expected to fail. Follow-up PRs will be opened for the above-mentioned missing parts. ## Brief change log - 5fc4a36 Add a `restoreSerializer` method to the `TypeSerializerConfigSnapshot` interface The method still has a dummy base implementation, because this PR doesn't yet properly implement the method for all serializers. Once that is accomplished, the base implementation should be removed. - 661eb6d Remove the "fallback" serializer option from `CompatibilityResult` That option used to give users a safety path for state conversion in case their previous serializer could not be deserialized for any reason related to Java serialization. Since the config snapshot is now used as the restore serializer factory, the restore serializer is guaranteed to be available whenever conversion is required, which removes the need for the "fallback" serializer option. - c91d045 Deprecate any utility methods that still have the behaviour of writing serializers in checkpoints - e09f914 Introduce the `BackwardsCompatibleConfigSnapshot` class The BackwardsCompatibleConfigSnapshot is a dummy wrapper config snapshot which wraps an actual config snapshot as well as a pre-existing serializer instance. In previous versions, since the config snapshot wasn't a serializer factory but simply a container for serializer parameters, config snapshots written by previous serializers are not necessarily capable of creating a correct corresponding restore serializer.
In this case, since checkpoints from previous versions still contain the serializers themselves, the backwards compatible solution is to wrap the written serializer and the config snapshot within the BackwardsCompatibleConfigSnapshot dummy. When attempting to restore the serializer, the wrapped serializer instance is returned instead of actually calling the restoreSerializer method of the wrapped config snapshot. - da84665 The actual removal of serializers from checkpointed state meta info ## Verifying this change This PR is a WIP preview, and tests are expected to fail for the reasons mentioned in the description. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (**yes** / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**)
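The wrapping behaviour described for `BackwardsCompatibleConfigSnapshot` can be illustrated with a small Scala sketch (simplified, hypothetical types; the real class carries additional snapshotting logic):

```scala
// Illustrative only: a backwards-compatible wrapper snapshot that hands back the serializer
// instance still present in an old checkpoint, instead of rebuilding one from configuration.

trait Serializer[T]

trait ConfigSnapshot[T] {
  def restoreSerializer(): Serializer[T]
}

final class BackwardsCompatibleSnapshot[T](
    wrappedSnapshot: ConfigSnapshot[T],
    serializerWrittenInCheckpoint: Serializer[T])
  extends ConfigSnapshot[T] {

  // The old-style wrapped snapshot may not be able to build a correct restore serializer,
  // so the pre-existing serializer instance from the checkpoint is returned instead.
  override def restoreSerializer(): Serializer[T] = serializerWrittenInCheckpoint
}
```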
[jira] [Updated] (FLINK-9609) Add bucket ready mechanism for BucketingSink when checkpoint complete
[ https://issues.apache.org/jira/browse/FLINK-9609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei updated FLINK-9609: Description: Currently, BucketingSink only supports {{notifyCheckpointComplete}}. However, users want to do some extra work when a bucket is ready. It would be nice if we could support a {{BucketReady}} mechanism for users, or tell users when a bucket is ready for use. For example, one bucket is created every 5 minutes; at the end of those 5 minutes, before creating the next bucket, the user might need to do something once the previous bucket is ready, like sending the timestamp of the bucket-ready time to a server or doing some other work. Here, bucket ready means that no part file under the bucket has a {{.pending}} or {{.in-progress}} suffix; then we can consider the bucket ready for the user to use. Just as a watermark means that no elements with a timestamp older than or equal to the watermark timestamp should arrive at the window, we can refer to the concept of a watermark here, or call this *BucketWatermark*. Recently, I found a user who wants this functionality. http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Let-BucketingSink-roll-file-on-each-checkpoint-td19034.html Below is what he said: My use case is: we read data from a message queue, write it to HDFS, and our ETL team will use the data in HDFS. *In this case, ETL needs to know if all data is ready to be read accurately*, so we use a counter to count how much data has been written; if the counter equals the number we received, we consider the HDFS file ready. We send the counter message in a custom sink so ETL can know how much data has been written, but with the current BucketingSink, even though the HDFS file is flushed, ETL may still not be able to read the data. If we can close the file during a checkpoint, then the result is accurate. And the HDFS small-file problem can be controlled by using a bigger checkpoint interval. was: Currently, BucketingSink only supports {{notifyCheckpointComplete}}. However, users want to do some extra work when a bucket is ready. It would be nice if we could support a {{BucketReady}} mechanism for users, or tell users when a bucket is ready for use. For example, one bucket is created every 5 minutes; at the end of those 5 minutes, before creating the next bucket, the user might need to do something once the previous bucket is ready, like sending the timestamp of the bucket-ready time to a server or doing some other work. Here, bucket ready means that no part file under the bucket has a {{.pending}} or {{.in-progress}} suffix; then we can consider the bucket ready for the user to use. Just as a watermark means that no elements with a timestamp older than or equal to the watermark timestamp should arrive at the window, we can refer to the concept of a watermark here, or call this *BucketWatermark*. Recently, I found a user who wants this functionality. http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Let-BucketingSink-roll-file-on-each-checkpoint-td19034.html Below is what he said: My use case is: we read data from a message queue, write it to HDFS, and our ETL team will use the data in HDFS. *In this case, ETL needs to know if all data is ready to be read accurately*, so we use a counter to count how much data has been written; if the counter equals the number we received, we consider the HDFS file ready.
We send the counter message in a custom sink so ETL can know how much data has been written, but with the current BucketingSink, even though the HDFS file is flushed, ETL may still not be able to read the data. If we can close the file during a checkpoint, then the result is accurate. And the HDFS small-file problem can be controlled by using a bigger checkpoint interval. > Add bucket ready mechanism for BucketingSink when checkpoint complete > - > > Key: FLINK-9609 > URL: https://issues.apache.org/jira/browse/FLINK-9609 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector, Streaming Connectors >Affects Versions: 1.5.0, 1.4.2 >Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > > Currently, BucketingSink only support {{notifyCheckpointComplete}}. However, > users want to do some extra work when a bucket is ready. It would be nice if > we can support {{BucketReady}} mechanism for users or we can tell users when > a bucket is ready for use. For example, One bucket is created for every 5 > minutes, at the end of 5 minutes before creating the next bucket, the user > might need to do something as the previous bucket ready, like sending the > timestamp of the bucket ready time to a server or do some other stuff. > Here, Bucket r
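As a rough illustration of the proposed readiness condition (not part of BucketingSink; `isBucketReady` and the directory layout are assumptions of this sketch), a bucket could be considered ready once none of its part files still carries a {{.pending}} or {{.in-progress}} suffix:

```scala
import java.nio.file.{Files, Path, Paths}

import scala.collection.JavaConverters._

object BucketReadiness {

  // Hypothetical helper: a bucket directory is "ready" when no part file in it
  // still has a ".pending" or ".in-progress" suffix.
  def isBucketReady(bucketDir: Path): Boolean = {
    val stream = Files.list(bucketDir)
    try {
      stream.iterator().asScala.forall { file =>
        val name = file.getFileName.toString
        !name.endsWith(".pending") && !name.endsWith(".in-progress")
      }
    } finally {
      stream.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Example bucket path; purely illustrative.
    println(isBucketReady(Paths.get("/tmp/buckets/2018-07-01--12")))
  }
}
```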
[jira] [Commented] (FLINK-9698) "case class must be static and globally accessible" is too constrained
[ https://issues.apache.org/jira/browse/FLINK-9698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529406#comment-16529406 ] Jeff Zhang commented on FLINK-9698: --- Could anyone let me know why Flink requires that a case class must be static and globally accessible? Similar code works in Spark, so I believe it should be the same for Flink, as both of them need to serialize these classes to a remote host and execute them on the remote side. > "case class must be static and globally accessible" is too constrained > -- > > Key: FLINK-9698 > URL: https://issues.apache.org/jira/browse/FLINK-9698 > Project: Flink > Issue Type: Improvement >Reporter: Jeff Zhang >Priority: Major > > The following code can reproduce this issue. > {code} > object BatchJob { > def main(args: Array[String]) { > // set up the batch execution environment > val env = ExecutionEnvironment.getExecutionEnvironment > val tenv = TableEnvironment.getTableEnvironment(env) > case class Person(id:Int, name:String) > val ds = env.fromElements(Person(1,"jeff"), Person(2, "andy")) > tenv.registerDataSet("table_1", ds); > } > } > {code} > Although I have a workaround of declaring the case class outside of the main method, > this workaround won't work in scala-shell. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9698) "case class must be static and globally accessible" is too constrained
Jeff Zhang created FLINK-9698: - Summary: "case class must be static and globally accessible" is too constrained Key: FLINK-9698 URL: https://issues.apache.org/jira/browse/FLINK-9698 Project: Flink Issue Type: Improvement Reporter: Jeff Zhang The following code can reproduce this issue. {code} object BatchJob { def main(args: Array[String]) { // set up the batch execution environment val env = ExecutionEnvironment.getExecutionEnvironment val tenv = TableEnvironment.getTableEnvironment(env) case class Person(id:Int, name:String) val ds = env.fromElements(Person(1,"jeff"), Person(2, "andy")) tenv.registerDataSet("table_1", ds); } } {code} Although I have a workaround of declaring the case class outside of the main method, this workaround won't work in scala-shell. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
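For reference, the workaround mentioned in the description (declaring the case class at the top level so it is static and globally accessible) looks roughly like this — a sketch reusing the APIs from the snippet above, and, as noted, not applicable in scala-shell:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment

// Workaround sketch: the case class lives at the top level instead of inside main(),
// so it is statically and globally accessible for Flink's type analysis.
case class Person(id: Int, name: String)

object BatchJob {
  def main(args: Array[String]): Unit = {
    // set up the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tenv = TableEnvironment.getTableEnvironment(env)

    val ds = env.fromElements(Person(1, "jeff"), Person(2, "andy"))
    tenv.registerDataSet("table_1", ds)
  }
}
```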
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529329#comment-16529329 ] ASF GitHub Bot commented on FLINK-9337: --- Github user cricket007 commented on the issue: https://github.com/apache/flink/pull/5995 What about implementing a `KeyedDeserializationSchema` for Avro? > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user cricket007 commented on the issue: https://github.com/apache/flink/pull/5995 What about implementing a `KeyedDeserializationSchema` for Avro? ---
[jira] [Closed] (FLINK-9670) Introduce slot manager factory
[ https://issues.apache.org/jira/browse/FLINK-9670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Renjie Liu closed FLINK-9670. - Resolution: Invalid > Introduce slot manager factory > -- > > Key: FLINK-9670 > URL: https://issues.apache.org/jira/browse/FLINK-9670 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, Scheduler >Affects Versions: 1.5.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Fix For: 1.5.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529321#comment-16529321 ] ASF GitHub Bot commented on FLINK-9567: --- Github user Clark closed the pull request at: https://github.com/apache/flink/pull/6192 > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0 > > Attachments: FlinkYarnProblem, fulllog.txt > > > After restarting the JobManager in YARN cluster mode, Flink sometimes does not > release TaskManager containers in certain specific cases. In the worst case, I > had a job configured with 5 task managers that possessed more than 100 containers > in the end. Although the job didn't fail, it affected other jobs in the > YARN cluster. > In the first log I posted, the container with id 24 is the reason why YARN > did not release resources. The container was killed before the restart, but the > callback of *onContainerComplete* in > *YarnResourceManager*, which should be called by YARN's *AMRMAsyncClient*, was never received. > After the restart, as we can see in line 347 of the FlinkYarnProblem log, > 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - > Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has > failed, address is now gated for [50] ms. Reason: [Disassociated] > Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. > When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it > did not have a connection to the TaskManager on container 24, so it just ignored > the close of the TaskManager. > 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No > open TaskExecutor connection container_1528707394163_29461_02_24. > Ignoring close TaskExecutor connection. > However, before calling *closeTaskManagerConnection*, it had already called > *requestYarnContainer*, which led to the *numPendingContainerRequests* variable > in *YarnResourceManager* being increased by 1. > Since the return of excess containers is determined by the > *numPendingContainerRequests* variable in *YarnResourceManager*, it cannot > return this container even though it is not required. Meanwhile, the restart > logic has already allocated enough containers for the TaskManagers, so Flink will > possess the extra container for a long time for nothing. > In the full log, the job ended with 7 containers while only 3 were running > TaskManagers. > ps: Another strange thing I found is that sometimes when requesting a YARN > container, many more containers than requested are returned. Is that a normal scenario > for AMRMAsyncClient? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
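A highly simplified sketch of the interplay described above (this is not Flink's actual YarnResourceManager code; the method and field names simply mirror the ones mentioned in the report):

```scala
// Simplified illustration of the reported bug -- NOT the real YarnResourceManager.
class SimplifiedYarnResourceManager {

  private var numPendingContainerRequests = 0

  def requestYarnContainer(): Unit = {
    // Ask YARN for a replacement container.
    numPendingContainerRequests += 1
  }

  // Callback for a completed (e.g. killed) container.
  def onContainerCompleted(containerId: String): Unit = {
    // A replacement is requested first, so the pending counter goes up ...
    requestYarnContainer()
    // ... but after a JobManager restart there is no open TaskExecutor connection
    // for this container, so the close below is effectively a no-op and nothing
    // ever compensates for the extra pending request.
    closeTaskManagerConnection(containerId)
  }

  def closeTaskManagerConnection(containerId: String): Unit = {
    // "No open TaskExecutor connection ... Ignoring close TaskExecutor connection."
  }

  // Newly allocated containers are only handed back to YARN when nothing is pending,
  // so the stale pending request keeps an extra, unused container alive.
  def onContainerAllocated(containerId: String): Unit = {
    if (numPendingContainerRequests > 0) {
      numPendingContainerRequests -= 1 // the container is kept, even if it is not needed
    } else {
      // return the excess container to YARN
    }
  }
}
```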
[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...
Github user Clark closed the pull request at: https://github.com/apache/flink/pull/6192 ---
[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529282#comment-16529282 ] ASF GitHub Bot commented on FLINK-9688: --- Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199366673 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala --- @@ -,6 +,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { math.atan(-0.123123132132132).toString) } + @Test + def testAtan2(): Unit = { +testAllApis( + 'f25.atan2('f26), + "f25.atan2(f26)", + "ATAN2(f25, f26)", + math.atan2(0.42.toByte, 0.toByte).toString) + + --- End diff -- Thank you for that catch. blank line removed > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > Labels: pull-request-available > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support
Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199366673 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala --- @@ -,6 +,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { math.atan(-0.123123132132132).toString) } + @Test + def testAtan2(): Unit = { +testAllApis( + 'f25.atan2('f26), + "f25.atan2(f26)", + "ATAN2(f25, f26)", + math.atan2(0.42.toByte, 0.toByte).toString) + + --- End diff -- Thank you for that catch. blank line removed ---
[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529278#comment-16529278 ] ASF GitHub Bot commented on FLINK-9688: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199365966 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala --- @@ -,6 +,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { math.atan(-0.123123132132132).toString) } + @Test + def testAtan2(): Unit = { +testAllApis( + 'f25.atan2('f26), + "f25.atan2(f26)", + "ATAN2(f25, f26)", + math.atan2(0.42.toByte, 0.toByte).toString) + + --- End diff -- Remove this blank line would be better~ > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > Labels: pull-request-available > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9688: -- Labels: pull-request-available (was: ) > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > Labels: pull-request-available > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199365966 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala --- @@ -,6 +,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { math.atan(-0.123123132132132).toString) } + @Test + def testAtan2(): Unit = { +testAllApis( + 'f25.atan2('f26), + "f25.atan2(f26)", + "ATAN2(f25, f26)", + math.atan2(0.42.toByte, 0.toByte).toString) + + --- End diff -- Remove this blank line would be better~ ---
[GitHub] flink pull request #6234: [FLINK-9431]Introduce time bounded condition to ce...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6234#discussion_r199365531 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -224,23 +226,26 @@ private boolean isFinalState(ComputationState state) { /** * Prunes states assuming there will be no events with timestamp lower than the given one. -* It cleares the sharedBuffer and also emits all timed out partial matches. +* It clears the sharedBuffer and also emits all timed out partial matches. * * @param sharedBuffer the SharedBuffer object that we need to work upon while processing * @param nfaState The NFAState object that we need to affect while processing * @param timestamptimestamp that indicates that there will be no more events with lower timestamp * @return all timed outed partial matches * @throws Exception Thrown if the system cannot access the state. */ - public Collection>, Long>> advanceTime( + public Tuple2>, Long>>, Collection>>> advanceTime( final SharedBuffer sharedBuffer, final NFAState nfaState, - final long timestamp) throws Exception { + final long timestamp, + final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception { --- End diff -- please add parameter explication for new parameter `afterMatchSkipStrategy ` of the method. ---
[jira] [Commented] (FLINK-9431) Introduce TimeEnd State to flink cep
[ https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529276#comment-16529276 ] ASF GitHub Bot commented on FLINK-9431: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6234#discussion_r199365531 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -224,23 +226,26 @@ private boolean isFinalState(ComputationState state) { /** * Prunes states assuming there will be no events with timestamp lower than the given one. -* It cleares the sharedBuffer and also emits all timed out partial matches. +* It clears the sharedBuffer and also emits all timed out partial matches. * * @param sharedBuffer the SharedBuffer object that we need to work upon while processing * @param nfaState The NFAState object that we need to affect while processing * @param timestamptimestamp that indicates that there will be no more events with lower timestamp * @return all timed outed partial matches * @throws Exception Thrown if the system cannot access the state. */ - public Collection>, Long>> advanceTime( + public Tuple2>, Long>>, Collection>>> advanceTime( final SharedBuffer sharedBuffer, final NFAState nfaState, - final long timestamp) throws Exception { + final long timestamp, + final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception { --- End diff -- please add parameter explication for new parameter `afterMatchSkipStrategy ` of the method. > Introduce TimeEnd State to flink cep > > > Key: FLINK-9431 > URL: https://issues.apache.org/jira/browse/FLINK-9431 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.4.2 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > Now flink cep have no support to reach a Final State upon past some time. if > i use a pattern like > {code:java}Pattern.begin('A').notFollowedBy("B"){code}, if i want A element > be emitted after 5minutes, i have no way. > I want to introduce a timeEnd State to work with notFollowedBy to figure out > with this scenior. > It can be used like this > {code:java}Pattern.begin('A').notFollowedBy("B").timeEnd("end").{code}, > [~dawidwys] [~kkl0u] Is this meaningful? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client
[ https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529266#comment-16529266 ] ASF GitHub Bot commented on FLINK-9696: --- Github user snuyanzin commented on the issue: https://github.com/apache/flink/pull/6233 Hello @hequn8128! Thank you for your review and comments. Regarding the PR template - I made changes based on the proposed #5811. Please let me know whether it is acceptable. Regarding `rowToString`, I agree. I think it makes sense and I added such tests. However, I ran into some strange behavior (I do not know whether it is a bug or something else) and commented on the code about it. > Deep toString for arrays/map in SQL client > --- > > Key: FLINK-9696 > URL: https://issues.apache.org/jira/browse/FLINK-9696 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > Currently the SQL client does not show arrays/maps in a human-readable way (please > have a look at the examples below). e.g. {code}select map[1,2];{code} is shown as > {noformat} +/-EXPR$0 >+ java.util.HashMap > {noformat} > {code}select array[1,2];{code} is shown as {noformat} > +/-EXPR$0 >+ java.lang.Integer[] > {noformat} > {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.Map[]{noformat} > {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.HashMap{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
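The direction of the fix can be illustrated with a small Scala sketch (a hypothetical helper, not the actual CliUtils code): recurse into arrays and maps instead of relying on their default `toString`, e.g. via `java.util.Arrays.deepToString`:

```scala
import java.util.{Arrays => JArrays, Map => JMap}

import scala.collection.JavaConverters._

object DeepToString {

  // Hypothetical deep-rendering helper; the SQL client's real logic lives in CliUtils.
  def deepToString(value: Any): String = value match {
    case null               => "(NULL)"
    case arr: Array[AnyRef] => JArrays.deepToString(arr) // e.g. array[map[1,2], map[2,2]]
    case arr: Array[Int]    => JArrays.toString(arr)     // primitive arrays
    case map: JMap[_, _] =>
      map.asScala
        .map { case (k, v) => s"${deepToString(k)}=${deepToString(v)}" }
        .mkString("{", ", ", "}")
    case other => other.toString
  }

  def main(args: Array[String]): Unit = {
    println(deepToString(java.util.Collections.singletonMap(1, 2)))      // {1=2}
    println(deepToString(Array[AnyRef]("a", Array[AnyRef]("b", "c"))))   // [a, [b, c]]
  }
}
```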
[GitHub] flink issue #6233: [FLINK-9696] Deep toString for array/map sql types
Github user snuyanzin commented on the issue: https://github.com/apache/flink/pull/6233 Hello @hequn8128! Thank you for your review and comments. Regarding the PR template - I made changes based on the proposed #5811. Please let me know whether it is acceptable. Regarding `rowToString`, I agree. I think it makes sense and I added such tests. However, I ran into some strange behavior (I do not know whether it is a bug or something else) and commented on the code about it. ---
[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client
[ https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529265#comment-16529265 ] ASF GitHub Bot commented on FLINK-9696: --- Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6233#discussion_r199364581 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link CliUtils}. + */ +public class CliUtilsTest { + + @Test + public void testRowToString() throws IOException { + Row result = new Row(10); + result.setField(0, null); + result.setField(1, "String"); + result.setField(2, 'c'); + result.setField(3, false); + result.setField(4, 12345.67f); + result.setField(5, 12345.67d); + result.setField(6, 12345L); + result.setField(7, java.sql.Date.valueOf("2018-11-12")); + result.setField(8, new int[]{1, 2}); + result.setField(9, new Tuple3<>(1, "123", null)); + assertEquals(Arrays.toString(CliUtils.rowToString(result)), + "[(NULL), String, c, false, 12345.67, 12345.67, 12345, 2018-11-12, " + + "[1, 2], (1,123,null)]"); --- End diff -- If having tuple here is ok then the next strange thing is null handling inside tuples (it is printed in lowercase and without brackets). So there are at least 2 different types of null handling: inside tuples and all others. > Deep toString for arrays/map in SQL client > --- > > Key: FLINK-9696 > URL: https://issues.apache.org/jira/browse/FLINK-9696 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > Currently SQL client does not show arrays/map in human readable way (please > have a look at examples below). e.g. {code}select map[1,2];{code} is shown as > {noformat} +/-EXPR$0 >+ java.util.HashMap > {noformat} > {code}select array[1,2];{code} is shown as {noformat} > +/-EXPR$0 >+ java.lang.Integer[] > {noformat} > {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.Map[]{noformat} > {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.HashMap{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6233: [FLINK-9696] Deep toString for array/map sql types
Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6233#discussion_r199364538 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link CliUtils}. + */ +public class CliUtilsTest { + + @Test + public void testRowToString() throws IOException { + Row result = new Row(10); + result.setField(0, null); + result.setField(1, "String"); + result.setField(2, 'c'); + result.setField(3, false); + result.setField(4, 12345.67f); + result.setField(5, 12345.67d); + result.setField(6, 12345L); + result.setField(7, java.sql.Date.valueOf("2018-11-12")); + result.setField(8, new int[]{1, 2}); + result.setField(9, new Tuple3<>(1, "123", null)); --- End diff -- Is it a real case to have tuple here for SqlClient? API allows to do that but not sure about real cases. ---
[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client
[ https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529264#comment-16529264 ] ASF GitHub Bot commented on FLINK-9696: --- Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6233#discussion_r199364538 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link CliUtils}. + */ +public class CliUtilsTest { + + @Test + public void testRowToString() throws IOException { + Row result = new Row(10); + result.setField(0, null); + result.setField(1, "String"); + result.setField(2, 'c'); + result.setField(3, false); + result.setField(4, 12345.67f); + result.setField(5, 12345.67d); + result.setField(6, 12345L); + result.setField(7, java.sql.Date.valueOf("2018-11-12")); + result.setField(8, new int[]{1, 2}); + result.setField(9, new Tuple3<>(1, "123", null)); --- End diff -- Is it a real case to have tuple here for SqlClient? API allows to do that but not sure about real cases. > Deep toString for arrays/map in SQL client > --- > > Key: FLINK-9696 > URL: https://issues.apache.org/jira/browse/FLINK-9696 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > Currently SQL client does not show arrays/map in human readable way (please > have a look at examples below). e.g. {code}select map[1,2];{code} is shown as > {noformat} +/-EXPR$0 >+ java.util.HashMap > {noformat} > {code}select array[1,2];{code} is shown as {noformat} > +/-EXPR$0 >+ java.lang.Integer[] > {noformat} > {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.Map[]{noformat} > {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.HashMap{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6233: [FLINK-9696] Deep toString for array/map sql types
Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6233#discussion_r199364581 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link CliUtils}. + */ +public class CliUtilsTest { + + @Test + public void testRowToString() throws IOException { + Row result = new Row(10); + result.setField(0, null); + result.setField(1, "String"); + result.setField(2, 'c'); + result.setField(3, false); + result.setField(4, 12345.67f); + result.setField(5, 12345.67d); + result.setField(6, 12345L); + result.setField(7, java.sql.Date.valueOf("2018-11-12")); + result.setField(8, new int[]{1, 2}); + result.setField(9, new Tuple3<>(1, "123", null)); + assertEquals(Arrays.toString(CliUtils.rowToString(result)), + "[(NULL), String, c, false, 12345.67, 12345.67, 12345, 2018-11-12, " + + "[1, 2], (1,123,null)]"); --- End diff -- If having tuple here is ok then the next strange thing is null handling inside tuples (it is printed in lowercase and without brackets). So there are at least 2 different types of null handling: inside tuples and all others. ---
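To make the null-handling observation concrete (a sketch assuming Flink's `Row`/`Tuple3` `toString` behaviour as shown in the test expectations above; `render` is a hypothetical helper): nested containers fall back to their own `toString`, so their nulls bypass the CLI's top-level null formatting, and a recursive renderer with a single null representation would remove the discrepancy:

```scala
import org.apache.flink.api.java.tuple.Tuple3

object NullRendering {

  // Hypothetical recursive renderer applying one null representation everywhere.
  def render(value: Any): String = value match {
    case null => "(NULL)"
    case t: Tuple3[_, _, _] =>
      Seq(t.f0, t.f1, t.f2).map(render).mkString("(", ",", ")")
    case other => other.toString
  }

  def main(args: Array[String]): Unit = {
    val tuple = new Tuple3[Integer, String, String](1, "123", null)
    // Default toString prints the nested null in lowercase and without brackets:
    println(tuple.toString) // (1,123,null)
    // The recursive renderer follows the top-level convention instead:
    println(render(tuple))  // (1,123,(NULL))
  }
}
```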
[jira] [Closed] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9456. Resolution: Fixed Fixed via 1.6.0: 89cfeaa882f9e68df2bd215563622b48c29a9ec9 50c0ea8c9fe17278d45aba476a95791152a1420b 1.5.1: a2f43b4cc081d360cd59ce3e7fb875e4b5fd243f 627412c4d2ea655271fe5da67a55ac936a1a060e > Let ResourceManager notify JobManager about failed/killed TaskManagers > -- > > Key: FLINK-9456 > URL: https://issues.apache.org/jira/browse/FLINK-9456 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Often, the {{ResourceManager}} learns faster about TaskManager > failures/killings because it directly communicates with the underlying > resource management framework. Instead of only relying on the > {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we > should additionally send a signal from the {{ResourceManager}} to the > {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster > to {{TaskManager}} failures and recover our running job/s. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529253#comment-16529253 ] ASF GitHub Bot commented on FLINK-9456: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6132 > Let ResourceManager notify JobManager about failed/killed TaskManagers > -- > > Key: FLINK-9456 > URL: https://issues.apache.org/jira/browse/FLINK-9456 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Often, the {{ResourceManager}} learns faster about TaskManager > failures/killings because it directly communicates with the underlying > resource management framework. Instead of only relying on the > {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we > should additionally send a signal from the {{ResourceManager}} to the > {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster > to {{TaskManager}} failures and recover our running job/s. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6132 ---
[jira] [Commented] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OutputStream on Task.
[ https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529134#comment-16529134 ] ASF GitHub Bot commented on FLINK-9633: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6194 @yanghua Thanks for the review, I rebased the PR. > Flink doesn't use the Savepoint path's filesystem to create the OutputStream > on Task. > - > > Key: FLINK-9633 > URL: https://issues.apache.org/jira/browse/FLINK-9633 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Currently, Flink uses the savepoint's filesystem to create the meta output > stream in the CheckpointCoordinator (JM side), but in the StreamTask (TM side) it uses > the checkpoint's filesystem to create the checkpoint data output stream. When > the savepoint & checkpoint are in different filesystems, this is problematic. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
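The direction of the fix can be sketched like this (Scala, using Flink's filesystem API; a simplified illustration rather than the actual CheckpointCoordinator/StreamTask code, and the file name is made up): derive the FileSystem from the target path itself, so a savepoint located on a different filesystem than the checkpoint directory still gets its output streams created on the right filesystem.

```scala
import org.apache.flink.core.fs.{FileSystem, Path}

object SavepointStreamSketch {

  // Sketch: resolve the FileSystem from the target (savepoint) path, not from the
  // filesystem of the configured checkpoint directory.
  def writeStateFile(targetDirectory: String): Unit = {
    val path = new Path(targetDirectory, "state-data") // illustrative file name
    val fs = path.getFileSystem()                       // filesystem of the target path
    val out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE)
    try {
      // ... write the state / metadata bytes here ...
    } finally {
      out.close()
    }
  }
}
```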
[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6194 @yanghua Thanks for the review, I rebased the PR. ---
[jira] [Updated] (FLINK-9431) Introduce TimeEnd State to flink cep
[ https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9431: -- Labels: pull-request-available (was: ) > Introduce TimeEnd State to flink cep > > > Key: FLINK-9431 > URL: https://issues.apache.org/jira/browse/FLINK-9431 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.4.2 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > Now Flink CEP has no support for reaching a Final State after some time has passed. If > I use a pattern like > {code:java}Pattern.begin('A').notFollowedBy("B"){code} and I want the A element > to be emitted after 5 minutes, I have no way to do that. > I want to introduce a timeEnd state to work with notFollowedBy to handle > this scenario. > It can be used like this: > {code:java}Pattern.begin('A').notFollowedBy("B").timeEnd("end"){code}. > [~dawidwys] [~kkl0u] Is this meaningful? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9431) Introduce TimeEnd State to flink cep
[ https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529124#comment-16529124 ] ASF GitHub Bot commented on FLINK-9431: --- GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/6234 [FLINK-9431]Introduce time bounded condition to cep ## What is the purpose of the change In CEP, events currently drive the transformation of the NFA; I think the time factor should also be taken into account in some scenarios. When a key's data is not endless and we want to emit a match of `AB` once `B` has been present for ten seconds, e.g. with the pattern ``` Pattern.begin("A").followedBy("B").notFollowedBy("C") ``` we cannot emit the result, because no branch can lead to the `Final State`. I think we can add a `TimeEnd` state to describe a pattern that accepts a time condition, evaluated by processing time / event time, which is compared against the timestamps of the elements we have seen before. As described in the issue link, there are two main reasons why I introduce this feature: 1. `notFollowedBy` can't be at the end of the pattern 2. `within` only compares against the element at the start, and some keys' data may not be endless, so we have to evaluate conditions not only on events but also on time ## Brief change log 1. Add a method to `IterativeCondition` to distinguish event-driven conditions from time-driven conditions 2. In `advanceTime`, we not only prune expired elements but also evaluate the time-bounded conditions ## Verifying this change This change is already covered by existing CEP tests; it may need a few more for the new API. This change added tests and can be verified as follows: ## Documentation - Does this pull request introduce a new feature? (yes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink timeEnd-state Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6234.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6234 commit b1aa992a97c8eac818e57c3d2f82be76957052d0 Author: minwenjun Date: 2018-07-01T14:41:44Z [FLINK-9431]Introduce time bounded condition to cep > Introduce TimeEnd State to flink cep > > > Key: FLINK-9431 > URL: https://issues.apache.org/jira/browse/FLINK-9431 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.4.2 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > Now Flink CEP has no support for reaching a Final State after some time has passed. If > I use a pattern like > {code:java}Pattern.begin('A').notFollowedBy("B"){code} and I want the A element > to be emitted after 5 minutes, I have no way to do that. > I want to introduce a timeEnd state to work with notFollowedBy to handle > this scenario. > It can be used like this: > {code:java}Pattern.begin('A').notFollowedBy("B").timeEnd("end"){code}. > [~dawidwys] [~kkl0u] Is this meaningful? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6234: [FLINK-9431]Introduce time bounded condition to ce...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/6234 [FLINK-9431]Introduce time bounded condition to cep ## What is the purpose of the change In CEP, events currently drive the transformation of the NFA; I think the time factor should also be taken into account in some scenarios. When a key's data is not endless and we want to emit a match of `AB` once `B` has been present for ten seconds, e.g. with the pattern ``` Pattern.begin("A").followedBy("B").notFollowedBy("C") ``` we cannot emit the result, because no branch can lead to the `Final State`. I think we can add a `TimeEnd` state to describe a pattern that accepts a time condition, evaluated by processing time / event time, which is compared against the timestamps of the elements we have seen before. As described in the issue link, there are two main reasons why I introduce this feature: 1. `notFollowedBy` can't be at the end of the pattern 2. `within` only compares against the element at the start, and some keys' data may not be endless, so we have to evaluate conditions not only on events but also on time ## Brief change log 1. Add a method to `IterativeCondition` to distinguish event-driven conditions from time-driven conditions 2. In `advanceTime`, we not only prune expired elements but also evaluate the time-bounded conditions ## Verifying this change This change is already covered by existing CEP tests; it may need a few more for the new API. This change added tests and can be verified as follows: ## Documentation - Does this pull request introduce a new feature? (yes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink timeEnd-state Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6234.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6234 commit b1aa992a97c8eac818e57c3d2f82be76957052d0 Author: minwenjun Date: 2018-07-01T14:41:44Z [FLINK-9431]Introduce time bounded condition to cep ---
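For contrast, the time bound that exists today is `within`, which is anchored at the start of a match (Scala CEP API; `Event` is an assumed example type). It cannot express "emit A if B has not shown up within ten seconds of A", which is what the proposed time-bounded condition targets:

```scala
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time

object CepWithinExample {

  case class Event(id: Int, name: String)

  // Today's time bound: the whole match A -> B must complete within 10 seconds of A.
  val pattern = Pattern
    .begin[Event]("A").where(_.name == "a")
    .followedBy("B").where(_.name == "b")
    .within(Time.seconds(10))
}
```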
[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6194 +1, there is a conflicting file, though. cc @sihuazhou ---
[jira] [Commented] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OutputStream on Task.
[ https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529123#comment-16529123 ] ASF GitHub Bot commented on FLINK-9633: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6194 +1, there is a conflicting file, though. cc @sihuazhou
> Flink doesn't use the Savepoint path's filesystem to create the OutputStream on Task.
>
> Key: FLINK-9633
> URL: https://issues.apache.org/jira/browse/FLINK-9633
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
> Currently, Flink uses the savepoint's filesystem to create the meta output stream in the CheckpointCoordinator (JM side), but in the StreamTask (TM side) it uses the checkpoint's filesystem to create the checkpoint data output stream. When the savepoint and the checkpoint live on different filesystems, this is problematic.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
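A minimal sketch of the underlying idea (hypothetical helper class and method, not the PR's actual code): resolve the FileSystem from the target path itself, so a savepoint directory on a different filesystem than the checkpoint directory still gets a matching FileSystem instance.

{code:java}
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.IOException;

public class SavepointStreamSketch {
    // Create the metadata stream on the filesystem that belongs to the savepoint
    // path, instead of assuming the checkpoint filesystem.
    static FSDataOutputStream createMetadataStream(String savepointDirectory) throws IOException {
        Path targetDir = new Path(savepointDirectory);   // e.g. "hdfs:///savepoints"
        FileSystem fs = targetDir.getFileSystem();       // FS is chosen by the path's URI scheme
        return fs.create(new Path(targetDir, "_metadata"), FileSystem.WriteMode.NO_OVERWRITE);
    }
}
{code}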
[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r199352237 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1029,6 +1029,29 @@ object temporalOverlaps { TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) } } +/** + * Adds a (signed) integer interval to a timestamp. The unit for the interval is given + * by the unit argument, which should be one of the following values: "SECOND", "MINUTE", + * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". + * + * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ +object timestampAdd { + + /** +* Adds a (signed) integer interval to a timestamp. The unit for the interval is given +* by the unit argument, which should be one of the following values: "SECOND", "MINUTE", +* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". +* +* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ + def apply( + unit: Expression, --- End diff -- +1 for this approach that directly specifies the interval literals. Regarding Quarter: it seems like a very old implementation, and we should probably use `"1970-01-01".toDate.extract(TimeIntervalUnit.QUARTER)` to make it consistent with all other time unit extractions. What do you guys think? I just tried it out by modifying the `Extract` method and it seems to work perfectly. ---
[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529122#comment-16529122 ] ASF GitHub Bot commented on FLINK-6846: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r199352237 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1029,6 +1029,29 @@ object temporalOverlaps { TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) } } +/** + * Adds a (signed) integer interval to a timestamp. The unit for the interval is given + * by the unit argument, which should be one of the following values: "SECOND", "MINUTE", + * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". + * + * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ +object timestampAdd { + + /** +* Adds a (signed) integer interval to a timestamp. The unit for the interval is given +* by the unit argument, which should be one of the following values: "SECOND", "MINUTE", +* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". +* +* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ + def apply( + unit: Expression, --- End diff -- +1 for this approach that directly specifies the interval literals. Regarding Quarter: it seems like a very old implementation, and we should probably use `"1970-01-01".toDate.extract(TimeIntervalUnit.QUARTER)` to make it consistent with all other time unit extractions. What do you guys think? I just tried it out by modifying the `Extract` method and it seems to work perfectly.
> Add TIMESTAMPADD supported in TableAPI
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: sunjincheng
> Assignee: sunjincheng
> Priority: Major
> Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9654) Internal error while deserializing custom Scala TypeSerializer instances
[ https://issues.apache.org/jira/browse/FLINK-9654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529121#comment-16529121 ] ASF GitHub Bot commented on FLINK-9654: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6206 Hi @zsolt-donca, I have looked at the Travis build error log; the failure is not caused by your code. This PR looks good, but if you can add a test for the `isAnonymousClass` method, that would be better. cc @tillrohrmann
> Internal error while deserializing custom Scala TypeSerializer instances
>
> Key: FLINK-9654
> URL: https://issues.apache.org/jira/browse/FLINK-9654
> Project: Flink
> Issue Type: Bug
> Reporter: Zsolt Donca
> Priority: Major
> Labels: pull-request-available
>
> When you are using custom `TypeSerializer` instances implemented in Scala, the Scala issue [SI-2034|https://issues.scala-lang.org/browse/SI-2034] can manifest itself when a Flink job is restored from a checkpoint or started with a savepoint.
> The reason is that in such a restore from checkpoint or savepoint, Flink uses `InstantiationUtil.FailureTolerantObjectInputStream` to deserialize the type serializers and their configurations. The deserialization walks through the entire corresponding object graph, and for each class it calls `isAnonymousClass`, which in turn calls `getSimpleName` (a mechanism put in place for FLINK-6869). If there is an inner class defined in a Scala object for which `getSimpleName` fails (see the Scala issue), then a `java.lang.InternalError` is thrown, which causes the task manager to restart. Flink then tries to restart the job on another task manager, causing all the task managers to restart in turn and wreaking havoc on the entire Flink cluster.
> There are some alternative type information derivation mechanisms that rely on anonymous classes and, most importantly, on classes generated by macros, which can easily trigger the above problem. I am personally working on [https://github.com/zsolt-donca/flink-alt], and there is also [https://github.com/joroKr21/flink-shapeless]
> I prepared a pull request that fixes the issue.
> Edit: added a stack trace to help demonstrate the issue.
> 2018-06-21 13:08:07.829 [today-stats (2/2)] ERROR > org.apache.flink.runtime.taskmanager.Task - Encountered fatal error > java.lang.InternalError - terminating the JVM > java.lang.InternalError: Malformed class name > at java.lang.Class.getSimpleName(Class.java:1330) ~[na:1.8.0_171] > at java.lang.Class.isAnonymousClass(Class.java:1411) ~[na:1.8.0_171] > at > org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:206) > ~[flink-dist_2.11-1.4.2.jar:1.4.2] > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1855) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) > ~[na:1.8.0_171] > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) > ~[na:1.8.0_171] > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) > ~[flink-dist_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) > ~[flink-dist_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) > ~[flink-dist_2.11-1.4.2.jar:1.4.2] >
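One possible defensive variant of that check, shown here only as a sketch (it assumes a name-based fallback is acceptable and is not necessarily what the linked PR implements):

{code:java}
public class ClassNameUtilSketch {
    // Hypothetical helper: decide whether a class is anonymous without letting the
    // java.lang.InternalError ("Malformed class name") thrown by Class#getSimpleName()
    // for some Scala-generated nested classes (SI-2034) escape and kill the task manager.
    static boolean isAnonymousClassSafe(Class<?> clazz) {
        try {
            return clazz.isAnonymousClass();
        } catch (InternalError e) {
            // Fallback heuristic: javac names anonymous classes "Outer$1", "Outer$2", ...
            String name = clazz.getName();
            int idx = name.lastIndexOf('$');
            return idx >= 0 && idx + 1 < name.length() && Character.isDigit(name.charAt(idx + 1));
        }
    }
}
{code}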
[GitHub] flink issue #6206: [FLINK-9654] [core] Changed the check for anonymous class...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6206 Hi @zsolt-donca, I have looked at the Travis build error log; the failure is not caused by your code. This PR looks good, but if you can add a test for the `isAnonymousClass` method, that would be better. cc @tillrohrmann ---
[jira] [Updated] (FLINK-9695) Add option for Mesos executor to forcefully pull Docker images
[ https://issues.apache.org/jira/browse/FLINK-9695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonid Ishimnikov updated FLINK-9695: - Description: It would be useful to have an option to forcefully pull Docker images for tasks, rather than reuse a previously cached image. Such an option exists in many Mesos frameworks, and it significantly simplifies debugging. I propose adding a new {{mesos.resourcemanager.tasks.container.docker.force-pull-image}} option. (was: It would be useful to have an option to forcefully pull Docker images for tasks, rather than reuse a previously cached image. Such option exists in many Mesos frameworks, and it significantly simplifies debugging. I propose adding a new {{mesos.resourcemanager.tasks.container.docker.forcePullImage}} option.)
> Add option for Mesos executor to forcefully pull Docker images
>
> Key: FLINK-9695
> URL: https://issues.apache.org/jira/browse/FLINK-9695
> Project: Flink
> Issue Type: Improvement
> Components: Mesos
> Reporter: Leonid Ishimnikov
> Assignee: Leonid Ishimnikov
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
> It would be useful to have an option to forcefully pull Docker images for tasks, rather than reuse a previously cached image. Such an option exists in many Mesos frameworks, and it significantly simplifies debugging. I propose adding a new {{mesos.resourcemanager.tasks.container.docker.force-pull-image}} option.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
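For reference, the new flag would then presumably be set next to the other Mesos Docker options in flink-conf.yaml. The snippet below is only a hypothetical example: the image name is made up, and the last key is the proposed option whose final name is whatever the implementation settles on.

{noformat}
# flink-conf.yaml (hypothetical example for FLINK-9695)
mesos.resourcemanager.tasks.container.type: docker
mesos.resourcemanager.tasks.container.image.name: registry.example.com/my-flink-image:latest
mesos.resourcemanager.tasks.container.docker.force-pull-image: true
{noformat}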
[jira] [Commented] (FLINK-9669) Introduce task manager assignment store
[ https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529103#comment-16529103 ] ASF GitHub Bot commented on FLINK-9669: --- Github user liurenjie1024 closed the pull request at: https://github.com/apache/flink/pull/6214 > Introduce task manager assignment store > --- > > Key: FLINK-9669 > URL: https://issues.apache.org/jira/browse/FLINK-9669 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, Scheduler >Affects Versions: 1.5.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.5.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9669) Introduce task manager assignment store
[ https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529102#comment-16529102 ] ASF GitHub Bot commented on FLINK-9669: --- Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/6214 @tillrohrmann This is from my initial design, and since the design has changed, we can close this now. > Introduce task manager assignment store > --- > > Key: FLINK-9669 > URL: https://issues.apache.org/jira/browse/FLINK-9669 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, Scheduler >Affects Versions: 1.5.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.5.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9669) Introduce task manager assignment store
[ https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Renjie Liu closed FLINK-9669. - Resolution: Invalid > Introduce task manager assignment store > --- > > Key: FLINK-9669 > URL: https://issues.apache.org/jira/browse/FLINK-9669 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, Scheduler >Affects Versions: 1.5.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.5.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6214: [FLINK-9669] Add assignment store interface.
Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/6214 @tillrohrmann This is from my initial design, and since the design has changed, we can close this now. ---
[GitHub] flink pull request #6214: [FLINK-9669] Add assignment store interface.
Github user liurenjie1024 closed the pull request at: https://github.com/apache/flink/pull/6214 ---
[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r199349619 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1029,6 +1029,29 @@ object temporalOverlaps { TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) } } +/** + * Adds a (signed) integer interval to a timestamp. The unit for the interval is given + * by the unit argument, which should be one of the following values: "SECOND", "MINUTE", + * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". + * + * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ +object timestampAdd { + + /** +* Adds a (signed) integer interval to a timestamp. The unit for the interval is given +* by the unit argument, which should be one of the following values: "SECOND", "MINUTE", +* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". +* +* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ + def apply( + unit: Expression, --- End diff -- +1 for better consistency. It is good to follow the Table-api style. ---
[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529099#comment-16529099 ] ASF GitHub Bot commented on FLINK-6846: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r199349619 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1029,6 +1029,29 @@ object temporalOverlaps { TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) } } +/** + * Adds a (signed) integer interval to a timestamp. The unit for the interval is given + * by the unit argument, which should be one of the following values: "SECOND", "MINUTE", + * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". + * + * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ +object timestampAdd { + + /** +* Adds a (signed) integer interval to a timestamp. The unit for the interval is given +* by the unit argument, which should be one of the following values: "SECOND", "MINUTE", +* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". +* +* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ + def apply( + unit: Expression, --- End diff -- +1 for better consistency. It is good to follow the Table-api style. > Add TIMESTAMPADD supported in TableAPI > -- > > Key: FLINK-6846 > URL: https://issues.apache.org/jira/browse/FLINK-6846 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: pull-request-available, starter > > See FLINK-6811 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client
[ https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529096#comment-16529096 ] ASF GitHub Bot commented on FLINK-9696: --- Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/6233 Hi @snuyanzin, thanks for your PR. The code looks good and the `deepToString()` function returns results correctly. I could not spot any issues with the implementation. To make the PR better, I think we can add a test in `CliUtilsTest` for the `rowToString` function, since the code in that function has also been changed. BTW, the PR template could be filled in better; see PR https://github.com/apache/flink/pull/5811 as an example. Best, Hequn
> Deep toString for arrays/map in SQL client
>
> Key: FLINK-9696
> URL: https://issues.apache.org/jira/browse/FLINK-9696
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Sergey Nuyanzin
> Assignee: Sergey Nuyanzin
> Priority: Major
> Labels: pull-request-available
>
> Currently the SQL client does not show arrays/maps in a human-readable way (please have a look at the examples below), e.g. {code}select map[1,2];{code} is shown as
> {noformat}
> +/-EXPR$0
> + java.util.HashMap
> {noformat}
> {code}select array[1,2];{code} is shown as
> {noformat}
> +/-EXPR$0
> + java.lang.Integer[]
> {noformat}
> {code}select array[map[1,2],map[2,2]];{code} is shown as
> {noformat}
> +/- EXPR$0
> + java.util.Map[]
> {noformat}
> {code}select map[array[1,2], array[1,2]];{code} is shown as
> {noformat}
> +/- EXPR$0
> + java.util.HashMap
> {noformat}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
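To illustrate the kind of rendering the issue asks for, here is a minimal sketch with hypothetical names; it is not the actual `CliUtils` change made in the PR.

{code:java}
import java.util.Arrays;

public class DeepToStringSketch {
    // Render a single row field in a human-readable way: object arrays go through
    // Arrays.deepToString (which also handles nested arrays and calls toString on
    // map elements), everything else falls back to its own toString.
    static String fieldToString(Object field) {
        if (field == null) {
            return "(NULL)";
        }
        if (field instanceof Object[]) {
            return Arrays.deepToString((Object[]) field);   // e.g. "[{1=2}, {2=2}]"
        }
        return field.toString();                            // e.g. a HashMap prints "{1=2}"
    }
}
{code}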
[GitHub] flink issue #6233: [FLINK-9696] Deep toString for array/map sql types
Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/6233 Hi @snuyanzin, thanks for your PR. The code looks good and the `deepToString()` function returns results correctly. I could not spot any issues with the implementation. To make the PR better, I think we can add a test in `CliUtilsTest` for the `rowToString` function, since the code in that function has also been changed. BTW, the PR template could be filled in better; see PR https://github.com/apache/flink/pull/5811 as an example. Best, Hequn ---
[GitHub] flink issue #6219: [hotfix] Fixed typo in docs
Github user elbaulp commented on the issue: https://github.com/apache/flink/pull/6219 @tillrohrmann You're welcome :-) ---
[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and display it in the UI
[ https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529025#comment-16529025 ] vinoyang commented on FLINK-9682: - This seems like a feasible suggestion; what do you think about it, [~till.rohrmann] and [~StephanEwen]?
> Add setDescription to execution environment and display it in the UI
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, Webfrontend
> Affects Versions: 1.5.0
> Reporter: Elias Levy
> Assignee: vinoyang
> Priority: Major
>
> Currently you can provide a job name to {{execute}} in the execution environment. In an environment where many versions of a job may be executing, such as a development or test environment, identifying which running job is a specific version via the UI can be difficult unless the version is embedded into the job name given to {{execute}}. But the job name is used for other purposes, such as namespacing metrics. Thus, it is not ideal to modify the job name, as that could require modifying metric dashboards and monitors each time the version changes.
> I propose that a new method be added to the execution environment, {{setDescription}}, that would allow a user to pass in an arbitrary description that would be displayed in the dashboard, allowing users to distinguish jobs.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
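A usage sketch of the proposal (hypothetical: {{setDescription}} does not exist in the current API, and the description string is only an example):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setDescription("fraud-rules, build 2018-07-01, commit abc123");  // proposed: shown only in the web UI
// ... build the topology ...
env.execute("fraud-rules");  // job name stays stable, so metric namespaces are unaffected
{code}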
[jira] [Commented] (FLINK-9511) Make StateDescriptor configurable with optional TTL
[ https://issues.apache.org/jira/browse/FLINK-9511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529023#comment-16529023 ] vinoyang commented on FLINK-9511: - Hi [~azagrebin], the TtlConfig defined here lives in the flink-runtime module, but the StateDescriptor class is in flink-core, so it seems I cannot import it there. Is there something wrong?
> Make StateDescriptor configurable with optional TTL
>
> Key: FLINK-9511
> URL: https://issues.apache.org/jira/browse/FLINK-9511
> Project: Flink
> Issue Type: Sub-task
> Components: Java API, State Backends, Checkpointing
> Affects Versions: 1.6.0
> Reporter: Andrey Zagrebin
> Assignee: vinoyang
> Priority: Major
> Fix For: 1.6.0
>
> TTL can be enabled and configured in the constructor of the abstract StateDescriptor and become available in all subclasses:
> {code:java}
> enum StateTtlUpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }
> enum StateTtlCleanupGuarantee { Relaxed, Exact }
> enum TtlStateVisibility { Relaxed, Exact }
> class TtlConfig {
>   StateTtlUpdateType updateType;
>   StateTtlCleanupGuarantee cleanupStrategy;
>   TtlStateVisibility stateVisibility;
>   TimeCharacteristic timeCharacteristic;
>   long ttl;
> }
> // previous constructor
> StateDescriptor(...) {
>   this.ttlConfig = TtlConfig.DISABLED;
> }
> // overloaded constructor with TtlConfig
> StateDescriptor(..., ttlConfig) {
>   ...
> }
> {code}
> Another option is to consider adding a StateDescriptor builder.
> Queryable state can remain unsupported with TTL for now.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
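For illustration, a usage sketch following the pseudo-code above; none of these classes or constructors exist yet in this form, the names come from the issue's proposal and may change during implementation.

{code:java}
// Hypothetical: configure a 24h processing-time TTL and pass it to a descriptor
// via the proposed overloaded constructor.
TtlConfig ttlConfig = new TtlConfig(
        StateTtlUpdateType.OnCreateAndWrite,     // refresh the TTL only on writes
        StateTtlCleanupGuarantee.Relaxed,
        TtlStateVisibility.Relaxed,
        TimeCharacteristic.ProcessingTime,
        24 * 60 * 60 * 1000L);                   // TTL in milliseconds

ValueStateDescriptor<Long> lastLogin =
        new ValueStateDescriptor<>("last-login", Long.class, ttlConfig);
{code}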
[jira] [Commented] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
[ https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529020#comment-16529020 ] vinoyang commented on FLINK-9221: - [~joshlemer] I think we should not add this method to the `SinkFunction` interface; it seems it could be implemented in a utility class. What's your opinion, [~till.rohrmann]?
> Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
>
> Key: FLINK-9221
> URL: https://issues.apache.org/jira/browse/FLINK-9221
> Project: Flink
> Issue Type: Task
> Components: DataSet API, DataStream API
> Affects Versions: 1.5.0
> Reporter: Josh Lemer
> Assignee: vinoyang
> Priority: Minor
> Labels: flink
>
> Just like it is very useful to use `DataStream[T]` as a sort of Functor or Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on `SinkFunctions`, so that you can reuse existing complex sink functions, but with a different input type. For example:
> {code}
> val bucketingStringSink: SinkFunction[String] =
>   new BucketingSink[String]("...")
>     .setBucketer(new DateTimeBucketer("-MM-dd-HHmm")
> val bucketingIntListSink: SinkFunction[List[Int]] =
>   bucketingStringSink.contramap[List[Int]](_.mkString(","))
> {code}
> For some more formal motivation behind this, https://typelevel.org/cats/typeclasses/contravariant.html is definitely a great place to start!
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
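A sketch of that utility-class alternative in Java (a hypothetical `SinkFunctions.contramap` helper built on the existing `SinkFunction` and `MapFunction` interfaces; it is not existing Flink API):

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class SinkFunctions {
    // Wrap an existing SinkFunction<A> so it accepts B values through a mapping f: B -> A,
    // without adding anything to the SinkFunction interface itself.
    public static <A, B> SinkFunction<B> contramap(SinkFunction<A> sink, MapFunction<B, A> f) {
        return new SinkFunction<B>() {
            @Override
            public void invoke(B value, Context context) throws Exception {
                sink.invoke(f.map(value), context);
            }
        };
    }
}
{code}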