[GitHub] Clarkkkkk opened a new pull request #6635: [hotfix][doc][sql-client] Fix the example configuration yaml file by …
Clark opened a new pull request #6635: [hotfix][doc][sql-client] Fix the example configuration yaml file by …
URL: https://github.com/apache/flink/pull/6635

## What is the purpose of the change

Fix a problem in the SQL Client documentation.

## Brief change log

- Add double quotes around the Kafka version in the YAML configuration; otherwise, using the example YAML leads to an org.apache.flink.table.api.NoMatchingTableFactoryException.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards, Apache Git Services
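A minimal sketch of the kind of fix described (the table name, topic, and connector version below are illustrative assumptions, not taken from the PR): quoting the version keeps YAML from parsing it as a number, which is what breaks the table factory lookup.

```yaml
tables:
  - name: MyKafkaSource        # hypothetical table name
    type: source
    connector:
      type: kafka
      version: "0.11"          # quoted -- an unquoted 0.11 is parsed as a float,
                               # and no matching table factory is found
      topic: my-topic          # hypothetical topic
```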
[jira] [Commented] (FLINK-10208) Bump mockito to 2.0+
[ https://issues.apache.org/jira/browse/FLINK-10208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597075#comment-16597075 ] ASF GitHub Bot commented on FLINK-10208: zentol commented on a change in pull request #6634: [FLINK-10208][build] Bump mockito to 2.21.0 / powermock to 2.0.0-beta.5
URL: https://github.com/apache/flink/pull/6634#discussion_r213907317

## File path: pom.xml

@@ -171,7 +171,7 @@ under the License.
     org.mockito
-    mockito-all

Review comment: The mockito-all artifact is no longer distributed.

> Bump mockito to 2.0+
>
> Key: FLINK-10208
> URL: https://issues.apache.org/jira/browse/FLINK-10208
> Project: Flink
> Issue Type: Sub-task
> Components: Build System, Tests
> Affects Versions: 1.7.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Mockito only properly supports java 9 with version 2. We have to bump the dependency and fix various API incompatibilities.
> Additionally we could investigate whether we still need powermock after bumping the dependency (which we'd also have to bump otherwise).

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
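The review comment above notes that mockito-all is no longer distributed in 2.x. A sketch of the standard replacement dependency (the version comes from the PR title; the scope is an assumption, since the diff hunk in this message is truncated):

```xml
<!-- mockito-all bundled its dependencies and is not published for Mockito 2.x; -->
<!-- mockito-core is the artifact to depend on instead. -->
<dependency>
    <groupId>org.mockito</groupId>
    <artifactId>mockito-core</artifactId>
    <version>2.21.0</version>
    <scope>test</scope>
</dependency>
```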
[jira] [Commented] (FLINK-10208) Bump mockito to 2.0+
[ https://issues.apache.org/jira/browse/FLINK-10208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597074#comment-16597074 ] ASF GitHub Bot commented on FLINK-10208: zentol commented on a change in pull request #6634: [FLINK-10208][build] Bump mockito to 2.21.0 / powermock to 2.0.0-beta.5
URL: https://github.com/apache/flink/pull/6634#discussion_r213907252

## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java

@@ -531,7 +532,7 @@ public void testMultiChannelSkippingCheckpoints() throws Exception {
     // checkpoint 3 aborted (end of partition)
     check(sequence[20], buffer.getNextNonBlocked(), PAGE_SIZE);
-    verify(toNotify).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class));
+    verify(toNotify).abortCheckpointOnBarrier(eq(3L), any(InputEndOfStreamException.class));

Review comment: @NicoK @zhijiangW I'd love to get your input here. As far as I can tell, the existing code was simply wrong: that exception was never supposed to be thrown in this situation.
[jira] [Commented] (FLINK-10208) Bump mockito to 2.0+
[ https://issues.apache.org/jira/browse/FLINK-10208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597073#comment-16597073 ] ASF GitHub Bot commented on FLINK-10208: zentol opened a new pull request #6634: [FLINK-10208][build] Bump mockito to 2.21.0 / powermock to 2.0.0-beta.5
URL: https://github.com/apache/flink/pull/6634

## What is the purpose of the change

This PR bumps mockito to 2.21.0 and powermock to 2.0.0-beta.5 for jdk9 compatibility. This is a substantial version jump, as our current mockito version is 10 years old; as a result, a fair number of incompatibilities had to be resolved. I grouped instances of the same issue into single commits and added a note to each commit describing what has changed. Some of these changes are rather mechanical (like package changes), while others are pretty subtle (like the exception matching).
[jira] [Updated] (FLINK-10208) Bump mockito to 2.0+
[ https://issues.apache.org/jira/browse/FLINK-10208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10208: --- Labels: pull-request-available (was: )
[jira] [Updated] (FLINK-10240) Flexible scheduling strategy is needed for batch job
[ https://issues.apache.org/jira/browse/FLINK-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-10240:

Description:

Currently in Flink we have two schedule modes:
1. EAGER mode starts all tasks at once, mainly for streaming jobs.
2. LAZY_FROM_SOURCES mode starts a task once its input data is consumable.

However, in a batch job, input data being ready does not always mean the task can work at once. One example is the hash join operation, where the operator first consumes one side (we call it the build side) to set up a table, then consumes the other side (we call it the probe side) to do the real join work. If the probe side is started early, it just gets stuck on back pressure, as the join operator will not consume data from it before the build stage is done, causing a waste of resources. If the probe-side task is started only after the build stage is done, both the build and probe side can have more computing resources, as they are staggered.

That's why we think a flexible scheduling strategy is needed, allowing job owners to customize the vertex schedule order and constraints. Better resource utilization usually means better performance.

> Flexible scheduling strategy is needed for batch job
>
> Key: FLINK-10240
> URL: https://issues.apache.org/jira/browse/FLINK-10240
> Project: Flink
> Issue Type: New Feature
> Components: Distributed Coordination
> Reporter: Zhu Zhu
> Priority: Major
> Labels: scheduling
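The build/probe dependency described above can be sketched as a toy in-memory hash join (plain Java, illustrative names only; this is not Flink's implementation): the probe phase cannot produce a single result until the build phase has materialized its entire side, which is why scheduling the probe-side task early only parks it on back pressure.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: a minimal hash join, not Flink's operator.
public class HashJoinSketch {

    /** Build phase: the build side must be consumed completely first. */
    static Map<Integer, String> buildPhase(List<SimpleEntry<Integer, String>> buildSide) {
        Map<Integer, String> table = new HashMap<>();
        for (SimpleEntry<Integer, String> rec : buildSide) {
            table.put(rec.getKey(), rec.getValue());
        }
        return table;
    }

    /** Probe phase: probe-side records can only make progress afterwards. */
    static List<String> probePhase(Map<Integer, String> table,
                                   List<SimpleEntry<Integer, String>> probeSide) {
        List<String> joined = new ArrayList<>();
        for (SimpleEntry<Integer, String> rec : probeSide) {
            String buildValue = table.get(rec.getKey());
            if (buildValue != null) {
                joined.add(buildValue + "|" + rec.getValue());
            }
        }
        return joined;
    }
}
```

A scheduler that starts `probePhase`'s task only once `buildPhase`'s task has finished frees the probe side's resources for the build stage, which is the staggering the ticket argues for.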
[jira] [Commented] (FLINK-10170) Support string representation for map and array types in descriptor-based Table API
[ https://issues.apache.org/jira/browse/FLINK-10170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597039#comment-16597039 ] ASF GitHub Bot commented on FLINK-10170: buptljy commented on issue #6578: [FLINK-10170] [table] Support string representation for map and array types in descriptor-based Table API
URL: https://github.com/apache/flink/pull/6578#issuecomment-417182960

lgtm.

> Support string representation for map and array types in descriptor-based Table API
>
> Key: FLINK-10170
> URL: https://issues.apache.org/jira/browse/FLINK-10170
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Affects Versions: 1.6.1, 1.7.0
> Reporter: Jun Zhang
> Assignee: Jun Zhang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
> Attachments: flink-10170
>
> Since 1.6 the recommended way of creating source/sink tables is using connector/format/schema descriptors. However, when declaring map types in the schema descriptor, the following exception would be thrown:
> {quote}org.apache.flink.table.api.TableException: A string representation for map types is not supported yet.{quote}
[jira] [Updated] (FLINK-10170) Support string representation for map and array types in descriptor-based Table API
[ https://issues.apache.org/jira/browse/FLINK-10170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Zhang updated FLINK-10170: -- Summary: Support string representation for map and array types in descriptor-based Table API (was: Support string representation for map types in descriptor-based Table API)
[jira] [Commented] (FLINK-7964) Add Apache Kafka 1.0/1.1 connectors
[ https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596985#comment-16596985 ] ASF GitHub Bot commented on FLINK-7964: yanghua commented on issue #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors
URL: https://github.com/apache/flink/pull/6577#issuecomment-417164121

@eliaslevy Piotr's [comment](https://github.com/apache/flink/pull/6577#issuecomment-414964833) should answer your question.

> Add Apache Kafka 1.0/1.1 connectors
>
> Key: FLINK-7964
> URL: https://issues.apache.org/jira/browse/FLINK-7964
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: Hai Zhou
> Assignee: vinoyang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Kafka 1.0.0 is no mere bump of the version number. The Apache Kafka Project Management Committee has packed a number of valuable enhancements into the release. Here is a summary of a few of them:
> * Since its introduction in version 0.10, the Streams API has become hugely popular among Kafka users, including the likes of Pinterest, Rabobank, Zalando, and The New York Times. In 1.0, the API continues to evolve at a healthy pace. To begin with, the builder API has been improved (KIP-120). A new API has been added to expose the state of active tasks at runtime (KIP-130). The new cogroup API makes it much easier to deal with partitioned aggregates with fewer StateStores and fewer moving parts in your code (KIP-150). Debuggability gets easier with enhancements to the print() and writeAsText() methods (KIP-160). And if that's not enough, check out KIP-138 and KIP-161 too. For more on streams, check out the Apache Kafka Streams documentation, including some helpful new tutorial videos.
> * Operating Kafka at scale requires that the system remain observable, and to make that easier, we've made a number of improvements to metrics. These are too many to summarize without becoming tedious, but Connect metrics have been significantly improved (KIP-196), a litany of new health-check metrics are now exposed (KIP-188), and we now have a global topic and partition count (KIP-168). Check out KIP-164 and KIP-187 for even more.
> * We now support Java 9, leading, among other things, to significantly faster TLS and CRC32C implementations. Over-the-wire encryption will be faster now, which will keep Kafka fast and compute costs low when encryption is enabled.
> * In keeping with the security theme, KIP-152 cleans up the error handling on Simple Authentication Security Layer (SASL) authentication attempts. Previously, some authentication error conditions were indistinguishable from broker failures and were not logged in a clear way. This is cleaner now.
> * Kafka can now tolerate disk failures better. Historically, JBOD storage configurations have not been recommended, but the architecture has nevertheless been tempting: after all, why not rely on Kafka's own replication mechanism to protect against storage failure rather than using RAID? With KIP-112, Kafka now handles disk failure more gracefully. A single disk failure in a JBOD broker will not bring the entire broker down; rather, the broker will continue serving any log files that remain on functioning disks.
> * Since release 0.11.0, the idempotent producer (which is the producer used in the presence of a transaction, which of course is the producer we use for exactly-once processing) required max.in.flight.requests.per.connection to be equal to one. As anyone who has written or tested a wire protocol can attest, this put an upper bound on throughput. Thanks to KAFKA-5949, this can now be as large as five, relaxing the throughput constraint quite a bit.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9043) Introduce a friendly way to resume the job from externalized checkpoints automatically
[ https://issues.apache.org/jira/browse/FLINK-9043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596318#comment-16596318 ] godfrey johnson edited comment on FLINK-9043 at 8/30/18 1:54 AM:

[https://github.com/GodfreyJohnson/flink/commit/3338b4c1974c018410f438cfc6f12968c8df5fa6]
[https://github.com/apache/flink/pull/6633]

I added a patch for restoring a job from its latest completed checkpoint via an HDFS path. What we aim to do is recover automatically from the HDFS path using the job's latest completed checkpoint. Currently we can use 'run -s' with the metadata path manually, which is easy for a single Flink job. But we manage a lot of Flink jobs, and we want each job to recover, just as Spark Streaming does with its getOrCreate method, from the latest completed checkpoint without losing records.

For this patch:
- Each Flink job should have its own HDFS checkpoint path
- Only HDFS (hdfs:// or viewfs://) is supported
- RocksDBStateBackend and FsStateBackend are supported
- Both legacy mode and new mode (dynamic scaling) are supported

was (Author: godfrey):

[https://github.com/GodfreyJohnson/flink/commit/12b7d5332610273d1ac79ee4593c31994c69fee9]
[https://github.com/apache/flink/pull/6630]

I added a patch for restoring a job from its latest completed checkpoint via an HDFS path. What we aim to do is recover automatically from the HDFS path using the job's latest completed checkpoint. Currently we can use 'run -s' with the metadata path manually, which is easy for a single Flink job. But we manage a lot of Flink jobs, and we want each job to recover, just as Spark Streaming does with its getOrCreate method, from the latest completed checkpoint without losing records.

For this patch:
- Each Flink job should have its own HDFS checkpoint path
- Only HDFS (hdfs:// or viewfs://) is supported
- RocksDBStateBackend and FsStateBackend are supported
- Both legacy mode and new mode (dynamic scaling) are supported

> Introduce a friendly way to resume the job from externalized checkpoints automatically
>
> Key: FLINK-9043
> URL: https://issues.apache.org/jira/browse/FLINK-9043
> Project: Flink
> Issue Type: New Feature
> Reporter: godfrey johnson
> Assignee: Sihua Zhou
> Priority: Major
> Labels: pull-request-available
>
> I know a Flink job can recover from a checkpoint with a restart strategy, but it cannot recover the way Spark Streaming jobs do when the job is starting. Every time, the submitted Flink job is regarded as a new job, whereas a Spark Streaming job can detect the checkpoint directory first and then recover from the latest successful one. Flink, however, can only recover after the job has failed first, and then retries with its strategy.
> So, would Flink support recovering from a checkpoint directly in a new job?
> h2. New description by [~sihuazhou]
> Currently, it's quite unfriendly for users to recover a job from an externalized checkpoint: the user needs to find the dedicated dir for the job, which is not an easy thing when there are too many jobs. This ticket intends to introduce a friendlier way to allow the user to use the externalized checkpoint to do recovery.
> The implementation steps are copied from the comments of [~StephanEwen]:
> - We could make this an option where you pass a flag (-r) to automatically look for the latest checkpoint in a given directory.
> - If more than one job checkpointed there before, this operation would fail.
> - We might also need a way to have jobs not create the UUID subdirectory, otherwise the scanning for the latest checkpoint would not easily work.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
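The manual workflow and the proposed behavior can be sketched as command lines (paths are placeholders; `-s` is Flink's existing savepoint/checkpoint restore flag, while `-r` is only the flag suggested in the ticket discussion, not an existing CLI option):

```shell
# Today: resume manually by pointing "run -s" at the checkpoint metadata.
flink run -s hdfs:///flink/checkpoints/<job-id>/chk-42/_metadata my-job.jar

# Proposed (sketch): scan a directory for the latest completed checkpoint.
flink run -r hdfs:///flink/checkpoints/ my-job.jar
```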
[jira] [Commented] (FLINK-9043) Introduce a friendly way to resume the job from externalized checkpoints automatically
[ https://issues.apache.org/jira/browse/FLINK-9043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596983#comment-16596983 ] ASF GitHub Bot commented on FLINK-9043: GodfreyJohnson opened a new pull request #6633: [FLINK-9043] restore from the latest job's completed checkpoint for h…
URL: https://github.com/apache/flink/pull/6633

- For [FLINK-9043](https://issues.apache.org/jira/browse/FLINK-9043?filter=-6=project%20%3D%20FLINK%20AND%20created%20%3E%3D%20-1w%20order%20by%20created%20DESC)

## What is the purpose of the change

What we aim to do is recover automatically from the HDFS path using the job's latest completed checkpoint. Currently we can use 'run -s' with the metadata path manually, which is easy for a single Flink job. But we manage a lot of Flink jobs, and we want each job to recover, just as Spark Streaming does with its getOrCreate method, from the latest completed checkpoint without losing records.

- Each Flink job has its own HDFS checkpoint path
- Only HDFS (hdfs:// or viewfs://) is supported
- RocksDBStateBackend and FsStateBackend are supported
- Both legacy mode and new mode (dynamic scaling) are supported

## Brief change log

- add HDFS utils to get the metadata path of a job's latest completed checkpoint
- recover from the metadata path in both legacy mode and new mode

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (not documented)
[jira] [Commented] (FLINK-9043) Introduce a friendly way to resume the job from externalized checkpoints automatically
[ https://issues.apache.org/jira/browse/FLINK-9043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596981#comment-16596981 ] ASF GitHub Bot commented on FLINK-9043: --- GodfreyJohnson closed pull request #6630: [FLINK-9043] restore from the latest job's completed checkpoint with … URL: https://github.com/apache/flink/pull/6630 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index e936b246222..165377e269e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.SharedStateRegistryFactory; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; +import org.apache.flink.runtime.util.HDFSUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -180,6 +181,9 @@ /** Registry that tracks state which is shared across (incremental) checkpoints */ private SharedStateRegistry sharedStateRegistry; + /** StateBackend to get the external checkpoint base dir */ + private StateBackend stateBackend; + // public CheckpointCoordinator( @@ -228,6 +232,7 @@ public CheckpointCoordinator( this.checkpointIdCounter = checkNotNull(checkpointIDCounter); this.completedCheckpointStore = checkNotNull(completedCheckpointStore); this.executor = checkNotNull(executor); + this.stateBackend = checkpointStateBackend; this.sharedStateRegistryFactory = 
checkNotNull(sharedStateRegistryFactory); this.sharedStateRegistry = sharedStateRegistryFactory.create(executor); @@ -1076,6 +1081,157 @@ public boolean restoreLatestCheckpointedState( } } + /** +* Restores the latest checkpointed state. +* +* @param tasks Map of job vertices to restore. State for these vertices is +* restored via {@link Execution#setInitialState(JobManagerTaskRestore)}. +* @param errorIfNoCheckpoint Fail if no completed checkpoint is available to +* restore from. +* @param allowNonRestoredState Allow checkpoint state that cannot be mapped +* to any job vertex in tasks. +* @param userCodeLoader classloader +* @return true if state was restored, false otherwise. +* @throws IllegalStateException If the CheckpointCoordinator is shut down. +* @throws IllegalStateException If no completed checkpoint is available and +* the failIfNoCheckpoint flag has been set. +* @throws IllegalStateException If the checkpoint contains state that cannot be +* mapped to any job vertex in tasks and the +* allowNonRestoredState flag has not been set. +* @throws IllegalStateException If the max parallelism changed for an operator +* that restores state from this checkpoint. +* @throws IllegalStateException If the parallelism changed for an operator +* that restores non-partitioned state from this +* checkpoint. +*/ + public boolean restoreLatestCheckpointedState( + Map tasks, + boolean errorIfNoCheckpoint, + boolean allowNonRestoredState, + ClassLoader userCodeLoader) throws Exception { + + synchronized (lock) { + if (shutdown) { + throw new IllegalStateException("CheckpointCoordinator is shut down"); + } + + // We create a new shared state registry object, so that all pending async disposal requests from previous + // runs will go against the old object (were they can do no harm). + // This must happen under the checkpoint lock. 
+ sharedStateRegistry.close(); + sharedStateRegistry = sharedStateRegistryFactory.create(executor); + + // Recover the checkpoints, TODO this could be done only
[GitHub] GodfreyJohnson closed pull request #6630: [FLINK-9043] restore from the latest job's completed checkpoint with …
GodfreyJohnson closed pull request #6630: [FLINK-9043] restore from the latest job's completed checkpoint with … URL: https://github.com/apache/flink/pull/6630 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index e936b246222..165377e269e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.SharedStateRegistryFactory; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; +import org.apache.flink.runtime.util.HDFSUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -180,6 +181,9 @@ /** Registry that tracks state which is shared across (incremental) checkpoints */ private SharedStateRegistry sharedStateRegistry; + /** StateBackend to get the external checkpoint base dir */ + private StateBackend stateBackend; + // public CheckpointCoordinator( @@ -228,6 +232,7 @@ public CheckpointCoordinator( this.checkpointIdCounter = checkNotNull(checkpointIDCounter); this.completedCheckpointStore = checkNotNull(completedCheckpointStore); this.executor = checkNotNull(executor); + this.stateBackend = checkpointStateBackend; this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory); this.sharedStateRegistry = sharedStateRegistryFactory.create(executor); @@ -1076,6 +1081,157 @@ public boolean restoreLatestCheckpointedState( } } + /** +* Restores the 
latest checkpointed state. +* +* @param tasks Map of job vertices to restore. State for these vertices is +* restored via {@link Execution#setInitialState(JobManagerTaskRestore)}. +* @param errorIfNoCheckpoint Fail if no completed checkpoint is available to +* restore from. +* @param allowNonRestoredState Allow checkpoint state that cannot be mapped +* to any job vertex in tasks. +* @param userCodeLoader classloader +* @return true if state was restored, false otherwise. +* @throws IllegalStateException If the CheckpointCoordinator is shut down. +* @throws IllegalStateException If no completed checkpoint is available and +* the failIfNoCheckpoint flag has been set. +* @throws IllegalStateException If the checkpoint contains state that cannot be +* mapped to any job vertex in tasks and the +* allowNonRestoredState flag has not been set. +* @throws IllegalStateException If the max parallelism changed for an operator +* that restores state from this checkpoint. +* @throws IllegalStateException If the parallelism changed for an operator +* that restores non-partitioned state from this +* checkpoint. +*/ + public boolean restoreLatestCheckpointedState( + Map tasks, + boolean errorIfNoCheckpoint, + boolean allowNonRestoredState, + ClassLoader userCodeLoader) throws Exception { + + synchronized (lock) { + if (shutdown) { + throw new IllegalStateException("CheckpointCoordinator is shut down"); + } + + // We create a new shared state registry object, so that all pending async disposal requests from previous + // runs will go against the old object (were they can do no harm). + // This must happen under the checkpoint lock. + sharedStateRegistry.close(); + sharedStateRegistry = sharedStateRegistryFactory.create(executor); + + // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery + completedCheckpointStore.recover(); + + // Now, we re-register all (shared) states from the checkpoint store with the new registry +
[jira] [Commented] (FLINK-10229) Support listing of views
[ https://issues.apache.org/jira/browse/FLINK-10229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596975#comment-16596975 ] ASF GitHub Bot commented on FLINK-10229: yanghua commented on issue #6631: [FLINK-10229][SQL CLIENT] Support listing of views URL: https://github.com/apache/flink/pull/6631#issuecomment-417160966 cc @fhueske and @twalthr This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support listing of views > > > Key: FLINK-10229 > URL: https://issues.apache.org/jira/browse/FLINK-10229 > Project: Flink > Issue Type: New Feature > Components: SQL Client >Reporter: Timo Walther >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > FLINK-10163 added initial support of views for the SQL Client. As in > other database vendors, views are listed in the {{SHOW TABLES}} output. However, > there should be a way of listing only the views. We can support the {{SHOW VIEWS}} command. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua commented on issue #6631: [FLINK-10229][SQL CLIENT] Support listing of views
yanghua commented on issue #6631: [FLINK-10229][SQL CLIENT] Support listing of views URL: https://github.com/apache/flink/pull/6631#issuecomment-417160966 cc @fhueske and @twalthr This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
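The behavior the issue asks for — {{SHOW VIEWS}} listing only views, while {{SHOW TABLES}} continues to list both, as in other databases — can be sketched like this. All names here (`ShowViewsSketch`, `execute`, the sample registries) are hypothetical stand-ins, not the SQL Client's actual implementation:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.TreeSet;

public class ShowViewsSketch {
    // Hypothetical registry: view names tracked separately from tables, so
    // SHOW TABLES can list both while SHOW VIEWS lists only the views.
    static final Set<String> tables = new TreeSet<>(Arrays.asList("orders", "users"));
    static final Set<String> views = new TreeSet<>(Collections.singletonList("daily_orders"));

    static List<String> execute(String statement) {
        String s = statement.trim().replaceAll("\\s+", " ").toUpperCase(Locale.ROOT);
        if (s.equals("SHOW VIEWS")) {
            return new ArrayList<>(views);
        } else if (s.equals("SHOW TABLES")) {
            List<String> all = new ArrayList<>(tables);
            all.addAll(views);  // views appear among tables, as in other SQL databases
            Collections.sort(all);
            return all;
        }
        throw new IllegalArgumentException("Unsupported statement: " + statement);
    }

    public static void main(String[] args) {
        System.out.println(execute("show views"));   // [daily_orders]
        System.out.println(execute("SHOW TABLES"));  // [daily_orders, orders, users]
    }
}
```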
[jira] [Commented] (FLINK-9172) Support external catalog factory that comes default with SQL-Client
[ https://issues.apache.org/jira/browse/FLINK-9172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596917#comment-16596917 ] Rong Rong commented on FLINK-9172: -- [~eronwright] As far as I know, similar to table sources and sinks, users should be able to define catalogs either in the default environment or in extended user sessions. And yes, catalog factories require specific information in order to establish connections. Originally, when I created the ticket, we were trying to add both HCatalog and one of our in-house catalogs when using Flink; thus we proposed a general API to create an ExternalCatalog based on different configurations. As [~twalthr] mentioned, catalog support has already been added; on the flink-table side, we just need to add a {{TableCatalogFactory}} that extends {{TableFactory}} and returns an instance of {{ExternalCatalog}}. On the SQL-client side, what we had in mind was similar to how {{TableFactoryService}} is used to discover {{TableFactory}}: users can register a similar {{TableCatalogFactoryService}} that is used to discover {{TableCatalogFactory}}, and the SQL client will use this configuration information to locate the correct {{TableCatalogFactory}} and ultimately instantiate the external catalog instance. I do agree we need a design doc for at least how the configuration should look. I will try to come up with a POC and a short configuration mock-up. What do you guys think? > Support external catalog factory that comes default with SQL-Client > --- > > Key: FLINK-9172 > URL: https://issues.apache.org/jira/browse/FLINK-9172 > Project: Flink > Issue Type: Sub-task >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > It would be great to have the SQL Client support some external catalogs > out-of-the-box for SQL users to configure and utilize easily. I am currently > thinking of having an external catalog factory that spins up both streaming and batch external catalog table sources and sinks. 
This could greatly unify and > provide easy access for SQL users. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
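The discovery pattern described in the comment above — match a factory by required configuration properties, then have it instantiate the catalog — might be sketched as follows. The interfaces are stripped-down hypothetical stand-ins, not Flink's real `TableFactory` API:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

public class CatalogFactorySketch {
    // Hypothetical minimal versions of the interfaces discussed above.
    interface ExternalCatalog { String name(); }

    interface TableCatalogFactory {
        Map<String, String> requiredContext();              // e.g. catalog.type=hcatalog
        ExternalCatalog createCatalog(Map<String, String> properties);
    }

    // A factory for one concrete catalog type; a real service would load
    // implementations via java.util.ServiceLoader instead of a hard-coded list.
    static class InMemoryCatalogFactory implements TableCatalogFactory {
        public Map<String, String> requiredContext() {
            return Collections.singletonMap("catalog.type", "in-memory");
        }
        public ExternalCatalog createCatalog(Map<String, String> props) {
            String name = props.getOrDefault("catalog.name", "default");
            return () -> name;
        }
    }

    // Pick the first factory whose required context is satisfied by the config.
    static TableCatalogFactory find(List<TableCatalogFactory> factories, Map<String, String> props) {
        return factories.stream()
            .filter(f -> props.entrySet().containsAll(f.requiredContext().entrySet()))
            .findFirst()
            .orElseThrow(() -> new NoSuchElementException("No matching catalog factory"));
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("catalog.type", "in-memory");
        props.put("catalog.name", "my_catalog");
        ExternalCatalog catalog =
            find(Collections.singletonList(new InMemoryCatalogFactory()), props).createCatalog(props);
        System.out.println(catalog.name());  // my_catalog
    }
}
```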
[jira] [Commented] (FLINK-7551) Add VERSION to the REST urls.
[ https://issues.apache.org/jira/browse/FLINK-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596857#comment-16596857 ] ASF GitHub Bot commented on FLINK-7551: --- tillrohrmann commented on a change in pull request #6602: [FLINK-7551][rest] Add versioning to REST API URL: https://github.com/apache/flink/pull/6602#discussion_r213833987 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -364,22 +364,37 @@ public String getRestBaseUrl() { } } - private static void registerHandler(Router router, Tuple2 specificationHandler) { - switch (specificationHandler.f0.getHttpMethod()) { + private static void registerHandler(Router router, Tuple2 specificationHandler, Logger log) { + final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL(); + // setup versioned urls + for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) { + final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL; + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), versionedHandlerURL); + registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); + } + // setup unversioned url for convenience and backwards compatibility + // this url will always point to the oldest supported version + // this is controlled by the order in which handler are registered + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL); + registerHandler(router, handlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); Review comment: Sounds like a good idea to me. Good catch @zentol! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add VERSION to the REST urls. > -- > > Key: FLINK-7551 > URL: https://issues.apache.org/jira/browse/FLINK-7551 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > This is to guarantee that we can update the REST API without breaking > existing third-party clients. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on a change in pull request #6602: [FLINK-7551][rest] Add versioning to REST API
tillrohrmann commented on a change in pull request #6602: [FLINK-7551][rest] Add versioning to REST API URL: https://github.com/apache/flink/pull/6602#discussion_r213833987 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -364,22 +364,37 @@ public String getRestBaseUrl() { } } - private static void registerHandler(Router router, Tuple2 specificationHandler) { - switch (specificationHandler.f0.getHttpMethod()) { + private static void registerHandler(Router router, Tuple2 specificationHandler, Logger log) { + final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL(); + // setup versioned urls + for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) { + final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL; + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), versionedHandlerURL); + registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); + } + // setup unversioned url for convenience and backwards compatibility + // this url will always point to the oldest supported version + // this is controlled by the order in which handler are registered + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL); + registerHandler(router, handlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); Review comment: Sounds like a good idea to me. Good catch @zentol! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
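The registration scheme in the diff above — one versioned URL per supported API version, plus an unversioned alias — can be illustrated with a plain map standing in for the Netty router. `RestAPIVersion` here is a simplified stand-in for Flink's enum, and `register` is not the real method signature:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class VersionedUrlSketch {
    enum RestAPIVersion {
        V1("v1"), V2("v2");
        private final String prefix;
        RestAPIVersion(String prefix) { this.prefix = prefix; }
        String getURLVersionPrefix() { return prefix; }
    }

    // Stand-in for the router: maps a URL to a handler name.
    static void register(Map<String, String> router, String url, String handler,
                         List<RestAPIVersion> supported) {
        for (RestAPIVersion v : supported) {
            router.put('/' + v.getURLVersionPrefix() + url, handler);   // e.g. /v1/jobs
        }
        // Unversioned alias for convenience and backwards compatibility;
        // which version it resolves to depends on registration order, which is
        // exactly the ambiguity discussed in the review thread below.
        router.put(url, handler);
    }

    public static void main(String[] args) {
        Map<String, String> router = new TreeMap<>();
        register(router, "/jobs", "JobsHandler", Arrays.asList(RestAPIVersion.V1, RestAPIVersion.V2));
        System.out.println(router.keySet());  // [/jobs, /v1/jobs, /v2/jobs]
    }
}
```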
[jira] [Closed] (FLINK-10207) Bump checkstyle-plugin to 8.9
[ https://issues.apache.org/jira/browse/FLINK-10207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10207. Resolution: Fixed master: ebd7a04836cf53e2527c321a6ee40bf9a07c63cf 9068fbe4f38b8866148986fa48e14d8d2e7e7100 > Bump checkstyle-plugin to 8.9 > - > > Key: FLINK-10207 > URL: https://issues.apache.org/jira/browse/FLINK-10207 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Our current checkstyle version (8.4) is incompatible with java 9, the > earliest version to work properly is 8.9. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10207) Bump checkstyle-plugin to 8.9
[ https://issues.apache.org/jira/browse/FLINK-10207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596804#comment-16596804 ] ASF GitHub Bot commented on FLINK-10207: zentol closed pull request #6618: [FLINK-10207][build] Bump checkstyle to 8.9 URL: https://github.com/apache/flink/pull/6618 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/flinkDev/ide_setup.md b/docs/flinkDev/ide_setup.md index 66cd552a0b5..87e7ceb5e99 100644 --- a/docs/flinkDev/ide_setup.md +++ b/docs/flinkDev/ide_setup.md @@ -87,7 +87,7 @@ IntelliJ supports checkstyle within the IDE using the Checkstyle-IDEA plugin. 1. Install the "Checkstyle-IDEA" plugin from the IntelliJ plugin repository. 2. Configure the plugin by going to Settings -> Other Settings -> Checkstyle. 3. Set the "Scan Scope" to "Only Java sources (including tests)". -4. Select _8.4_ in the "Checkstyle Version" dropdown and click apply. **This step is important, +4. Select _8.9_ in the "Checkstyle Version" dropdown and click apply. **This step is important, don't skip it!** 5. In the "Configuration File" pane, add a new configuration using the plus icon: 1. Set the "Description" to "Flink". 
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index ec9cfc548c1..a83f29cb6b5 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -60,6 +60,9 @@ import java.util.Collections; import java.util.UUID; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -67,9 +70,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import scala.concurrent.Await; -import scala.concurrent.duration.Duration; - /** * Simple and maybe stupid test to check the {@link ClusterClient} class. */ diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java index 4f77f954d3e..3d6c5837e97 100644 --- a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java +++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java @@ -29,15 +29,15 @@ import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.tests.artificialstate.ComplexPayload; +import java.util.Collections; +import java.util.List; + import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper; import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment; -import java.util.Collections; -import java.util.List; - /** * Test upgrade of generic stateful job for Flink's DataStream API operators and primitives. * diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java index cd371216586..784a50eeb70 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java @@ -26,12 +26,12 @@ import org.junit.Rule; import org.junit.rules.TemporaryFolder; -import static org.junit.Assert.fail; - import java.io.FileInputStream; import java.io.IOException; import java.util.Properties; +import static org.junit.Assert.fail; + /** * Base class for tests for {@link ParameterTool}. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java index
[GitHub] zentol closed pull request #6618: [FLINK-10207][build] Bump checkstyle to 8.9
zentol closed pull request #6618: [FLINK-10207][build] Bump checkstyle to 8.9 URL: https://github.com/apache/flink/pull/6618 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/flinkDev/ide_setup.md b/docs/flinkDev/ide_setup.md index 66cd552a0b5..87e7ceb5e99 100644 --- a/docs/flinkDev/ide_setup.md +++ b/docs/flinkDev/ide_setup.md @@ -87,7 +87,7 @@ IntelliJ supports checkstyle within the IDE using the Checkstyle-IDEA plugin. 1. Install the "Checkstyle-IDEA" plugin from the IntelliJ plugin repository. 2. Configure the plugin by going to Settings -> Other Settings -> Checkstyle. 3. Set the "Scan Scope" to "Only Java sources (including tests)". -4. Select _8.4_ in the "Checkstyle Version" dropdown and click apply. **This step is important, +4. Select _8.9_ in the "Checkstyle Version" dropdown and click apply. **This step is important, don't skip it!** 5. In the "Configuration File" pane, add a new configuration using the plus icon: 1. Set the "Description" to "Flink". 
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index ec9cfc548c1..a83f29cb6b5 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -60,6 +60,9 @@ import java.util.Collections; import java.util.UUID; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -67,9 +70,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import scala.concurrent.Await; -import scala.concurrent.duration.Duration; - /** * Simple and maybe stupid test to check the {@link ClusterClient} class. */ diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java index 4f77f954d3e..3d6c5837e97 100644 --- a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java +++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java @@ -29,15 +29,15 @@ import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.tests.artificialstate.ComplexPayload; +import java.util.Collections; +import java.util.List; + import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper; import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment; -import java.util.Collections; -import java.util.List; - /** * Test upgrade of generic stateful job for Flink's DataStream API operators and primitives. * diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java index cd371216586..784a50eeb70 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java @@ -26,12 +26,12 @@ import org.junit.Rule; import org.junit.rules.TemporaryFolder; -import static org.junit.Assert.fail; - import java.io.FileInputStream; import java.io.IOException; import java.util.Properties; +import static org.junit.Assert.fail; + /** * Base class for tests for {@link ParameterTool}. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java index 53145504ec8..35fa7b82a02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java +++
[GitHub] alexeyt820 commented on issue #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
alexeyt820 commented on issue #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer URL: https://github.com/apache/flink/pull/6105#issuecomment-417063770 @aljoscha, wouldn't it be even better to have a parameter object wrapping the Kafka ConsumerRecord instead of a flat list of arguments (messageKey, message, topic, partition, offset, timestamp, headers, ...) and pass it to deserialize? It feels like a more extensible approach - any further changes to the parameter object would be transparent for clients This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596715#comment-16596715 ] ASF GitHub Bot commented on FLINK-8500: --- alexeyt820 commented on issue #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer URL: https://github.com/apache/flink/pull/6105#issuecomment-417063770 @aljoscha, wouldn't it be even better to have a parameter object wrapping the Kafka ConsumerRecord instead of a flat list of arguments (messageKey, message, topic, partition, offset, timestamp, headers, ...) and pass it to deserialize? It feels like a more extensible approach - any further changes to the parameter object would be transparent for clients This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
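The parameter-object suggestion might look like the following sketch: one object carries the per-record metadata, so adding a field later (e.g. headers) does not break the `deserialize` signature. `RecordContext` and this `DeserializationSchema` variant are hypothetical, not Flink's actual `KeyedDeserializationSchema`:

```java
import java.nio.charset.StandardCharsets;

public class RecordContextSketch {
    // Hypothetical parameter object wrapping what Kafka's ConsumerRecord carries.
    static final class RecordContext {
        final byte[] key;
        final byte[] value;
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;  // new fields can be appended here without breaking callers

        RecordContext(byte[] key, byte[] value, String topic,
                      int partition, long offset, long timestamp) {
            this.key = key; this.value = value; this.topic = topic;
            this.partition = partition; this.offset = offset; this.timestamp = timestamp;
        }
    }

    interface DeserializationSchema<T> {
        // One argument instead of a growing flat list (messageKey, message, topic, ...).
        T deserialize(RecordContext ctx);
    }

    public static void main(String[] args) {
        DeserializationSchema<String> schema = ctx ->
            new String(ctx.value, StandardCharsets.UTF_8) + "@" + ctx.timestamp;
        RecordContext ctx = new RecordContext(null, "hello".getBytes(StandardCharsets.UTF_8),
            "events", 0, 42L, 1535000000000L);
        System.out.println(schema.deserialize(ctx));  // hello@1535000000000
    }
}
```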
[jira] [Commented] (FLINK-7551) Add VERSION to the REST urls.
[ https://issues.apache.org/jira/browse/FLINK-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596674#comment-16596674 ] ASF GitHub Bot commented on FLINK-7551: --- zentol commented on a change in pull request #6602: [FLINK-7551][rest] Add versioning to REST API URL: https://github.com/apache/flink/pull/6602#discussion_r213781696 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -364,22 +364,37 @@ public String getRestBaseUrl() { } } - private static void registerHandler(Router router, Tuple2 specificationHandler) { - switch (specificationHandler.f0.getHttpMethod()) { + private static void registerHandler(Router router, Tuple2 specificationHandler, Logger log) { + final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL(); + // setup versioned urls + for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) { + final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL; + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), versionedHandlerURL); + registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); + } + // setup unversioned url for convenience and backwards compatibility + // this url will always point to the oldest supported version + // this is controlled by the order in which handler are registered + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL); + registerHandler(router, handlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); Review comment: I just realized that the current approach in the PR has a troublesome flaw: the default API is actually mixed iff a newer version defines a new url. 
(if we add `/totally/new/url` in `v2` then the default for other urls would be `v1`, but for the new one it would be `v2`). As you suggested we should restrict the default to a specific version, which would also solve the issue of multiple registrations. What we could do is add a `isDefault` flag to the `RestAPIVersion` enum, and only register unversioned urls for that version. We could enforce with a test that exactly one version is marked as the default. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add VERSION to the REST urls. > -- > > Key: FLINK-7551 > URL: https://issues.apache.org/jira/browse/FLINK-7551 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > This is to guarantee that we can update the REST API without breaking > existing third-party clients. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
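The `isDefault` flag proposed above could look roughly like this; a minimal sketch under assumed names, not the actual Flink `RestAPIVersion` code:

```java
// Sketch: exactly one enum constant is flagged as the default, and unversioned
// URLs would be registered only for that version. The defaultCount() helper is
// the check a unit test would enforce.
enum RestApiVersionSketch {
    V0(0, false), // strictly for testing purposes
    V1(1, true);  // the single default version

    private final int versionNumber;
    private final boolean isDefault;

    RestApiVersionSketch(int versionNumber, boolean isDefault) {
        this.versionNumber = versionNumber;
        this.isDefault = isDefault;
    }

    boolean isDefaultVersion() {
        return isDefault;
    }

    String getURLVersionPrefix() {
        return "v" + versionNumber;
    }

    // proposed test invariant: exactly one version carries the default flag
    static long defaultCount() {
        return java.util.Arrays.stream(values())
            .filter(v -> v.isDefault)
            .count();
    }
}
```

Pinning the default to one flagged version avoids the mixed-default problem: a URL added only in a newer version simply has no unversioned alias until that version becomes the default.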
[jira] [Commented] (FLINK-7551) Add VERSION to the REST urls.
[ https://issues.apache.org/jira/browse/FLINK-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1659#comment-1659 ] ASF GitHub Bot commented on FLINK-7551: --- tillrohrmann commented on a change in pull request #6602: [FLINK-7551][rest] Add versioning to REST API URL: https://github.com/apache/flink/pull/6602#discussion_r213778983 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.versioning; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; + +/** + * An enum for all versions of the REST API. + * + * REST API versions are global and thus apply to every REST component. 
+ * + * Changes that must result in an API version increment include but are not limited to: + * - modification of a handler url + * - addition of new mandatory parameters + * - removal of a handler/request + * - modifications to request/response bodies (excluding additions) + */ +public enum RestAPIVersion { + V0(0), // strictly for testing purposes Review comment: The above comment has become obsolete with https://github.com/apache/flink/pull/6602#discussion_r213742055 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add VERSION to the REST urls. > -- > > Key: FLINK-7551 > URL: https://issues.apache.org/jira/browse/FLINK-7551 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > This is to guarantee that we can update the REST API without breaking > existing third-party clients. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7551) Add VERSION to the REST urls.
[ https://issues.apache.org/jira/browse/FLINK-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596664#comment-16596664 ] ASF GitHub Bot commented on FLINK-7551: --- tillrohrmann commented on a change in pull request #6602: [FLINK-7551][rest] Add versioning to REST API URL: https://github.com/apache/flink/pull/6602#discussion_r213778724 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -364,22 +364,37 @@ public String getRestBaseUrl() { } } - private static void registerHandler(Router router, Tuple2 specificationHandler) { - switch (specificationHandler.f0.getHttpMethod()) { + private static void registerHandler(Router router, Tuple2 specificationHandler, Logger log) { + final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL(); + // setup versioned urls + for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) { + final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL; + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), versionedHandlerURL); + registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); + } + // setup unversioned url for convenience and backwards compatibility + // this url will always point to the oldest supported version + // this is controlled by the order in which handler are registered + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL); + registerHandler(router, handlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); Review comment: Ah ok, the idea is that the unversioned URLs point to the oldest supported REST API version, right? The problem with this approach is that we would break the API (unversioned URLs) every time we drop support for a given version. 
Given that the versioned URLs are stable and we did not promise REST API stability yet, this should most likely be ok. The breaking change would only happen if users of the unversioned URLs don't migrate to the versioned URLs and we drop support for a version. Once we do this, we should add it to the migration guide. Could we maybe add a check which only registers handlers of the oldest version under the unversioned URLs instead of all? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add VERSION to the REST urls. > -- > > Key: FLINK-7551 > URL: https://issues.apache.org/jira/browse/FLINK-7551 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > This is to guarantee that we can update the REST API without breaking > existing third-party clients. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
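The check suggested above, registering the unversioned URL only for the oldest supported version of a handler, could be sketched like this (hypothetical helper names, not Flink code):

```java
// Sketch: given a handler's supported API versions, only the oldest one backs
// the unversioned URL; all versions stay reachable behind their /vN prefix.
final class UnversionedUrlPolicy {
    /** Returns the version whose handler should back the unversioned URL. */
    static int oldestVersion(java.util.Collection<Integer> supportedVersions) {
        return supportedVersions.stream()
            .min(java.util.Comparator.naturalOrder())
            .orElseThrow(() -> new IllegalArgumentException(
                "handler must support at least one API version"));
    }

    /** True if this version's handler should also be registered unversioned. */
    static boolean registerUnversioned(int candidateVersion,
                                       java.util.Collection<Integer> supportedVersions) {
        return candidateVersion == oldestVersion(supportedVersions);
    }
}
```

This makes the unversioned registration deterministic instead of depending on the order in which handlers happen to be registered.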
[jira] [Commented] (FLINK-10170) Support string representation for map types in descriptor-based Table API
[ https://issues.apache.org/jira/browse/FLINK-10170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596650#comment-16596650 ] ASF GitHub Bot commented on FLINK-10170: tragicjun commented on issue #6578: [FLINK-10170] [table] Support string representation for map types in descriptor-based Table API URL: https://github.com/apache/flink/pull/6578#issuecomment-417042511 @buptljy In the latest commit, the array types are supported as well, please check it out. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support string representation for map types in descriptor-based Table API > - > > Key: FLINK-10170 > URL: https://issues.apache.org/jira/browse/FLINK-10170 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.1, 1.7.0 >Reporter: Jun Zhang >Assignee: Jun Zhang >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > Attachments: flink-10170 > > > Since 1.6 the recommended way of creating source/sink table is using > connector/format/schema/ descriptors. However, when declaring map types in > the schema descriptor, the following exception would be thrown: > {quote}org.apache.flink.table.api.TableException: A string representation for > map types is not supported yet.{quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tragicjun removed a comment on issue #6578: [FLINK-10170] [table] Support string representation for map types in descriptor-based Table API
tragicjun removed a comment on issue #6578: [FLINK-10170] [table] Support string representation for map types in descriptor-based Table API URL: https://github.com/apache/flink/pull/6578#issuecomment-414320931 Thanks for reminding @buptljy. Array types is a bit more complex than map types. Actually I've already implemented a version on my branch. But it introduces some implementation tricks, which I thought it would be better to open another issue to discuss (seems like FLINK-10120 is the issue to go). What is your opinion @twalthr? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-7964) Add Apache Kafka 1.0/1.1 connectors
[ https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596630#comment-16596630 ] ASF GitHub Bot commented on FLINK-7964: --- eliaslevy edited a comment on issue #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors URL: https://github.com/apache/flink/pull/6577#issuecomment-417037795 What is the need for all these different Kafka connector versions? Newer Kafka clients are compatible with older brokers. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Apache Kafka 1.0/1.1 connectors > --- > > Key: FLINK-7964 > URL: https://issues.apache.org/jira/browse/FLINK-7964 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 1.0.0 is no mere bump of the version number. The Apache Kafka Project > Management Committee has packed a number of valuable enhancements into the > release. Here is a summary of a few of them: > * Since its introduction in version 0.10, the Streams API has become hugely > popular among Kafka users, including the likes of Pinterest, Rabobank, > Zalando, and The New York Times. In 1.0, the API continues to evolve at a > healthy pace. To begin with, the builder API has been improved (KIP-120). A > new API has been added to expose the state of active tasks at runtime > (KIP-130). The new cogroup API makes it much easier to deal with partitioned > aggregates with fewer StateStores and fewer moving parts in your code > (KIP-150). Debuggability gets easier with enhancements to the print() and > writeAsText() methods (KIP-160). And if that’s not enough, check out KIP-138 > and KIP-161 too. 
For more on streams, check out the Apache Kafka Streams > documentation, including some helpful new tutorial videos. > * Operating Kafka at scale requires that the system remain observable, and to > make that easier, we’ve made a number of improvements to metrics. These are > too many to summarize without becoming tedious, but Connect metrics have been > significantly improved (KIP-196), a litany of new health check metrics are > now exposed (KIP-188), and we now have a global topic and partition count > (KIP-168). Check out KIP-164 and KIP-187 for even more. > * We now support Java 9, leading, among other things, to significantly faster > TLS and CRC32C implementations. Over-the-wire encryption will be faster now, > which will keep Kafka fast and compute costs low when encryption is enabled. > * In keeping with the security theme, KIP-152 cleans up the error handling on > Simple Authentication Security Layer (SASL) authentication attempts. > Previously, some authentication error conditions were indistinguishable from > broker failures and were not logged in a clear way. This is cleaner now. > * Kafka can now tolerate disk failures better. Historically, JBOD storage > configurations have not been recommended, but the architecture has > nevertheless been tempting: after all, why not rely on Kafka’s own > replication mechanism to protect against storage failure rather than using > RAID? With KIP-112, Kafka now handles disk failure more gracefully. A single > disk failure in a JBOD broker will not bring the entire broker down; rather, > the broker will continue serving any log files that remain on functioning > disks. > * Since release 0.11.0, the idempotent producer (which is the producer used > in the presence of a transaction, which of course is the producer we use for > exactly-once processing) required max.in.flight.requests.per.connection to be > equal to one. 
As anyone who has written or tested a wire protocol can attest, > this put an upper bound on throughput. Thanks to KAFKA-5949, this can now be > as large as five, relaxing the throughput constraint quite a bit. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7964) Add Apache Kafka 1.0/1.1 connectors
[ https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596629#comment-16596629 ] ASF GitHub Bot commented on FLINK-7964: --- eliaslevy commented on issue #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors URL: https://github.com/apache/flink/pull/6577#issuecomment-417037795 What is the need for all this different Kafka connector versions? Newer Kafka clients are compatible with older brokers. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Apache Kafka 1.0/1.1 connectors > --- > > Key: FLINK-7964 > URL: https://issues.apache.org/jira/browse/FLINK-7964 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 1.0.0 is no mere bump of the version number. The Apache Kafka Project > Management Committee has packed a number of valuable enhancements into the > release. Here is a summary of a few of them: > * Since its introduction in version 0.10, the Streams API has become hugely > popular among Kafka users, including the likes of Pinterest, Rabobank, > Zalando, and The New York Times. In 1.0, the the API continues to evolve at a > healthy pace. To begin with, the builder API has been improved (KIP-120). A > new API has been added to expose the state of active tasks at runtime > (KIP-130). The new cogroup API makes it much easier to deal with partitioned > aggregates with fewer StateStores and fewer moving parts in your code > (KIP-150). Debuggability gets easier with enhancements to the print() and > writeAsText() methods (KIP-160). And if that’s not enough, check out KIP-138 > and KIP-161 too. 
For more on streams, check out the Apache Kafka Streams > documentation, including some helpful new tutorial videos. > * Operating Kafka at scale requires that the system remain observable, and to > make that easier, we’ve made a number of improvements to metrics. These are > too many to summarize without becoming tedious, but Connect metrics have been > significantly improved (KIP-196), a litany of new health check metrics are > now exposed (KIP-188), and we now have a global topic and partition count > (KIP-168). Check out KIP-164 and KIP-187 for even more. > * We now support Java 9, leading, among other things, to significantly faster > TLS and CRC32C implementations. Over-the-wire encryption will be faster now, > which will keep Kafka fast and compute costs low when encryption is enabled. > * In keeping with the security theme, KIP-152 cleans up the error handling on > Simple Authentication Security Layer (SASL) authentication attempts. > Previously, some authentication error conditions were indistinguishable from > broker failures and were not logged in a clear way. This is cleaner now. > * Kafka can now tolerate disk failures better. Historically, JBOD storage > configurations have not been recommended, but the architecture has > nevertheless been tempting: after all, why not rely on Kafka’s own > replication mechanism to protect against storage failure rather than using > RAID? With KIP-112, Kafka now handles disk failure more gracefully. A single > disk failure in a JBOD broker will not bring the entire broker down; rather, > the broker will continue serving any log files that remain on functioning > disks. > * Since release 0.11.0, the idempotent producer (which is the producer used > in the presence of a transaction, which of course is the producer we use for > exactly-once processing) required max.in.flight.requests.per.connection to be > equal to one. 
As anyone who has written or tested a wire protocol can attest, > this put an upper bound on throughput. Thanks to KAFKA-5949, this can now be > as large as five, relaxing the throughput constraint quite a bit. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10254) Fix inappropriate checkNotNull in stateBackend
[ https://issues.apache.org/jira/browse/FLINK-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596622#comment-16596622 ] ASF GitHub Bot commented on FLINK-10254: azagrebin commented on issue #6632: [FLINK-10254][backend]Fix inappropriate checkNotNull in stateBackend URL: https://github.com/apache/flink/pull/6632#issuecomment-417035193 @zentol, I think it can be merged as soon as the CI is ok This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix inappropriate checkNotNull in stateBackend > -- > > Key: FLINK-10254 > URL: https://issues.apache.org/jira/browse/FLINK-10254 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The checkNotNull is unnecessary for numberOfKeyGroups, which has a primitive type; > we just have to make sure it is at least 1. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
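The issue boils down to this: `checkNotNull` on a primitive `int` can never fail, so the meaningful guard is a range check. A self-contained sketch of the point (not the actual Flink code; the Guava-style helper is written out inline):

```java
// A primitive int can never be null, so checkNotNull(numberOfKeyGroups) is a
// no-op; the useful precondition is the lower bound. Written without Guava so
// the snippet stands alone.
final class KeyGroupPreconditions {
    static int checkNumberOfKeyGroups(int numberOfKeyGroups) {
        if (numberOfKeyGroups < 1) {
            throw new IllegalArgumentException(
                "numberOfKeyGroups must be at least 1, but was " + numberOfKeyGroups);
        }
        return numberOfKeyGroups;
    }
}
```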
[jira] [Commented] (FLINK-9172) Support external catalog factory that comes default with SQL-Client
[ https://issues.apache.org/jira/browse/FLINK-9172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596562#comment-16596562 ] Eron Wright commented on FLINK-9172: - [~twalthr] do you agree that we need to extend the environment file to register external catalogs? Assumedly the catalog factory would require some connection info and related parameters. Likewise we may need a set of descriptors to provide a typed API. In other words, the pattern that was established for sources could now be applied to catalogs. Maybe a design doc should be written to allow for further discussion on this. BTW I am very much looking forward to this feature, and would like to help. Here's the scenario that I have in mind: to be able to develop a library for a particular domain, that defines a catalog of tables that relate to that domain. Then, use that library in Java/Scala programs and in the SQL Client. In other words, catalogs provide a unified way to define a set of tables. WDYT? > Support external catalog factory that comes default with SQL-Client > --- > > Key: FLINK-9172 > URL: https://issues.apache.org/jira/browse/FLINK-9172 > Project: Flink > Issue Type: Sub-task >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > It will be great to have SQL-Client to support some external catalogs > out-of-the-box for SQL users to configure and utilize easily. I am currently > think of having an external catalog factory that spins up both streaming and > batch external catalog table sources and sinks. This could greatly unify and > provide easy access for SQL users. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10255: -- Priority: Blocker (was: Critical) > Standby Dispatcher locks submitted JobGraphs > > > Key: FLINK-10255 > URL: https://issues.apache.org/jira/browse/FLINK-10255 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Blocker > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > Currently, standby {{Dispatchers}} lock submitted {{JobGraphs}} which are > added to the {{SubmittedJobGraphStore}} if HA mode is enabled. Locking the > {{JobGraphs}} can prevent their cleanup, leaving the system in an inconsistent > state. > The problem is that we recover in the > {{SubmittedJobGraphListener#onAddedJobGraph}} callback, which is also called > even if we don't have the leadership for the newly added {{JobGraph}}. Recovering the > {{JobGraph}} currently locks the {{JobGraph}}. In case the > {{Dispatcher}} is not the leader, we won't start that job after its > recovery. However, we also don't release the {{JobGraph}}, leaving it locked. > There are two possible solutions to the problem. Either we check whether we > are the leader before recovering jobs, or we say that recovering jobs does not > lock them; only if we can submit the recovered job do we lock it. The latter > approach has the advantage that it follows a code path quite similar to an > initial job submission. Moreover, jobs are currently also recovered at other > places. In all these places we would currently need to release the > {{JobGraphs}} if we cannot submit the recovered {{JobGraph}} (e.g. > {{Dispatcher#grantLeadership}}). > An extension of the first solution could be to stop the > {{SubmittedJobGraphStore}} while the {{Dispatcher}} is not the leader. 
Then > we would have to make sure that no concurrent callback from the > {{SubmittedJobGraphStore#SubmittedJobGraphListener}} can be executed after revoking leadership from the {{Dispatcher}}.
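The first solution described in the ticket (check leadership before recovering, so that a standby never takes the lock on a newly added JobGraph) can be sketched in a few lines. This is an illustrative sketch only; the class and method names below are hypothetical and are not Flink's actual API.

```java
import java.util.HashSet;
import java.util.Set;

public class RecoverySketch {
    /** Tracks which job graphs this process currently holds a lock on. */
    static final Set<String> lockedGraphs = new HashSet<>();

    /** In this sketch, recovery implies taking the lock on the graph. */
    static void recoverAndLock(String jobId) {
        lockedGraphs.add(jobId);
    }

    /** Callback fired when a JobGraph appears in the store. */
    static void onAddedJobGraph(String jobId, boolean isLeader) {
        if (!isLeader) {
            // Standby dispatcher: never recover, so the graph is never
            // locked and the leader's later cleanup cannot be blocked by us.
            return;
        }
        recoverAndLock(jobId);
        // ... submit the recovered job ...
    }

    public static void main(String[] args) {
        onAddedJobGraph("job-1", false); // standby: no lock taken
        onAddedJobGraph("job-2", true);  // leader: recovered and locked
        System.out.println(lockedGraphs);
    }
}
```

The alternative solution from the ticket would instead move the `recoverAndLock` call after a successful submission, so that a failed submission leaves nothing locked.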
[jira] [Commented] (FLINK-7551) Add VERSION to the REST urls.
[ https://issues.apache.org/jira/browse/FLINK-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596551#comment-16596551 ] ASF GitHub Bot commented on FLINK-7551: --- zentol commented on a change in pull request #6602: [FLINK-7551][rest] Add versioning to REST API URL: https://github.com/apache/flink/pull/6602#discussion_r213742055 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -364,22 +364,37 @@ public String getRestBaseUrl() { } } - private static void registerHandler(Router router, Tuple2 specificationHandler) { - switch (specificationHandler.f0.getHttpMethod()) { + private static void registerHandler(Router router, Tuple2 specificationHandler, Logger log) { + final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL(); + // setup versioned urls + for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) { + final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL; + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), versionedHandlerURL); + registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); + } + // setup unversioned url for convenience and backwards compatibility + // this url will always point to the oldest supported version + // this is controlled by the order in which handler are registered + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL); + registerHandler(router, handlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); Review comment: I'm not sure what you mean with legacy handlers. The goal here is to always keep the oldest version as the default. So, currently `v1` is the default, and if we remove that at any point then `v2` will be the new default. 
Basically, we have 2 options for dealing with unversioned URLs in the future: * remove unversioned URLs altogether, in which case explicit UNVERSIONED is useful for the time being * always keep a default version, in which case UNVERSIONED is not useful, as we have to carry it over whenever we remove an API, which is prone to errors > Add VERSION to the REST urls. > -- > > Key: FLINK-7551 > URL: https://issues.apache.org/jira/browse/FLINK-7551 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > This is to guarantee that we can update the REST API without breaking > existing third-party clients. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
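The registration scheme discussed in this review (one URL per supported API version, plus an unversioned URL that defaults to the oldest registered version) can be sketched as follows. `ApiVersion` and `urlsFor` are illustrative names for this sketch, not Flink's actual types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class VersionedUrls {
    enum ApiVersion {
        V1(1), V2(2);

        final int version;

        ApiVersion(int version) { this.version = version; }

        String urlPrefix() { return "v" + version; }
    }

    /**
     * Returns every URL a handler would be reachable under: one per
     * supported version, plus the bare URL registered last so it resolves
     * to the oldest supported version (registration order controls this).
     */
    static List<String> urlsFor(String handlerUrl, List<ApiVersion> supported) {
        List<String> urls = new ArrayList<>();
        for (ApiVersion v : supported) {
            urls.add('/' + v.urlPrefix() + handlerUrl);
        }
        urls.add(handlerUrl); // unversioned fallback
        return urls;
    }

    public static void main(String[] args) {
        // e.g. [/v1/jobs, /v2/jobs, /jobs]
        System.out.println(urlsFor("/jobs", Arrays.asList(ApiVersion.V1, ApiVersion.V2)));
    }
}
```

Under this scheme, dropping `V1` later would automatically make `/jobs` resolve to the `V2` handler, which is exactly the trade-off debated above.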
[jira] [Commented] (FLINK-7964) Add Apache Kafka 1.0/1.1 connectors
[ https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596539#comment-16596539 ] ASF GitHub Bot commented on FLINK-7964: --- yanghua commented on issue #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors URL: https://github.com/apache/flink/pull/6577#issuecomment-417007643 @aljoscha I agree with you. The current chain of dependencies does make refactoring difficult and not easy to abstract. My question is: for different versions of the kafka client where the API has hardly changed (for example, kafka 0.11 and kafka 1.0), are you not in favor of flink-connector-kafka-1.0 inheriting the implementation of flink-connector-kafka-0.11? Or is your idea to extract some util methods (if the other connectors are independent of each other except the base module)? But these util methods may only work for both versions and not for all. If we don't take an inheritance-based solution, it will result in a lot of duplicate code. In addition, I have done some common implementation abstraction for all kafka connector test code, but I don't know why the 0.9 version of the connector can't pass the test. I plan to fall back to the first commit, then split it into multiple commits, and push each commit to the PR to verify that it doesn't break the existing tests and to simplify locating the problem. What do you think of this? cc @pnowojski
> Add Apache Kafka 1.0/1.1 connectors > --- > > Key: FLINK-7964 > URL: https://issues.apache.org/jira/browse/FLINK-7964 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 1.0.0 is no mere bump of the version number. The Apache Kafka Project > Management Committee has packed a number of valuable enhancements into the > release. Here is a summary of a few of them: > * Since its introduction in version 0.10, the Streams API has become hugely > popular among Kafka users, including the likes of Pinterest, Rabobank, > Zalando, and The New York Times. In 1.0, the API continues to evolve at a > healthy pace. To begin with, the builder API has been improved (KIP-120). A > new API has been added to expose the state of active tasks at runtime > (KIP-130). The new cogroup API makes it much easier to deal with partitioned > aggregates with fewer StateStores and fewer moving parts in your code > (KIP-150). Debuggability gets easier with enhancements to the print() and > writeAsText() methods (KIP-160). And if that’s not enough, check out KIP-138 > and KIP-161 too. For more on streams, check out the Apache Kafka Streams > documentation, including some helpful new tutorial videos. > * Operating Kafka at scale requires that the system remain observable, and to > make that easier, we’ve made a number of improvements to metrics. These are > too many to summarize without becoming tedious, but Connect metrics have been > significantly improved (KIP-196), a litany of new health check metrics are > now exposed (KIP-188), and we now have a global topic and partition count > (KIP-168). Check out KIP-164 and KIP-187 for even more. 
> * We now support Java 9, leading, among other things, to significantly faster > TLS and CRC32C implementations. Over-the-wire encryption will be faster now, > which will keep Kafka fast and compute costs low when encryption is enabled. > * In keeping with the security theme, KIP-152 cleans up the error handling on > Simple Authentication Security Layer (SASL) authentication attempts. > Previously, some authentication error conditions were indistinguishable from > broker failures and were not logged in a clear way. This is cleaner now. > * Kafka can now tolerate disk failures better. Historically, JBOD storage > configurations have not been recommended, but the architecture has > nevertheless been tempting: after all, why not rely on Kafka’s own > replication mechanism to protect against storage failure rather than using > RAID? With KIP-112, Kafka now handles disk failure more gracefully. A single > disk failure in a JBOD broker will not bring the entire broker down; rather, > the broker will continue serving any log files that remain on functioning > disks. > * Since release 0.11.0, the idempotent producer (which is the producer used > in the presence
[jira] [Created] (FLINK-10256) Rework JobManagerFailsITCase and JobManagerTest into JobMasterITCase and JobMasterHAITCase
陈梓立 created FLINK-10256: --- Summary: Rework JobManagerFailsITCase and JobManagerTest into JobMasterITCase and JobMasterHAITCase Key: FLINK-10256 URL: https://issues.apache.org/jira/browse/FLINK-10256 Project: Flink Issue Type: Improvement Components: Tests Affects Versions: 1.7.0 Reporter: 陈梓立 Assignee: 陈梓立 Fix For: 1.7.0 I am planning to rework JobManagerFailsITCase and JobManagerTest into JobMasterITCase and JobMasterHAITCase. That is, reorganize the legacy tests, make them neat, and cover cases explicitly. The PR would follow before this weekend. While reworking, I'd like to add more jm failover test cases, listed below, for the future implementation of jm failover with the RECONCILING state. By "jm failover", I mean a real-world failover (like power loss or process exit), without calling Flink's internal postStop logic or anything like it. 1. Streaming task with jm failover. 2. Streaming task with jm failover concurrent to task fail. 3. Batch task with jm failover. 4. Batch task with jm failover concurrent to task fail. 5. Batch task with jm failover when some vertex has already been FINISHED.
[jira] [Commented] (FLINK-10254) Fix inappropriate checkNotNull in stateBackend
[ https://issues.apache.org/jira/browse/FLINK-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596523#comment-16596523 ] ASF GitHub Bot commented on FLINK-10254: azagrebin commented on a change in pull request #6632: [FLINK-10254][backend]Fix inappropriate checkNotNull in stateBackend URL: https://github.com/apache/flink/pull/6632#discussion_r213735414 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ## @@ -100,9 +100,11 @@ public AbstractKeyedStateBackend( ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider) { + Preconditions.checkArgument(numberOfKeyGroups >= 1, "NumberOfKeyGroups must be a positive number"); Review comment: We can also add this: ``` Preconditions.checkArgument(numberOfKeyGroups >= keyGroupRange.getNumberOfKeyGroups(), "The total number of key groups must be at least the number in the key group range assigned to this backend"); ``` > Fix inappropriate checkNotNull in stateBackend > -- > > Key: FLINK-10254 > URL: https://issues.apache.org/jira/browse/FLINK-10254 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The checkNotNull is unnecessary for numberOfKeyGroups, which is a primitive type; > we just have to make sure it is at least 1.
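Taken together, the two argument checks being discussed amount to the following. This sketch uses a plain `IllegalArgumentException` instead of Flink's `Preconditions` helper, and the class and method names are illustrative only.

```java
public class KeyGroupChecks {
    /**
     * Validates the constructor arguments discussed in the review:
     * the total key-group count must be positive, and must be at least
     * as large as the number of key groups in the range assigned to
     * this backend.
     */
    static void checkKeyGroups(int numberOfKeyGroups, int keyGroupsInRange) {
        if (numberOfKeyGroups < 1) {
            throw new IllegalArgumentException(
                "NumberOfKeyGroups must be a positive number");
        }
        if (numberOfKeyGroups < keyGroupsInRange) {
            throw new IllegalArgumentException(
                "The total number of key groups must be at least the number "
                    + "in the key group range assigned to this backend");
        }
    }

    public static void main(String[] args) {
        checkKeyGroups(128, 16); // valid: total >= assigned range size
        try {
            checkKeyGroups(0, 0);  // invalid: not positive
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that the first check subsumes the original `checkNotNull`: a primitive `int` can never be null, so only the range constraints are meaningful.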
[jira] [Commented] (FLINK-7551) Add VERSION to the REST urls.
[ https://issues.apache.org/jira/browse/FLINK-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596507#comment-16596507 ] ASF GitHub Bot commented on FLINK-7551: --- tillrohrmann commented on a change in pull request #6602: [FLINK-7551][rest] Add versioning to REST API URL: https://github.com/apache/flink/pull/6602#discussion_r213732010 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.versioning; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Tests for {@link RestAPIVersion}. + */ +public class RestAPIVersionTest { Review comment: `extends TestLogger` missing. > Add VERSION to the REST urls. 
> -- > > Key: FLINK-7551 > URL: https://issues.apache.org/jira/browse/FLINK-7551 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > This is to guarantee that we can update the REST API without breaking > existing third-party clients.
[jira] [Commented] (FLINK-7551) Add VERSION to the REST urls.
[ https://issues.apache.org/jira/browse/FLINK-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596506#comment-16596506 ] ASF GitHub Bot commented on FLINK-7551: --- tillrohrmann commented on a change in pull request #6602: [FLINK-7551][rest] Add versioning to REST API URL: https://github.com/apache/flink/pull/6602#discussion_r213730666 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.versioning; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; + +/** + * An enum for all versions of the REST API. + * + * REST API versions are global and thus apply to every REST component. 
+ * + * Changes that must result in an API version increment include but are not limited to: + * - modification of a handler url + * - addition of new mandatory parameters + * - removal of a handler/request + * - modifications to request/response bodies (excluding additions) + */ +public enum RestAPIVersion { + V0(0), // strictly for testing purposes Review comment: Should we maybe introduce a `Unversioned` enum value? Then we could explicitly define the handlers which are registered without a version prefix. Otherwise we register every handler there. > Add VERSION to the REST urls. > -- > > Key: FLINK-7551 > URL: https://issues.apache.org/jira/browse/FLINK-7551 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > This is to guarantee that we can update the REST API without breaking > existing third-party clients.
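The reviewer's suggestion of an explicit unversioned enum value might look like the sketch below. This is a hypothetical illustration of the idea, not Flink's actual `RestAPIVersion`; the class name and prefix convention are assumptions for the example.

```java
public class ApiVersions {
    enum RestApiVersion {
        /** Explicit marker for handlers registered without a version prefix. */
        UNVERSIONED(-1),
        V0(0), // strictly for testing purposes
        V1(1);

        private final int version;

        RestApiVersion(int version) { this.version = version; }

        /** The URL prefix, e.g. "v1"; empty for the unversioned marker. */
        String urlVersionPrefix() {
            return this == UNVERSIONED ? "" : "v" + version;
        }
    }

    public static void main(String[] args) {
        System.out.println(RestApiVersion.V1.urlVersionPrefix());          // prints "v1"
        System.out.println(RestApiVersion.UNVERSIONED.urlVersionPrefix()); // prints ""
    }
}
```

With such a marker, a handler would only be registered under the bare URL if it explicitly lists `UNVERSIONED` among its supported versions, rather than every handler getting an unversioned alias implicitly.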
[jira] [Commented] (FLINK-7551) Add VERSION to the REST urls.
[ https://issues.apache.org/jira/browse/FLINK-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596511#comment-16596511 ] ASF GitHub Bot commented on FLINK-7551: --- tillrohrmann commented on issue #6602: [FLINK-7551][rest] Add versioning to REST API URL: https://github.com/apache/flink/pull/6602#issuecomment-417002563 It would be good to restart the Travis build to see if all tests pass. > Add VERSION to the REST urls. > -- > > Key: FLINK-7551 > URL: https://issues.apache.org/jira/browse/FLINK-7551 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > This is to guarantee that we can update the REST API without breaking > existing third-party clients.
[jira] [Commented] (FLINK-7551) Add VERSION to the REST urls.
[ https://issues.apache.org/jira/browse/FLINK-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596505#comment-16596505 ] ASF GitHub Bot commented on FLINK-7551: --- tillrohrmann commented on a change in pull request #6602: [FLINK-7551][rest] Add versioning to REST API URL: https://github.com/apache/flink/pull/6602#discussion_r213729929 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -364,22 +364,37 @@ public String getRestBaseUrl() { } } - private static void registerHandler(Router router, Tuple2 specificationHandler) { - switch (specificationHandler.f0.getHttpMethod()) { + private static void registerHandler(Router router, Tuple2 specificationHandler, Logger log) { + final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL(); + // setup versioned urls + for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) { + final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL; + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), versionedHandlerURL); + registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); + } + // setup unversioned url for convenience and backwards compatibility + // this url will always point to the oldest supported version + // this is controlled by the order in which handler are registered + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL); + registerHandler(router, handlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); Review comment: Shouldn't we explicitly register the legacy handlers? I think it might be a bit confusing if we change a handler which no longer supports the legacy format and still register it here. 
We could, for example, introduce a legacy version `RestAPIVersion.Unversioned` which we register without the version prefix. > Add VERSION to the REST urls. > -- > > Key: FLINK-7551 > URL: https://issues.apache.org/jira/browse/FLINK-7551 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > This is to guarantee that we can update the REST API without breaking > existing third-party clients.
[GitHub] tillrohrmann commented on a change in pull request #6602: [FLINK-7551][rest] Add versioning to REST API
tillrohrmann commented on a change in pull request #6602: [FLINK-7551][rest] Add versioning to REST API URL: https://github.com/apache/flink/pull/6602#discussion_r213729929 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -364,22 +364,37 @@ public String getRestBaseUrl() { } } - private static void registerHandler(Router router, Tuple2 specificationHandler) { - switch (specificationHandler.f0.getHttpMethod()) { + private static void registerHandler(Router router, Tuple2 specificationHandler, Logger log) { + final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL(); + // setup versioned urls + for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) { + final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL; + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), versionedHandlerURL); + registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); + } + // setup unversioned url for convenience and backwards compatibility + // this url will always point to the oldest supported version + // this is controlled by the order in which handlers are registered + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL); + registerHandler(router, handlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); Review comment: Shouldn't we explicitly register the legacy handlers? I think it might be a bit confusing if we change a handler which no longer supports the legacy format and still register it here. We could, for example, introduce a legacy version `RestAPIVersion.Unversioned` which we register without the version prefix.
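The URL scheme discussed in this review thread can be sketched in isolation. The following self-contained illustration is hypothetical (the class `RestVersioningSketch` and the method `versionedUrl` are mine, not Flink's); it only mirrors the `'/' + prefix + handlerURL` concatenation shown in the diff:

```java
// Hypothetical sketch of the versioned-URL scheme under review: each
// supported API version contributes a prefixed URL; the bare URL would be
// registered once more as an unversioned alias.
public class RestVersioningSketch {

    enum RestAPIVersion {
        V0(0), // strictly for testing purposes
        V1(1);

        private final int version;

        RestAPIVersion(int version) {
            this.version = version;
        }

        String getURLVersionPrefix() {
            return "v" + version;
        }
    }

    // Mirrors the concatenation in the reviewed diff: '/' + prefix + handler URL.
    static String versionedUrl(RestAPIVersion version, String handlerUrl) {
        return '/' + version.getURLVersionPrefix() + handlerUrl;
    }

    public static void main(String[] args) {
        System.out.println(versionedUrl(RestAPIVersion.V1, "/jobs")); // prints /v1/jobs
    }
}
```

The review suggestion amounts to adding one more enum value (e.g. an `Unversioned` constant with an empty prefix) so that unversioned registration is explicit rather than implied by registration order.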
[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR
[ https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596499#comment-16596499 ] ASF GitHub Bot commented on FLINK-9559: --- hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-417001158 Hi, @pnowojski I agree with you now: change all CHAR to VARCHAR in Flink. And yes, this is a serious problem since `case when`, `concat`, etc. are very commonly used in SQL. Considering that changing CHAR to VARCHAR breaks backward compatibility, I think we can provide a configuration option for whether to change CHAR to VARCHAR. Since this is a big change, it would be great to have your opinions here. @fhueske @twalthr > The type of a union of CHAR columns of different lengths should be VARCHAR > -- > > Key: FLINK-9559 > URL: https://issues.apache.org/jira/browse/FLINK-9559 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > Currently, if a case-when expression has two branches which return string > literals, redundant white spaces will be appended to the shorter string literal. > For example, for the SQL: case 1 when 1 then 'a' when 2 then 'bcd' end, the > return value will be 'a ' of CHAR(3) instead of 'a'. > Although this follows the behavior in strict SQL standard mode (SQL:2003), we > should get the pragmatic return type in a real scenario, without blank padding.
> Happily, this problem has been fixed by > [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321]; we can > upgrade Calcite to the next release (1.17.0) and override > {{RelDataTypeSystem}} in Flink to configure the return type, i.e., making > {{shouldConvertRaggedUnionTypesToVarying()}} return true. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR
hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-417001158 Hi, @pnowojski I agree with you now: change all CHAR to VARCHAR in Flink. And yes, this is a serious problem since `case when`, `concat`, etc. are very commonly used in SQL. Considering that changing CHAR to VARCHAR breaks backward compatibility, I think we can provide a configuration option for whether to change CHAR to VARCHAR. Since this is a big change, it would be great to have your opinions here. @fhueske @twalthr
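The blank-padding behavior described in this thread can be reproduced without a SQL engine. A minimal, self-contained sketch (the `asChar` helper is hypothetical, not Flink or Calcite code) of what strict SQL:2003 CHAR(3) semantics do to the shorter CASE branch:

```java
// Illustrates the FLINK-9559 problem: a CASE expression over 'a' and 'bcd'
// is typed CHAR(3) under strict SQL:2003 rules, so the shorter literal
// comes back blank-padded.
public class CharPaddingDemo {

    // Pad a literal to the CHAR(n) length, as a strict SQL:2003 engine would.
    static String asChar(String literal, int length) {
        StringBuilder sb = new StringBuilder(literal);
        while (sb.length() < length) {
            sb.append(' ');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // CASE 1 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END is typed CHAR(3),
        // so the 'a' branch is padded to three characters.
        System.out.println("[" + asChar("a", 3) + "]"); // prints [a  ]
    }
}
```

Typing the union as VARCHAR instead (what `shouldConvertRaggedUnionTypesToVarying()` enables) would return the unpadded `'a'`.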
[jira] [Commented] (FLINK-10254) Fix inappropriate checkNotNull in stateBackend
[ https://issues.apache.org/jira/browse/FLINK-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596497#comment-16596497 ] ASF GitHub Bot commented on FLINK-10254: Aitozi commented on issue #6632: [FLINK-10254][backend]Fix inappropriate checkNotNull in stateBackend URL: https://github.com/apache/flink/pull/6632#issuecomment-417000898 cc @StefanRRichter > Fix inappropriate checkNotNull in stateBackend > -- > > Key: FLINK-10254 > URL: https://issues.apache.org/jira/browse/FLINK-10254 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The checkNotNull on numberOfKeyGroups is unnecessary since it is a primitive > type; we just have to make sure it is bigger than 1.
[GitHub] Aitozi commented on issue #6632: [FLINK-10254][backend]Fix inappropriate checkNotNull in stateBackend
Aitozi commented on issue #6632: [FLINK-10254][backend]Fix inappropriate checkNotNull in stateBackend URL: https://github.com/apache/flink/pull/6632#issuecomment-417000898 cc @StefanRRichter
[jira] [Commented] (FLINK-10254) Fix inappropriate checkNotNull in stateBackend
[ https://issues.apache.org/jira/browse/FLINK-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596495#comment-16596495 ] ASF GitHub Bot commented on FLINK-10254: Aitozi opened a new pull request #6632: [FLINK-10254][backend]Fix inappropriate checkNotNull in stateBackend URL: https://github.com/apache/flink/pull/6632 ## What is the purpose of the change The argument check on numOfKeyGroups in `AbstractKeyedStateBackend` is inappropriate; the value should instead be checked to be bigger than 1. > Fix inappropriate checkNotNull in stateBackend > -- > > Key: FLINK-10254 > URL: https://issues.apache.org/jira/browse/FLINK-10254 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The checkNotNull on numberOfKeyGroups is unnecessary since it is a primitive > type; we just have to make sure it is bigger than 1.
[jira] [Updated] (FLINK-10254) Fix inappropriate checkNotNull in stateBackend
[ https://issues.apache.org/jira/browse/FLINK-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10254: --- Labels: pull-request-available (was: ) > Fix inappropriate checkNotNull in stateBackend > -- > > Key: FLINK-10254 > URL: https://issues.apache.org/jira/browse/FLINK-10254 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The checkNotNull on numberOfKeyGroups is unnecessary since it is a primitive > type; we just have to make sure it is bigger than 1.
[GitHub] Aitozi opened a new pull request #6632: [FLINK-10254][backend]Fix inappropriate checkNotNull in stateBackend
Aitozi opened a new pull request #6632: [FLINK-10254][backend]Fix inappropriate checkNotNull in stateBackend URL: https://github.com/apache/flink/pull/6632 ## What is the purpose of the change The argument check on numOfKeyGroups in `AbstractKeyedStateBackend` is inappropriate; the value should instead be checked to be bigger than 1.
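The change this pull request describes amounts to replacing a null check with a range check: a primitive `int` can never be null, so `checkNotNull` is meaningless there. A self-contained sketch (the helper name `checkKeyGroups` is illustrative, not Flink's API; Flink's own `Preconditions.checkArgument` would express the same thing, and the exact lower bound is the PR's concern):

```java
// Sketch of validating a primitive argument by range instead of nullness.
public class KeyGroupCheckSketch {

    // A primitive int can never be null, so validate the range instead of
    // calling checkNotNull; >= 1 is used here as the illustrative bound.
    static int checkKeyGroups(int numberOfKeyGroups) {
        if (numberOfKeyGroups < 1) {
            throw new IllegalArgumentException(
                "numberOfKeyGroups must be at least 1, was " + numberOfKeyGroups);
        }
        return numberOfKeyGroups;
    }

    public static void main(String[] args) {
        System.out.println(checkKeyGroups(128)); // prints 128
    }
}
```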
[GitHub] azagrebin commented on issue #6080: [FLINK-9443]Remove unused parameter in generateNodeLocalHash
azagrebin commented on issue #6080: [FLINK-9443]Remove unused parameter in generateNodeLocalHash URL: https://github.com/apache/flink/pull/6080#issuecomment-416999822 +1, can be merged
[jira] [Commented] (FLINK-9443) Remove unused parameter in StreamGraphHasherV2
[ https://issues.apache.org/jira/browse/FLINK-9443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596490#comment-16596490 ] ASF GitHub Bot commented on FLINK-9443: --- azagrebin commented on issue #6080: [FLINK-9443]Remove unused parameter in generateNodeLocalHash URL: https://github.com/apache/flink/pull/6080#issuecomment-416999822 +1, can be merged > Remove unused parameter in StreamGraphHasherV2 > --- > > Key: FLINK-9443 > URL: https://issues.apache.org/jira/browse/FLINK-9443 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.2.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > Since Flink 1.2, StreamGraphHasherV2 has been used to generate hashes. The > method generateNodeLocalHash no longer uses information such as parallelism > or userFunction, so the parameters should be removed.
[jira] [Updated] (FLINK-9443) Remove unused parameter in StreamGraphHasherV2
[ https://issues.apache.org/jira/browse/FLINK-9443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9443: -- Labels: pull-request-available (was: ) > Remove unused parameter in StreamGraphHasherV2 > --- > > Key: FLINK-9443 > URL: https://issues.apache.org/jira/browse/FLINK-9443 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.2.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > Since Flink 1.2, StreamGraphHasherV2 has been used to generate hashes. The > method generateNodeLocalHash no longer uses information such as parallelism > or userFunction, so the parameters should be removed.
[jira] [Updated] (FLINK-10254) Fix inappropriate checkNotNull in stateBackend
[ https://issues.apache.org/jira/browse/FLINK-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] aitozi updated FLINK-10254: --- Summary: Fix inappropriate checkNotNull in stateBackend (was: Fix check in stateBackend) > Fix inappropriate checkNotNull in stateBackend > -- > > Key: FLINK-10254 > URL: https://issues.apache.org/jira/browse/FLINK-10254 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Fix For: 1.6.0 > > > The checkNotNull on numberOfKeyGroups is unnecessary since it is a primitive > type; we just have to make sure it is bigger than 1.
[GitHub] yanghua commented on issue #6629: [hotfix] [docs] Fix typo in batch connectors
yanghua commented on issue #6629: [hotfix] [docs] Fix typo in batch connectors URL: https://github.com/apache/flink/pull/6629#issuecomment-416996532 +1
[jira] [Commented] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596464#comment-16596464 ] ASF GitHub Bot commented on FLINK-4534: --- zentol closed pull request #4482: [FLINK-4534] Fix synchronization issue in BucketingSink URL: https://github.com/apache/flink/pull/4482 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index faf3c566803..2472387ed44 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -422,9 +422,11 @@ private void initFileSystem() throws IOException { @Override public void close() throws Exception { - if (state != null) { - for (Map.Entry> entry : state.bucketStates.entrySet()) { - closeCurrentPartFile(entry.getValue()); + synchronized (state.bucketStates) { + if (state != null) { + for (Map.Entry> entry : state.bucketStates.entrySet()) { + closeCurrentPartFile(entry.getValue()); + } } } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Lack of synchronization in BucketingSink#restoreState() > --- > > Key: FLINK-4534 > URL: https://issues.apache.org/jira/browse/FLINK-4534 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Major > Labels: pull-request-available > > Iteration over state.bucketStates is protected by synchronization in other > methods, except for the following in restoreState(): > {code} > for (BucketState bucketState : state.bucketStates.values()) { > {code} > and following in close(): > {code} > for (Map.Entry> entry : > state.bucketStates.entrySet()) { > closeCurrentPartFile(entry.getValue()); > {code} > w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue > starting line 752: > {code} > Set pastCheckpointIds = > bucketState.pendingFilesPerCheckpoint.keySet(); > LOG.debug("Moving pending files to final location on restore."); > for (Long pastCheckpointId : pastCheckpointIds) { > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-4534. --- Resolution: Not A Problem > Lack of synchronization in BucketingSink#restoreState() > --- > > Key: FLINK-4534 > URL: https://issues.apache.org/jira/browse/FLINK-4534 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Major > Labels: pull-request-available > > Iteration over state.bucketStates is protected by synchronization in other > methods, except for the following in restoreState(): > {code} > for (BucketState bucketState : state.bucketStates.values()) { > {code} > and following in close(): > {code} > for (Map.Entry> entry : > state.bucketStates.entrySet()) { > closeCurrentPartFile(entry.getValue()); > {code} > w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue > starting line 752: > {code} > Set pastCheckpointIds = > bucketState.pendingFilesPerCheckpoint.keySet(); > LOG.debug("Moving pending files to final location on restore."); > for (Long pastCheckpointId : pastCheckpointIds) { > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10229) Support listing of views
[ https://issues.apache.org/jira/browse/FLINK-10229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596462#comment-16596462 ] ASF GitHub Bot commented on FLINK-10229: yanghua opened a new pull request #6631: [FLINK-10229][SQL CLIENT] Support listing of views URL: https://github.com/apache/flink/pull/6631 ## What is the purpose of the change *This pull request supports listing of views* ## Brief change log - *Support SQL `SHOW VIEWS`* ## Verifying this change This change added tests and can be verified as follows: - *SqlCommandParserTest#testCommands to test command parsing* - *LocalExecutorITCase#testListViews* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) > Support listing of views > > > Key: FLINK-10229 > URL: https://issues.apache.org/jira/browse/FLINK-10229 > Project: Flink > Issue Type: New Feature > Components: SQL Client >Reporter: Timo Walther >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > FLINK-10163 added initial support of views for the SQL Client.
According to > other database vendors, views are listed in the \{{SHOW TABLES}}. However, > there should be a way of listing only the views. We can support the \{{SHOW > VIEWS}} command. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol closed pull request #4482: [FLINK-4534] Fix synchronization issue in BucketingSink
zentol closed pull request #4482: [FLINK-4534] Fix synchronization issue in BucketingSink URL: https://github.com/apache/flink/pull/4482 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index faf3c566803..2472387ed44 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -422,9 +422,11 @@ private void initFileSystem() throws IOException { @Override public void close() throws Exception { - if (state != null) { - for (Map.Entry> entry : state.bucketStates.entrySet()) { - closeCurrentPartFile(entry.getValue()); + synchronized (state.bucketStates) { + if (state != null) { + for (Map.Entry> entry : state.bucketStates.entrySet()) { + closeCurrentPartFile(entry.getValue()); + } } } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
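Note that the quoted diff above takes the `state.bucketStates` monitor before checking `state != null`, which would throw a NullPointerException if `state` were ever null. A self-contained sketch of the safer ordering (simplified types and names; the `int` return value exists only to make the example checkable, the real `close()` is void):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: null-check the enclosing state BEFORE synchronizing
// on one of its fields, then iterate under the lock.
public class CloseOrderingSketch {

    // Simplified stand-in for the sink's internal state.
    static class State {
        final Map<String, String> bucketStates = new HashMap<>();
    }

    static int close(State state) {
        int closed = 0;
        if (state != null) { // check before touching state.bucketStates
            synchronized (state.bucketStates) {
                for (Map.Entry<String, String> entry : state.bucketStates.entrySet()) {
                    closeCurrentPartFile(entry.getValue());
                    closed++;
                }
            }
        }
        return closed;
    }

    static void closeCurrentPartFile(String partFile) {
        // no-op in this sketch
    }

    public static void main(String[] args) {
        System.out.println(close(null)); // prints 0, no NullPointerException
        State state = new State();
        state.bucketStates.put("bucket-1", "part-0");
        System.out.println(close(state)); // prints 1
    }
}
```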
[jira] [Updated] (FLINK-10229) Support listing of views
[ https://issues.apache.org/jira/browse/FLINK-10229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10229: --- Labels: pull-request-available (was: ) > Support listing of views > > > Key: FLINK-10229 > URL: https://issues.apache.org/jira/browse/FLINK-10229 > Project: Flink > Issue Type: New Feature > Components: SQL Client >Reporter: Timo Walther >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > FLINK-10163 added initial support of views for the SQL Client. According to > other database vendors, views are listed in the \{{SHOW TABLES}}. However, > there should be a way of listing only the views. We can support the \{{SHOW > VIEWS}} command. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596461#comment-16596461 ] ASF GitHub Bot commented on FLINK-4534: --- azagrebin commented on issue #4482: [FLINK-4534] Fix synchronization issue in BucketingSink URL: https://github.com/apache/flink/pull/4482#issuecomment-416993974 Taking into account the last comment from [@kl0u in Jira issue](https://issues.apache.org/jira/browse/FLINK-4534?focusedCommentId=16550597=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16550597), we can close this PR and create another if needed to remove all `synchronized (state.bucketStates)` in `BucketingSink`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Lack of synchronization in BucketingSink#restoreState() > --- > > Key: FLINK-4534 > URL: https://issues.apache.org/jira/browse/FLINK-4534 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Major > Labels: pull-request-available > > Iteration over state.bucketStates is protected by synchronization in other > methods, except for the following in restoreState(): > {code} > for (BucketState bucketState : state.bucketStates.values()) { > {code} > and following in close(): > {code} > for (Map.Entry> entry : > state.bucketStates.entrySet()) { > closeCurrentPartFile(entry.getValue()); > {code} > w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue > starting line 752: > {code} > Set pastCheckpointIds = > bucketState.pendingFilesPerCheckpoint.keySet(); > LOG.debug("Moving pending files to final location on restore."); > for (Long pastCheckpointId : pastCheckpointIds) { > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua opened a new pull request #6631: [FLINK-10229][SQL CLIENT] Support listing of views
yanghua opened a new pull request #6631: [FLINK-10229][SQL CLIENT] Support listing of views URL: https://github.com/apache/flink/pull/6631 ## What is the purpose of the change *This pull request supports listing of views* ## Brief change log - *Support SQL `SHOW VIEWS`* ## Verifying this change This change added tests and can be verified as follows: - *SqlCommandParserTest#testCommands to test command parsing* - *LocalExecutorITCase#testListViews* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented)
[jira] [Updated] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-4534: -- Labels: pull-request-available (was: ) > Lack of synchronization in BucketingSink#restoreState() > --- > > Key: FLINK-4534 > URL: https://issues.apache.org/jira/browse/FLINK-4534 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Major > Labels: pull-request-available > > Iteration over state.bucketStates is protected by synchronization in other > methods, except for the following in restoreState(): > {code} > for (BucketState bucketState : state.bucketStates.values()) { > {code} > and following in close(): > {code} > for (Map.Entry> entry : > state.bucketStates.entrySet()) { > closeCurrentPartFile(entry.getValue()); > {code} > w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue > starting line 752: > {code} > Set pastCheckpointIds = > bucketState.pendingFilesPerCheckpoint.keySet(); > LOG.debug("Moving pending files to final location on restore."); > for (Long pastCheckpointId : pastCheckpointIds) { > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] azagrebin commented on issue #4482: [FLINK-4534] Fix synchronization issue in BucketingSink
azagrebin commented on issue #4482: [FLINK-4534] Fix synchronization issue in BucketingSink URL: https://github.com/apache/flink/pull/4482#issuecomment-416993974 Taking into account the last comment from [@kl0u in Jira issue](https://issues.apache.org/jira/browse/FLINK-4534?focusedCommentId=16550597=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16550597), we can close this PR and create another if needed to remove all `synchronized (state.bucketStates)` in `BucketingSink`.
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596459#comment-16596459 ] Till Rohrmann commented on FLINK-10011: --- I might have misdiagnosed the underlying problem a little bit. Initially I thought that the {{JM2}} did not release the lock it created when the job was initially submitted. However, in order for {{JM1}} to become leader, {{JM2}} needs to lose its ZooKeeper session (otherwise the ephemeral leader Znode would not be deleted). In this case, also the {{JobGraph}} lock Znodes should be removed. So where do the lock nodes come from? The answer could come from FLINK-10255. In Flip-6 we always recover jobs, and in pre-Flip-6 we sometimes recover jobs if we are not the leader. If we are not the leader, then we won't start execution of the job though. However, recovering the job will also lock it. Therefore, my suspicion is that the old leader {{JM2}} actually recovered the job after it reconnected to ZooKeeper. If this is the case, then you should see the following log line in the log file of {{JM2}}: {{org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - Recovered SubmittedJobGraph(, null)}}. Do you still have the logs to check whether this is true or not [~elevy]? A non-volatile variable could also explain why it only occurs sometimes with pre-Flip-6 and always with Flip-6. > Old job resurrected during HA failover > -- > > Key: FLINK-10011 > URL: https://issues.apache.org/jira/browse/FLINK-10011 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.2, 1.5.2, 1.6.0 >Reporter: Elias Levy >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > > For the second time we've observed Flink resurrect an old job during > JobManager high-availability fail over. > h4.
Configuration > * AWS environment > * Flink 1.4.2 standalong cluster in HA mode > * 2 JMs, 3 TMs > * 3 node ZK ensemble > * 1 job consuming to/from Kafka > * Checkpoints in S3 using the Presto file system adaptor > h4. Timeline > * 15:18:10 JM 2 completes checkpoint 69256. > * 15:19:10 JM 2 completes checkpoint 69257. > * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a > SocketTimeoutException > * 15:19:57 ZK 1 closes connection to JM 2 (leader) > * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK > 1 > * 15:19:57 JM 2 reports it can't read data from ZK > ** {{Unable to read additional data from server sessionid 0x3003f4a0003, > likely server has closed socket, closing socket connection and attempting > reconnect)}} > ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}} > * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED > ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader > from ZooKeeper.}} > ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs > are not monitored (temporarily).}} > ** {{Connection to ZooKeeper suspended. The contender > akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in > the leader election}}{{ }} > ** {{Connection to ZooKeeper suspended. 
Can no longer retrieve the leader > from ZooKeeper.}} > * 15:19:57 JM 2 gives up leadership > ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked > leadership.}} > * 15:19:57 JM 2 changes job > {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED > ** {{Stopping checkpoint coordinator for job > {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}} > * 15:19:57 TMs start disassociating with JM 2, but JM 2 discards the messages > because there is no leader > ** {{Discard message > LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception: > TaskManager akka://flink/user/taskmanager is disassociating)) because there > is currently no valid leader id known.}} > * 15:19:57 JM 2 connects to ZK 2 and renews its session > ** {{Opening socket connection to server > ip-10-210-43-221.ec2.internal/10.210.43.221:2181}} > ** {{Socket connection established to > ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}} > ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be > restarted.}} > ** {{Session establishment complete on server > ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = > 0x3003f4a0003, negotiated timeout = 4}} > ** {{Connection to ZooKeeper was reconnected. Leader election can be > restarted.}} > ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs > are monitored again.}} > ** {{State change: RECONNECTED}} > * 15:19:57: JM 1 reports JM 1 has been granted
[jira] [Updated] (FLINK-10189) FindBugs warnings: Inefficient use of keySet iterator instead of entrySet iterator
[ https://issues.apache.org/jira/browse/FLINK-10189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-10189: - Component/s: Streaming Connectors Streaming > FindBugs warnings: Inefficient use of keySet iterator instead of entrySet > iterator > -- > > Key: FLINK-10189 > URL: https://issues.apache.org/jira/browse/FLINK-10189 > Project: Flink > Issue Type: Bug > Components: Streaming, Streaming Connectors >Affects Versions: 1.7.0 >Reporter: Hiroaki Yoshida >Assignee: Hiroaki Yoshida >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > FindBugs-3.0.1 ([http://findbugs.sourceforge.net/]) reported two > WMI_WRONG_MAP_ITERATOR warnings on master: > {code:java} > M P WMI: org.apache.flink.runtime.state.ttl.TtlMapState.putAll(Map) makes > inefficient use of keySet iterator instead of entrySet iterator At > TtlMapState.java:[line 72] > M P WMI: org.apache.flink.addons.hbase.HBaseTableSource.projectFields(int[]) > makes inefficient use of keySet iterator instead of entrySet iterator At > HBaseTableSource.java:[line 19] > {code} > The description of the bug is as follows: > {quote}*WMI: Inefficient use of keySet iterator instead of entrySet iterator > (WMI_WRONG_MAP_ITERATOR)* > This method accesses the value of a Map entry, using a key that was retrieved > from a keySet iterator. It is more efficient to use an iterator on the > entrySet of the map, to avoid the Map.get(key) lookup. > [http://findbugs.sourceforge.net/bugDescriptions.html#WMI_WRONG_MAP_ITERATOR] > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
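The inefficiency FindBugs flags above can be shown with a small, self-contained sketch (the class and method names here are invented for illustration, not taken from the Flink patch): iterating `keySet()` and calling `map.get(key)` does a second hash lookup per entry that `entrySet()` avoids.

```java
import java.util.HashMap;
import java.util.Map;

public class EntrySetExample {

    // Inefficient: each iteration performs an extra Map.get(key) hash lookup.
    // This is the pattern FindBugs reports as WMI_WRONG_MAP_ITERATOR.
    static int sumViaKeySet(Map<String, Integer> map) {
        int sum = 0;
        for (String key : map.keySet()) {
            sum += map.get(key);
        }
        return sum;
    }

    // Efficient: the entry already carries both key and value.
    static int sumViaEntrySet(Map<String, Integer> map) {
        int sum = 0;
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            sum += entry.getValue();
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Integer> m = new HashMap<>();
        m.put("a", 1);
        m.put("b", 2);
        System.out.println(sumViaKeySet(m));
        System.out.println(sumViaEntrySet(m));
    }
}
```

Both methods return the same result; the patch merged below applies exactly this transformation to `HBaseTableSource.projectFields` and `TtlMapState.putAll`.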
[jira] [Closed] (FLINK-10189) FindBugs warnings: Inefficient use of keySet iterator instead of entrySet iterator
[ https://issues.apache.org/jira/browse/FLINK-10189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10189. Resolution: Fixed Fix Version/s: 1.7.0 master: 8a5271e29f664dac27c95eb8e7df2ae9be76b592
[jira] [Updated] (FLINK-10189) FindBugs warnings: Inefficient use of keySet iterator instead of entrySet iterator
[ https://issues.apache.org/jira/browse/FLINK-10189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-10189: - Affects Version/s: 1.7.0
[jira] [Assigned] (FLINK-10189) FindBugs warnings: Inefficient use of keySet iterator instead of entrySet iterator
[ https://issues.apache.org/jira/browse/FLINK-10189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-10189: Assignee: Hiroaki Yoshida
[jira] [Commented] (FLINK-10189) FindBugs warnings: Inefficient use of keySet iterator instead of entrySet iterator
[ https://issues.apache.org/jira/browse/FLINK-10189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596450#comment-16596450 ] ASF GitHub Bot commented on FLINK-10189: zentol closed pull request #6596: [FLINK-10189] Fix FindBugs warnings: Inefficient use of keySet iterat… URL: https://github.com/apache/flink/pull/6596 As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index 1e090d42589..27b75d49ac9 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -144,9 +144,10 @@ public HBaseTableSource projectFields(int[] fields) {
 		for (int field : fields) {
 			String family = famNames[field];
 			Map<String, TypeInformation<?>> familyInfo = hBaseSchema.getFamilyInfo(family);
-			for (String qualifier : familyInfo.keySet()) {
+			for (Map.Entry<String, TypeInformation<?>> entry : familyInfo.entrySet()) {
 				// create the newSchema
-				newTableSource.addColumn(family, qualifier, familyInfo.get(qualifier).getTypeClass());
+				String qualifier = entry.getKey();
+				newTableSource.addColumn(family, qualifier, entry.getValue().getTypeClass());
 			}
 		}
 		return newTableSource;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index f6f81ffc940..f92e8e4d95c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -68,8 +68,9 @@ public void putAll(Map<UK, UV> map) throws Exception {
 			return;
 		}
 		Map<UK, TtlValue<UV>> ttlMap = new HashMap<>(map.size());
-		for (UK key : map.keySet()) {
-			ttlMap.put(key, wrapWithTs(map.get(key)));
+		for (Map.Entry<UK, UV> entry : map.entrySet()) {
+			UK key = entry.getKey();
+			ttlMap.put(key, wrapWithTs(entry.getValue()));
 		}
 		original.putAll(ttlMap);
 	}
[GitHub] zentol closed pull request #6596: [FLINK-10189] Fix FindBugs warnings: Inefficient use of keySet iterat…
zentol closed pull request #6596: [FLINK-10189] Fix FindBugs warnings: Inefficient use of keySet iterat… URL: https://github.com/apache/flink/pull/6596 With regards, Apache Git Services
[jira] [Commented] (FLINK-8951) Support OVER windows PARTITION BY (rounded) timestamp
[ https://issues.apache.org/jira/browse/FLINK-8951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596447#comment-16596447 ] Sergey Tsvetkov commented on FLINK-8951: I'd like to give this one a try. > Support OVER windows PARTITION BY (rounded) timestamp > - > > Key: FLINK-8951 > URL: https://issues.apache.org/jira/browse/FLINK-8951 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Priority: Minor > > There are a few interesting use cases that can be addressed by queries that > follow the following pattern > {code:sql} > SELECT sensorId, COUNT(*) OVER (PARTITION BY CEIL(rowtime TO HOUR) ORDER BY > temp ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) FROM sensors > {code} > Such queries can be used to compute rolling cascading (tumbling) windows with > aggregates that are reset in regular intervals. This can be useful for TOP-K > per minute/hour/day queries. > Right now, such {{OVER}} windows are not supported, because we require that > the {{ORDER BY}} clause is defined on a timestamp (time indicator) attribute. > In order to support this kind of query, we would require that the > {{PARTITION BY}} clause contains a timestamp (time indicator) attribute or a > function that is defined on it and which is monotonicity preserving. Once the > optimizer identifies this case, it could translate the query into a special > time-partitioned OVER window operator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9642) Add caching layer to SharedBuffer
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596446#comment-16596446 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys closed pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205 As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 815b25ad153..b355c38e858 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -33,6 +33,7 @@
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.core.memory.DataInputView;
@@ -175,7 +176,7 @@ private boolean isFinalState(ComputationState state) {
 	 * If computations reach a stop state, the path forward is discarded and currently constructed path is returned
 	 * with the element that resulted in the stop state.
 	 *
-	 * @param sharedBuffer the SharedBuffer object that we need to work upon while processing
+	 * @param sharedBufferAccessor the accessor to SharedBuffer object that we need to work upon while processing
 	 * @param nfaState The NFAState object that we need to affect while processing
 	 * @param event The current event to be processed or null if only pruning shall be done
 	 * @param timestamp The timestamp of the current event
@@ -185,11 +186,11 @@ private boolean isFinalState(ComputationState state) {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public Collection<Map<String, List<T>>> process(
-			final SharedBuffer<T> sharedBuffer,
+			final SharedBufferAccessor<T> sharedBufferAccessor,
 			final NFAState nfaState,
 			final T event,
 			final long timestamp) throws Exception {
-		return process(sharedBuffer, nfaState, event, timestamp, AfterMatchSkipStrategy.noSkip());
+		return process(sharedBufferAccessor, nfaState, event, timestamp, AfterMatchSkipStrategy.noSkip());
 	}

 	/**
@@ -200,7 +201,7 @@ private boolean isFinalState(ComputationState state) {
 	 * If computations reach a stop state, the path forward is discarded and currently constructed path is returned
 	 * with the element that resulted in the stop state.
 	 *
-	 * @param sharedBuffer the SharedBuffer object that we need to work upon while processing
+	 * @param sharedBufferAccessor the accessor to SharedBuffer object that we need to work upon while processing
 	 * @param nfaState The NFAState object that we need to affect while processing
 	 * @param event The current event to be processed or null if only pruning shall be done
 	 * @param timestamp The timestamp of the current event
@@ -211,13 +212,13 @@ private boolean isFinalState(ComputationState state) {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public Collection<Map<String, List<T>>> process(
-			final SharedBuffer<T> sharedBuffer,
+			final SharedBufferAccessor<T> sharedBufferAccessor,
 			final NFAState nfaState,
 			final T event,
 			final long timestamp,
 			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
-		try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBuffer)) {
-			return doProcess(sharedBuffer, nfaState, eventWrapper, afterMatchSkipStrategy);
+		try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBufferAccessor)) {
+			return doProcess(sharedBufferAccessor, nfaState, eventWrapper, afterMatchSkipStrategy);
 		}
 	}
@@ -225,33 +226,32 @@ private boolean isFinalState(ComputationState state) {
 	 * Prunes states assuming there will be no events with timestamp lower than the given one.
 	 * It cleares the sharedBuffer and also emits all timed out partial matches.
 	 *
-	 * @param
[jira] [Resolved] (FLINK-9642) Add caching layer to SharedBuffer
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz resolved FLINK-9642. - Resolution: Fixed Fix Version/s: 1.7.0 > Add caching layer to SharedBuffer > - > > Key: FLINK-9642 > URL: https://issues.apache.org/jira/browse/FLINK-9642 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.6.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With the rework of SharedBuffer in FLINK-9418, the lock & release operations > now go against RocksDB state, unlike the previous version, which read the > whole SharedBuffer state into memory. I think we can add a cache or variable > in SharedBuffer to cache the Lockable objects and mark the ref changes within > one NFA process call; this will reduce the state accesses when events point > to the same NodeId, and flush the result to MapState at the end of processing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
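The caching idea described in the issue can be sketched with plain collections. This is a hypothetical, stdlib-only model — `RefCountCache`, `backingStore`, and the method names are invented for illustration, and a plain `Map` stands in for the RocksDB-backed `MapState`; Flink's actual `SharedBuffer`/`SharedBufferAccessor` APIs differ.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: batch reference-count changes in a heap cache during one NFA
// process() call and flush them to the backing store once at the end,
// instead of issuing one read-modify-write per lock/release.
public class RefCountCache {
    private final Map<String, Integer> backingStore; // stands in for RocksDB-backed MapState
    private final Map<String, Integer> pendingDeltas = new HashMap<>();

    public RefCountCache(Map<String, Integer> backingStore) {
        this.backingStore = backingStore;
    }

    // Record a +1 reference for a node id; no store access yet.
    public void lock(String nodeId) {
        pendingDeltas.merge(nodeId, 1, Integer::sum);
    }

    // Record a -1 reference for a node id; no store access yet.
    public void release(String nodeId) {
        pendingDeltas.merge(nodeId, -1, Integer::sum);
    }

    // Called once at the end of processing an event:
    // one store access per touched node, not per lock/release call.
    public void flush() {
        for (Map.Entry<String, Integer> e : pendingDeltas.entrySet()) {
            backingStore.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        pendingDeltas.clear();
    }
}
```

When several events in one `process()` call point to the same `NodeId`, their deltas collapse into a single store update, which is the state-access reduction the issue is after.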
[GitHub] dawidwys closed pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
dawidwys closed pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205 With regards, Apache Git Services
[jira] [Commented] (FLINK-10239) Register eventtime timer only once in eventtrigger
[ https://issues.apache.org/jira/browse/FLINK-10239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596406#comment-16596406 ] Aljoscha Krettek commented on FLINK-10239: -- That would be good to do. However, how do you decide whether it is the first time we see an element in the window or whether we have seen the window before? > Register eventtime timer only once in eventtrigger > -- > > Key: FLINK-10239 > URL: https://issues.apache.org/jira/browse/FLINK-10239 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.5.3 >Reporter: buptljy >Priority: Minor > > I find that we call ctx.registerEventTimeTimer(window.maxTimestamp()) every > time when an element is received in the window. Even though it doesn't affect > the result, because a Set is used internally, I think it can still be an > improvement if we call it only once. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
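Why the repeated registration is correct-but-redundant can be seen in a toy model: if registered timers live in a set, re-adding the same window-end timestamp is a no-op, so the proposed change only skips wasted calls. This `ToyTimerService` is a simplified, hypothetical sketch, not Flink's actual timer service.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of a timer service whose registrations are backed by a Set:
// registering the same (window-end) timestamp twice changes nothing, which
// is why per-element registration is correct but redundant.
public class ToyTimerService {
    private final Set<Long> eventTimeTimers = new HashSet<>();
    private int registrationCalls = 0;

    // Returns true only if the timer was newly added.
    public boolean registerEventTimeTimer(long timestamp) {
        registrationCalls++;
        return eventTimeTimers.add(timestamp);
    }

    public int timerCount() {
        return eventTimeTimers.size();
    }

    public int callCount() {
        return registrationCalls;
    }
}
```

The open question in the comment above remains: deciding "is this the first element of the window?" itself requires per-window state, which is why the optimization is not free.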
[jira] [Commented] (FLINK-10242) Disable latency metrics by default
[ https://issues.apache.org/jira/browse/FLINK-10242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596402#comment-16596402 ] Aljoscha Krettek commented on FLINK-10242: -- Yes, please! > Disable latency metrics by default > -- > > Key: FLINK-10242 > URL: https://issues.apache.org/jira/browse/FLINK-10242 > Project: Flink > Issue Type: Sub-task > Components: Configuration, Metrics >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.7.0 > > > With the plethora of recent issue around latency metrics we should disable > them by default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers
[ https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596396#comment-16596396 ] ASF GitHub Bot commented on FLINK-8354: --- aljoscha commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#issuecomment-416972143 Hi, we have to carefully coordinate this with #6105 and #6577. See especially my comments on #6577. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink Kafka connector ignores Kafka message headers > - > > Key: FLINK-8354 > URL: https://issues.apache.org/jira/browse/FLINK-8354 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Environment: Kafka 0.11.0.0 > Flink 1.4.0 > flink-connector-kafka-0.11_2.11 >Reporter: Mohammad Abareghi >Assignee: Aegeaner >Priority: Major > Labels: pull-request-available > > Kafka has introduced the notion of a Header for messages in version 0.11.0.0 > https://issues.apache.org/jira/browse/KAFKA-4208. > But flink-connector-kafka-0.11_2.11, which supports Kafka 0.11.0.0, ignores > headers when consuming Kafka messages. > It would be useful in some scenarios, such as distributed log tracing, to > support message headers in FlinkKafkaConsumer011 and FlinkKafkaProducer011. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] aljoscha commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
aljoscha commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#issuecomment-416972143 Hi, we have to carefully coordinate this with #6105 and and #6577. See especially my comments on #6577. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-7964) Add Apache Kafka 1.0/1.1 connectors
[ https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596393#comment-16596393 ] ASF GitHub Bot commented on FLINK-7964: --- aljoscha commented on issue #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors URL: https://github.com/apache/flink/pull/6577#issuecomment-416971163 I think we would need an API call bridge (`KafkaConsumerBridge` and `KafkaProducerBridge`) and a factory for these (`KafkaBridgeFactory`). Those interfaces are defined in the base package, and they basically have a method for all the methods that we want to use on the consumer or producer. The source/sink (for example the Fetcher) are implemented against those interfaces. Specific version implementations then only need to provide implementations for those, thus cleanly separating things. With this, we don't need any Kafka dependency in the base package. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Apache Kafka 1.0/1.1 connectors > --- > > Key: FLINK-7964 > URL: https://issues.apache.org/jira/browse/FLINK-7964 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 1.0.0 is no mere bump of the version number. The Apache Kafka Project > Management Committee has packed a number of valuable enhancements into the > release. Here is a summary of a few of them: > * Since its introduction in version 0.10, the Streams API has become hugely > popular among Kafka users, including the likes of Pinterest, Rabobank, > Zalando, and The New York Times. In 1.0, the API continues to evolve at a > healthy pace. To begin with, the builder API has been improved (KIP-120). 
A > new API has been added to expose the state of active tasks at runtime > (KIP-130). The new cogroup API makes it much easier to deal with partitioned > aggregates with fewer StateStores and fewer moving parts in your code > (KIP-150). Debuggability gets easier with enhancements to the print() and > writeAsText() methods (KIP-160). And if that’s not enough, check out KIP-138 > and KIP-161 too. For more on streams, check out the Apache Kafka Streams > documentation, including some helpful new tutorial videos. > * Operating Kafka at scale requires that the system remain observable, and to > make that easier, we’ve made a number of improvements to metrics. These are > too many to summarize without becoming tedious, but Connect metrics have been > significantly improved (KIP-196), a litany of new health check metrics are > now exposed (KIP-188), and we now have a global topic and partition count > (KIP-168). Check out KIP-164 and KIP-187 for even more. > * We now support Java 9, leading, among other things, to significantly faster > TLS and CRC32C implementations. Over-the-wire encryption will be faster now, > which will keep Kafka fast and compute costs low when encryption is enabled. > * In keeping with the security theme, KIP-152 cleans up the error handling on > Simple Authentication Security Layer (SASL) authentication attempts. > Previously, some authentication error conditions were indistinguishable from > broker failures and were not logged in a clear way. This is cleaner now. > * Kafka can now tolerate disk failures better. Historically, JBOD storage > configurations have not been recommended, but the architecture has > nevertheless been tempting: after all, why not rely on Kafka’s own > replication mechanism to protect against storage failure rather than using > RAID? With KIP-112, Kafka now handles disk failure more gracefully. 
A single > disk failure in a JBOD broker will not bring the entire broker down; rather, > the broker will continue serving any log files that remain on functioning > disks. > * Since release 0.11.0, the idempotent producer (which is the producer used > in the presence of a transaction, which of course is the producer we use for > exactly-once processing) required max.in.flight.requests.per.connection to be > equal to one. As anyone who has written or tested a wire protocol can attest, > this put an upper bound on throughput. Thanks to KAFKA-5949, this can now be > as large as five, relaxing the throughput constraint quite a bit. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
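The last point above can be made concrete with a small configuration sketch. The property keys are standard Kafka producer settings; the helper class and the chosen values are illustrative only, not a recommendation from this thread:

```java
import java.util.Properties;

class IdempotentProducerConfig {
    // Builds producer settings that exploit the KAFKA-5949 relaxation:
    // since Kafka 1.0 the idempotent producer may pipeline up to five
    // in-flight requests per connection instead of exactly one.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("enable.idempotence", "true");
        // Was forced to "1" before KAFKA-5949; "5" is now permitted.
        props.put("max.in.flight.requests.per.connection", "5");
        return props;
    }
}
```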
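The bridge approach aljoscha proposes in the comment above could be sketched roughly as follows. All class and method names here are hypothetical illustrations of the pattern (only `KafkaConsumerBridge`, `KafkaProducerBridge`, and `KafkaBridgeFactory` are named in the comment); none of this is actual Flink or Kafka client API:

```java
import java.util.List;
import java.util.Map;

// Version-agnostic interfaces living in the base package: one method
// per operation the connector needs from the Kafka client.
interface KafkaConsumerBridge {
    void subscribe(List<String> topics);
    List<byte[]> poll(long timeoutMillis);
    void close();
}

interface KafkaProducerBridge {
    void send(String topic, byte[] key, byte[] value);
    void flush();
    void close();
}

// Factory hides which Kafka client version is on the classpath;
// each version-specific module supplies its own implementation.
interface KafkaBridgeFactory {
    KafkaConsumerBridge createConsumer(Map<String, String> config);
    KafkaProducerBridge createProducer(Map<String, String> config);
}

// The base package's Fetcher is written only against the bridge,
// so it needs no Kafka dependency at all.
class Fetcher {
    private final KafkaConsumerBridge consumer;

    Fetcher(KafkaBridgeFactory factory, Map<String, String> config) {
        this.consumer = factory.createConsumer(config);
    }

    List<byte[]> fetchOnce(long timeoutMillis) {
        return consumer.poll(timeoutMillis);
    }
}
```

The design pay-off is that adding a Kafka 1.0/1.1 module would mean implementing only the bridge interfaces, while the fetcher/emitter logic stays shared.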