[jira] [Commented] (FLINK-7307) Add proper command line parsing tool to ClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-7307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126833#comment-16126833 ]

ASF GitHub Bot commented on FLINK-7307:
---------------------------------------

Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/4476

    @tillrohrmann @aljoscha I have fixed https://issues.apache.org/jira/browse/FLINK-7307 in this PR. Could you please take a look when you're free? Thanks.

> Add proper command line parsing tool to ClusterEntrypoint
> ---------------------------------------------------------
>
>                 Key: FLINK-7307
>                 URL: https://issues.apache.org/jira/browse/FLINK-7307
>             Project: Flink
>          Issue Type: Improvement
>          Components: Cluster Management
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Fang Yong
>              Labels: flip-6
>
> We need to add a proper command line parsing tool to the entry point in
> {{ClusterEntrypoint#parseArguments}}. At the moment, we are simply using the
> {{ParameterTool}} as a temporary solution.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
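The kind of strict parsing the ticket asks for (as opposed to the lenient {{ParameterTool}}) can be sketched in plain Java. This is a hypothetical illustration, not Flink's actual entrypoint API; the class and option names are invented for the example:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of strict "--key value" parsing for a cluster entrypoint. */
public class EntrypointArgs {
    private final Map<String, String> options = new HashMap<>();

    public static EntrypointArgs parse(String[] args) {
        EntrypointArgs parsed = new EntrypointArgs();
        for (int i = 0; i < args.length; i++) {
            // Unlike a lenient parser, reject anything that is not a --key value pair.
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Unexpected token: " + args[i]);
            }
            if (i + 1 >= args.length) {
                throw new IllegalArgumentException("Missing value for " + args[i]);
            }
            parsed.options.put(args[i].substring(2), args[++i]);
        }
        return parsed;
    }

    /** Fails fast with a pointed message when a required option is absent. */
    public String require(String key) {
        String value = options.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required option --" + key);
        }
        return value;
    }
}
```

The point of the sketch is the fail-fast behavior: malformed or missing arguments produce an immediate, descriptive error instead of being silently ignored.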
[GitHub] flink issue #4476: [FLINK-7307] Add proper command line parsing tool to Clus...
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/4476

    @tillrohrmann @aljoscha I have fixed https://issues.apache.org/jira/browse/FLINK-7307 in this PR. Could you please take a look when you're free? Thanks.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] flink pull request #4473: [FLINK-7367][kinesis connector] Parameterize more ...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4473#discussion_r133119309

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java ---
    @@ -20,14 +20,24 @@
     import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

     /**
    + * @deprecated
    --- End diff --

    @bowenli86 one thing to mention: we should always try to have good Javadoc explaining why something is deprecated. Sorry, I overlooked this in my review. I'll address it myself this time while merging.
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126820#comment-16126820 ]

ASF GitHub Bot commented on FLINK-7367:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4473#discussion_r133119309

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java ---
    @@ -20,14 +20,24 @@
     import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

     /**
    + * @deprecated
    --- End diff --

    @bowenli86 one thing to mention: we should always try to have good Javadoc explaining why something is deprecated. Sorry, I overlooked this in my review. I'll address it myself this time while merging.

> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime,
> MaxConnections, RequestTimeout, etc)
> --------------------------------------------------------------------------
>
>                 Key: FLINK-7367
>                 URL: https://issues.apache.org/jira/browse/FLINK-7367
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.3.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.4.0, 1.3.3
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> According to the [AWS
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html]
> and [their sample on
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
> developers can set more configs to make the most of KinesisProducer and make
> it fault-tolerant (e.g. by increasing timeouts).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink currently uses KPL's default values, which make Flink write too fast to
> Kinesis and fail Flink jobs too frequently. We need to parameterize
> FlinkKinesisProducer to pass in the above params in order to slow down
> Flink's write rate to Kinesis.
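Configs like these are typically carried as plain string keys in a {{Properties}} object handed to the producer. A hedged sketch of a "slow down the write rate" property set — the key names follow the KPL sample configuration file linked above, but treat the exact keys and values the connector accepts as an assumption, and the numbers as purely illustrative:

```java
import java.util.Properties;

public class KinesisProducerProps {
    /** Builds producer properties that throttle the write rate (illustrative values only). */
    public static Properties conservativeDefaults() {
        Properties config = new Properties();
        // AWS credential/region keys are omitted here; assumed to be set elsewhere.
        config.setProperty("RecordMaxBufferedTime", "1000"); // buffer up to 1 s before flushing
        config.setProperty("MaxConnections", "12");          // fewer parallel HTTP connections
        config.setProperty("RateLimit", "80");               // use at most 80% of shard throughput
        config.setProperty("RecordTtl", "60000");            // drop records buffered longer than 60 s
        config.setProperty("RequestTimeout", "10000");       // fail slow requests after 10 s
        return config;
    }
}
```

Raising timeouts and lowering the rate limit trades peak throughput for fewer rejected writes, which is the fault-tolerance angle the ticket describes.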
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126812#comment-16126812 ]

ASF GitHub Bot commented on FLINK-7367:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4473

    LGTM! I'll rebase this on #4537 and merge as soon as Travis gives green.
[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4473

    LGTM! I'll rebase this on #4537 and merge as soon as Travis gives green.
[GitHub] flink issue #4537: [FLINK-7440] [kinesis] Add various eager serializability ...
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4537

    @bowenli86 @zentol Thanks a lot for the reviews! I will address your comments and merge this.
[jira] [Commented] (FLINK-7440) Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer
[ https://issues.apache.org/jira/browse/FLINK-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126800#comment-16126800 ]

ASF GitHub Bot commented on FLINK-7440:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4537

    @bowenli86 @zentol Thanks a lot for the reviews! I will address your comments and merge this.

> Add eager serializable checks on provided de-/serialization schemas for
> Kinesis consumer / producer
> -----------------------------------------------------------------------
>
>                 Key: FLINK-7440
>                 URL: https://issues.apache.org/jira/browse/FLINK-7440
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.4.0, 1.3.3
>
> For a better user experience, we should add eager serializability checks on
> the provided {{KinesisDeserializationSchema}} / {{KinesisSerializationSchema}},
> with better error messages pointing out exactly that the serialization schema
> isn't serializable.
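An eager check of this kind can be done by round-tripping the schema through Java serialization at construction time, so the user gets a pointed error in their constructor call instead of a distributed-deployment failure later. A self-contained sketch — the helper name and message are illustrative, not the actual Flink implementation:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

public class SerializabilityChecks {
    /** Fails fast with a pointed message if the given schema cannot be Java-serialized. */
    public static void ensureSerializable(Object schema, String description) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            // Actually serializing catches non-serializable *fields*, not just a
            // missing "implements Serializable" on the schema class itself.
            out.writeObject(schema);
        } catch (Exception e) {
            throw new IllegalArgumentException(
                "The provided " + description + " (" + schema.getClass().getName()
                    + ") is not serializable. The schema and all of its fields must be serializable.",
                e);
        }
    }
}
```

Called eagerly from a consumer/producer constructor, this turns an obscure runtime failure into an immediate, explainable one.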
[jira] [Assigned] (FLINK-4500) Cassandra sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Fong reassigned FLINK-4500:
-----------------------------------

    Assignee: Michael Fong

> Cassandra sink can lose messages
> --------------------------------
>
>                 Key: FLINK-4500
>                 URL: https://issues.apache.org/jira/browse/FLINK-4500
>             Project: Flink
>          Issue Type: Bug
>          Components: Cassandra Connector
>    Affects Versions: 1.1.0
>            Reporter: Elias Levy
>            Assignee: Michael Fong
>
> The problem is the same as the one I pointed out with the Kafka producer sink
> (FLINK-4027). CassandraTupleSink's send() and CassandraPojoSink's send()
> both send data asynchronously to Cassandra and record whether an error occurs
> via a future callback. But CassandraSinkBase does not implement
> Checkpointed, so it can't stop a checkpoint from completing even though there
> are Cassandra queries in flight from the checkpoint that may fail. If they do
> fail, they would subsequently not be replayed when the job recovers, and
> would thus be lost.
> In addition, CassandraSinkBase's close() should check whether there is a
> pending exception and throw it, rather than closing silently. It should also
> wait for any pending async queries to complete and check their status before
> closing.
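The fix the report asks for — track in-flight writes, remember the first async failure, and drain before closing instead of closing silently — can be sketched independently of Cassandra. All names below are hypothetical stand-ins, not the actual CassandraSinkBase API:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncSinkSketch {
    private final Set<CompletableFuture<Void>> pending = ConcurrentHashMap.newKeySet();
    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    /** Registers an async write so close() can wait for it and surface its failure. */
    public void track(CompletableFuture<Void> write) {
        pending.add(write);
        write.whenComplete((ignored, error) -> {
            pending.remove(write);
            if (error != null) {
                firstError.compareAndSet(null, error); // remember the first async failure
            }
        });
    }

    /** Waits for all in-flight writes, then rethrows any recorded failure instead of closing silently. */
    public void close() {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
            .exceptionally(e -> null) // failures are surfaced via firstError below
            .join();
        Throwable error = firstError.get();
        if (error != null) {
            throw new IllegalStateException("Async write failed", error);
        }
    }
}
```

The same drain-and-check would run in the checkpoint hook, so a checkpoint cannot complete while writes it covers are still in flight.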
[jira] [Assigned] (FLINK-6805) Flink Cassandra connector dependency on Netty disagrees with Flink
[ https://issues.apache.org/jira/browse/FLINK-6805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Fong reassigned FLINK-6805:
-----------------------------------

    Assignee: Michael Fong

> Flink Cassandra connector dependency on Netty disagrees with Flink
> ------------------------------------------------------------------
>
>                 Key: FLINK-6805
>                 URL: https://issues.apache.org/jira/browse/FLINK-6805
>             Project: Flink
>          Issue Type: Bug
>          Components: Cassandra Connector
>    Affects Versions: 1.3.0, 1.2.1
>            Reporter: Shannon Carey
>            Assignee: Michael Fong
>             Fix For: 1.4.0
>
> The Flink Cassandra connector has a dependency on Netty libraries (via
> promotion of transitive dependencies by the Maven shade plugin) at version
> 4.0.33.Final, which disagrees with the version 4.0.27.Final included in Flink
> and managed by the parent POM via a dependency on netty-all.
> Due to the use of netty-all, the dependency management doesn't take effect on
> the individual libraries such as netty-handler, netty-codec, etc.
> I suggest that dependency management of Netty should be added for all Netty
> libraries individually (netty-handler, etc.) so that all Flink modules use
> the same version, and similarly I suggest that exclusions be added to the
> quickstart example POM for the individual Netty libraries so that fat JARs
> don't include conflicting versions of Netty.
> It seems like this problem started when FLINK-6084 was implemented:
> transitive dependencies of flink-connector-cassandra were previously
> omitted, and now that they are included we must make sure that they agree
> with the Flink distribution.
[jira] [Commented] (FLINK-7440) Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer
[ https://issues.apache.org/jira/browse/FLINK-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126768#comment-16126768 ]

ASF GitHub Bot commented on FLINK-7440:
---------------------------------------

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/4537

    Once this is checked in, we'll be more confident that https://github.com/apache/flink/pull/4473 is all good and doesn't break anything.
[GitHub] flink issue #4537: [FLINK-7440] [kinesis] Add various eager serializability ...
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/4537

    Once this is checked in, we'll be more confident that https://github.com/apache/flink/pull/4473 is all good and doesn't break anything.
[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks
[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126742#comment-16126742 ]

ASF GitHub Bot commented on FLINK-7245:
---------------------------------------

Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/4530

    @fhueske Yes, the plural is better. I should have noticed that before. This PR is updated with the new package name and an extra delay parameter added to the co-operator.

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the
> {{AbstractStreamOperator}} instantly.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         timeServiceManager.advanceWatermark(mark);
>     }
>     output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these
> watermarks (e.g., join or aggregate results) may be regarded as delayed by
> the downstream operators, since their timestamps must be less than or equal
> to the corresponding triggers.
> This issue aims to add another "working mode", which supports holding back
> watermarks, to the current operators. These watermarks should be blocked and
> stored by the operators until all the corresponding newly generated results
> are emitted.
```
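One simple way to realize the "holding back" mode discussed above is to forward watermarks reduced by a fixed delay, so results triggered by a watermark are not already considered late downstream. A minimal standalone sketch — the real change lives in the operator's processWatermark path; the class and delay semantics here are illustrative assumptions:

```java
/** Standalone sketch: forwards watermarks held back by a fixed delay. */
public class HeldBackWatermarks {
    private final long delayMillis;
    private long lastEmitted = Long.MIN_VALUE;

    public HeldBackWatermarks(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Returns the watermark to emit downstream, or Long.MIN_VALUE if nothing new should be emitted. */
    public long processWatermark(long watermark) {
        long heldBack = watermark - delayMillis;
        if (heldBack > lastEmitted) {
            lastEmitted = heldBack; // watermarks must stay monotonically increasing
            return heldBack;
        }
        return Long.MIN_VALUE;
    }
}
```

The monotonicity check matters: an operator must never emit a watermark smaller than one it already emitted, even when the held-back value regresses.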
[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...
Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/4530

    @fhueske Yes, the plural is better. I should have noticed that before. This PR is updated with the new package name and an extra delay parameter added to the co-operator.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126721#comment-16126721 ]

ASF GitHub Bot commented on FLINK-7337:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r133104586

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---
    @@ -46,29 +47,29 @@ class StreamTableSourceScan(
         val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
         val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
    -    val fieldCnt = fieldNames.length
    +    val fields = fieldNames.zip(fieldTypes)
    -    val rowtime = tableSource match {
    +    val withRowtime = tableSource match {
           case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
    --- End diff --

    Sure, I have logged it as https://issues.apache.org/jira/browse/FLINK-7446

> Refactor handling of time indicator attributes
> ----------------------------------------------
>
>                 Key: FLINK-7337
>                 URL: https://issues.apache.org/jira/browse/FLINK-7337
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E],
> I propose the following changes to the current handling of time indicator
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as a regular Long field in Row.
> ** Represent the processing-time indicator type as a null-valued field in Row
> (1 bit overhead).
> * Remove materialization of event-time timestamps because the timestamp is
> already accessible in Row.
> * Add a {{ProcessFunction}} to set the timestamp into the timestamp field of a
> {{StreamRecord}}.
[jira] [Created] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
Jark Wu created FLINK-7446:
------------------------------

             Summary: Support to define an existing field as the rowtime field for TableSource
                 Key: FLINK-7446
                 URL: https://issues.apache.org/jira/browse/FLINK-7446
             Project: Flink
          Issue Type: Improvement
          Components: Table API & SQL
            Reporter: Jark Wu

Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field for
a {{TableSource}}. But it would be helpful if we could support defining an
existing field as the rowtime field. Just like registering a DataStream, the
rowtime field can be appended but can also replace an existing field.
[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r133104586

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---
    @@ -46,29 +47,29 @@ class StreamTableSourceScan(
         val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
         val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
    -    val fieldCnt = fieldNames.length
    +    val fields = fieldNames.zip(fieldTypes)
    -    val rowtime = tableSource match {
    +    val withRowtime = tableSource match {
           case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
    --- End diff --

    Sure, I have logged it as https://issues.apache.org/jira/browse/FLINK-7446
[jira] [Commented] (FLINK-7419) Shade jackson dependency in flink-avro
[ https://issues.apache.org/jira/browse/FLINK-7419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126717#comment-16126717 ]

ASF GitHub Bot commented on FLINK-7419:
---------------------------------------

Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/4524

    @zentol Thank you for your review. I have renamed the pattern. Thanks!

> Shade jackson dependency in flink-avro
> --------------------------------------
>
>                 Key: FLINK-7419
>                 URL: https://issues.apache.org/jira/browse/FLINK-7419
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Build System
>            Reporter: Stephan Ewen
>            Assignee: Fang Yong
>             Fix For: 1.4.0
>
> Avro uses {{org.codehaus.jackson}}, which also exists in multiple
> incompatible versions. We should shade it to
> {{org.apache.flink.shaded.avro.org.codehaus.jackson}}.
[GitHub] flink issue #4524: [FLINK-7419] Shade jackson dependency in flink-avro
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/4524

    @zentol Thank you for your review. I have renamed the pattern. Thanks!
[jira] [Commented] (FLINK-7441) Double quote string literals is not supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-7441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126712#comment-16126712 ]

ASF GitHub Bot commented on FLINK-7441:
---------------------------------------

Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4538

    Thanks for the reminder @zentol. I will pay attention to the commit message in the future.

> Double quote string literals is not supported in Table API and SQL
> ------------------------------------------------------------------
>
>                 Key: FLINK-7441
>                 URL: https://issues.apache.org/jira/browse/FLINK-7441
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.2
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>             Fix For: 1.4.0, 1.3.3
>
> Code generation doesn't handle double quote string literals and some control
> characters, which leads to a compile error:
> {code}
> Caused by: org.codehaus.commons.compiler.CompileException: Line 50, Column
> 48: Expression "hello" is not an rvalue
> {code}
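The compile error above comes from splicing a user-provided string into generated Java source without escaping, so an embedded double quote terminates the generated literal early. A generic escaping helper (not Flink's actual codegen code, just an illustration of the fix) shows what "handling double quote string literals and control characters" means:

```java
public class CodeGenEscaping {
    /** Produces a valid Java string literal for arbitrary input, escaping quotes and control characters. */
    public static String toJavaLiteral(String value) {
        StringBuilder literal = new StringBuilder("\"");
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"':  literal.append("\\\""); break; // would otherwise end the literal early
                case '\\': literal.append("\\\\"); break;
                case '\n': literal.append("\\n");  break;
                case '\r': literal.append("\\r");  break;
                case '\t': literal.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // remaining control characters as unicode escapes
                        literal.append(String.format("\\u%04x", (int) c));
                    } else {
                        literal.append(c);
                    }
            }
        }
        return literal.append('"').toString();
    }
}
```

Generated code then embeds `toJavaLiteral(userString)` instead of `"\"" + userString + "\""`, and the Janino compile error disappears.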
[GitHub] flink issue #4538: [FLINK-7441] [table] Double quote string literals is not ...
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4538

    Thanks for the reminder @zentol. I will pay attention to the commit message in the future.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126650#comment-16126650 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r133091608

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -340,6 +362,65 @@ public void resetNFAChanged() {
         return Tuple2.of(result, timeoutResult);
     }
    + private void discardComputationStatesAccordingToStrategy(Queue computationStates,
    +     Collection
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r133091608

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -340,6 +362,65 @@ public void resetNFAChanged() {
         return Tuple2.of(result, timeoutResult);
     }
    + private void discardComputationStatesAccordingToStrategy(Queue computationStates,
    +     Collection
[jira] [Commented] (FLINK-7409) WebRuntimeMonitor blocks serving threads
[ https://issues.apache.org/jira/browse/FLINK-7409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126587#comment-16126587 ]

ASF GitHub Bot commented on FLINK-7409:
---------------------------------------

Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/4527

    @tillrohrmann please rebase now that #4492 has been merged

> WebRuntimeMonitor blocks serving threads
> ----------------------------------------
>
>                 Key: FLINK-7409
>                 URL: https://issues.apache.org/jira/browse/FLINK-7409
>             Project: Flink
>          Issue Type: Improvement
>          Components: Webfrontend
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Minor
>
> The {{WebRuntimeMonitor}} contains a lot of blocking operations where it
> retrieves a result from the {{JobManager}} and then waits on the future to
> obtain the result. This is not a good design since we are blocking server
> threads with it. Instead I propose to follow a more reactive approach where
> the {{RequestHandler}} returns a {{CompletableFuture}} of {{FullHttpResponse}}
> which is written out to the channel in the completion handler.
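The reactive approach described in the ticket means a handler returns a future of the response and the write happens in a completion callback, so no server thread ever waits on the JobManager. A toy sketch with {{CompletableFuture}} — the handler shape and response strings are stand-ins, not the WebRuntimeMonitor API:

```java
import java.util.concurrent.CompletableFuture;

public class ReactiveHandlerSketch {
    /** Instead of blocking on the JobManager result, return a future of the rendered response. */
    public static CompletableFuture<String> handleRequest(CompletableFuture<String> jobManagerResult) {
        return jobManagerResult
            .thenApply(body -> "HTTP/1.1 200 OK\n\n" + body)  // runs only when the result arrives
            .exceptionally(error ->
                "HTTP/1.1 500 Internal Server Error\n\n" + error.getMessage());
    }
}
```

The serving thread returns immediately after wiring up the callbacks; the eventual write to the channel happens on whichever thread completes the JobManager future.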
[GitHub] flink issue #4527: [FLINK-7409] [web] Make WebRuntimeMonitor reactive
Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/4527

    @tillrohrmann please rebase now that #4492 has been merged
[GitHub] flink issue #4535: Eventhubs-support read from and write to Azure eventhubs
Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/4535

    @zhuganghuaonnet we should also mention that new connectors are being contributed through the Flink release of [Apache Bahir](http://bahir.apache.org).
[jira] [Commented] (FLINK-7443) Store and deserializer fields in MetricFetcher should be final
[ https://issues.apache.org/jira/browse/FLINK-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126495#comment-16126495 ]

ASF GitHub Bot commented on FLINK-7443:
---------------------------------------

Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/4539

    +1, but I'm only seeing an improvement and not a bug

> Store and deserializer fields in MetricFetcher should be final
> --------------------------------------------------------------
>
>                 Key: FLINK-7443
>                 URL: https://issues.apache.org/jira/browse/FLINK-7443
>             Project: Flink
>          Issue Type: Bug
>          Components: Metrics
>    Affects Versions: 1.4.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>             Fix For: 1.4.0
>
> The {{MetricStore}} and {{MetricDumpDeserializer}} fields in the
> {{MetricFetcher}} should be final, as they are not meant to be overwritten
> and are even used for synchronization.
[GitHub] flink issue #4539: [FLINK-7443] [metrics] MetricFetcher store and deserializ...
Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/4539

    +1, but I'm only seeing an improvement and not a bug
[jira] [Commented] (FLINK-7445) Remove FLINK-1234 reference from PR template
[ https://issues.apache.org/jira/browse/FLINK-7445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126478#comment-16126478 ]

ASF GitHub Bot commented on FLINK-7445:
---------------------------------------

Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/4542

    +1

> Remove FLINK-1234 reference from PR template
> --------------------------------------------
>
>                 Key: FLINK-7445
>                 URL: https://issues.apache.org/jira/browse/FLINK-7445
>             Project: Flink
>          Issue Type: Improvement
>          Components: GitHub
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Critical
>
> The PR template on GitHub contains a reference to FLINK-1234 as an example
> for the PR title.
> The problem is that every PR that doesn't fill out the template, or rather
> does not delete that part of the template, will now be referenced in
> FLINK-1234, leading to spam on the mailing list.
[GitHub] flink issue #4542: [FLINK-7445] [GitHub] Remove FLINK-1234 reference from PR...
Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/4542

    +1
[jira] [Commented] (FLINK-1234) Make Hadoop2 profile default
[ https://issues.apache.org/jira/browse/FLINK-1234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126423#comment-16126423 ]

ASF GitHub Bot commented on FLINK-1234:
---------------------------------------

Github user AjayTripathy closed the pull request at:

    https://github.com/apache/flink/pull/4541

> Make Hadoop2 profile default
> ----------------------------
>
>                 Key: FLINK-1234
>                 URL: https://issues.apache.org/jira/browse/FLINK-1234
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>             Fix For: 0.8.0
>
> As per the mailing list discussion.
[GitHub] flink pull request #4541: Small typo fix in the scope section
Github user AjayTripathy closed the pull request at:

    https://github.com/apache/flink/pull/4541
[jira] [Commented] (FLINK-7445) Remove FLINK-1234 reference from PR template
[ https://issues.apache.org/jira/browse/FLINK-7445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126413#comment-16126413 ]

ASF GitHub Bot commented on FLINK-7445:
---------------------------------------

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/4542

    [FLINK-7445] [GitHub] Remove FLINK-1234 reference from PR template

    The PR template on GitHub contains a reference to FLINK-1234 as an example for the PR title. The problem is that every PR that doesn't fill out the template, or rather does not delete that part of the template, will now be referenced in FLINK-1234, leading to spam on the mailing list.

    I've replaced `1234` with ``, which should still get the idea across.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 7445

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4542.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4542

commit 3449b390823467c058c1a14fd5889b87200fbdad
Author: zentol
Date:   2017-08-14T21:14:13Z

    [FLINK-7445] [GitHub] Remove FLINK-1234 reference from PR template
[jira] [Assigned] (FLINK-7445) Remove FLINK-1234 reference from PR template
[ https://issues.apache.org/jira/browse/FLINK-7445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-7445: --- Assignee: Chesnay Schepler > Remove FLINK-1234 reference from PR template > > > Key: FLINK-7445 > URL: https://issues.apache.org/jira/browse/FLINK-7445 > Project: Flink > Issue Type: Improvement > Components: GitHub >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > > The PR template on github contains a reference to FLINK-1234 as an example > for the PR title. > The problem is that every PR that doesn't fill out the template, or rather > does not delete that part of the template, will now be referenced in > FLINK-1234, leading to spam on the mailing list.
[GitHub] flink issue #4541: Small typo fix in the scope section
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4541 I've merged the commit in 4a4db0a7f9ed2d8f1590589be3a4b43789208e6c, could you close this PR?
[GitHub] flink issue #4541: Small typo fix in the scope section
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4541 Thank you for fixing this, merging the PR.
[jira] [Created] (FLINK-7445) Remove FLINK-1234 reference from PR template
Chesnay Schepler created FLINK-7445: --- Summary: Remove FLINK-1234 reference from PR template Key: FLINK-7445 URL: https://issues.apache.org/jira/browse/FLINK-7445 Project: Flink Issue Type: Improvement Components: GitHub Reporter: Chesnay Schepler Priority: Critical The PR template on github contains a reference to FLINK-1234 as an example for the PR title. The problem is that every PR that doesn't fill out the template, or rather does not delete that part of the template, will now be referenced in FLINK-1234, leading to spam on the mailing list.
[jira] [Comment Edited] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126380#comment-16126380 ] Fabian Hueske edited comment on FLINK-6233 at 8/14/17 9:00 PM: --- Hi [~xccui]: 1. Yes, if both implementations can share significant parts of their code please do so. 2. No, all clean up timers should be on processing time. This is important for consistency with the other operators. 3. The current design is optimized for the RocksDBStateBackend which returns keys in order. For RocksDB (and hence for most serious use cases) the current implementation is optimal. IMO, we should not change that. 4. We will add an allowed latency parameter later in the QueryConfig. You can prepare the operator to handle this case if it does not add substantial code complexity. I think separate timer services are an optimization that could be added later. I haven't thought in detail about this, but if I'm not mistaken separate timers would only be beneficial in certain combinations of time range join predicates and delayed streams and would not improve the performance / reduce the state size of a join in all cases. [~xccui] what are your thoughts on this? was (Author: fhueske): Hi [~xccui]: #1 Yes, if both implementations can share significant parts of their code please do so. #1 No, all clean up timers should be on processing time. This is important for consistency with the other operators. #1 The current design is optimized for the RocksDBStateBackend which returns keys in order. For RocksDB (and hence for most serious use cases) the current implementation is optimal. IMO, we should not change that. #1 We will add an allowed latency parameter later in the QueryConfig. You can prepare the operator to handle this case if it does not add substantial code complexity. I think separate timer services are an optimization that could be added later. 
I haven't thought in detail about this, but if I'm not mistaken separate timers would only be beneficial in certain combinations of time range join predicates and delayed streams and would not improve the performance / reduce the state size of a join in all cases. [~xccui] what are your thoughts on this? > Support rowtime inner equi-join between two streams in the SQL API > -- > > Key: FLINK-6233 > URL: https://issues.apache.org/jira/browse/FLINK-6233 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: Xingcan Cui > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only support inner join > * The ON clause should include equi-join condition > * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL > '1' HOUR}} only can use rowtime that is a system attribute, the time > condition only support bounded time range like {{o.rowtime BETWEEN s.rowtime > - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not support > unbounded like {{o.rowtime s.rowtime}} , and should include both two > stream's rowtime attribute, {{o.rowtime between rowtime () and rowtime () + > 1}} should also not be supported. > An row-time streams join will not be able to handle late data, because this > would mean in insert a row into a sorted order shift all other computations. > This would be too expensive to maintain. Therefore, we will throw an error if > a user tries to use an row-time stream join with late data handling. 
> This issue includes: > * Design of the DataStream operator to deal with stream join > * Translation from Calcite's RelNode representation (LogicalJoin).
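The bounded time-range restriction described in the issue (`o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR`) boils down to a simple interval-membership check per record pair. A minimal illustrative sketch in Java — the class and parameter names are made up here and are not the actual Flink operator code:

```java
/**
 * Sketch of the bounded time-range predicate used by an interval equi-join:
 * leftRowtime must lie within [rightRowtime - lowerBound, rightRowtime + upperBound].
 * Unbounded conditions (e.g. "o.rowtime > s.rowtime") cannot be expressed this
 * way, which is why the issue excludes them: without both bounds, join state
 * could never be cleaned up.
 */
public class TimeRangePredicate {

    private final long lowerBoundMillis; // how far left may lag behind right
    private final long upperBoundMillis; // how far left may run ahead of right

    public TimeRangePredicate(long lowerBoundMillis, long upperBoundMillis) {
        this.lowerBoundMillis = lowerBoundMillis;
        this.upperBoundMillis = upperBoundMillis;
    }

    /** True iff the two rowtimes satisfy the bounded range condition. */
    public boolean matches(long leftRowtime, long rightRowtime) {
        return leftRowtime >= rightRowtime - lowerBoundMillis
            && leftRowtime <= rightRowtime + upperBoundMillis;
    }
}
```

The bounds also tell the operator how long rows must be retained: a right-side row can be dropped once the left-side watermark passes `rightRowtime + upperBound`.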
[jira] [Commented] (FLINK-7444) Make external calls non-blocking
[ https://issues.apache.org/jira/browse/FLINK-7444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126361#comment-16126361 ] Till Rohrmann commented on FLINK-7444: -- It is problematic if the error handler tries to stop the failing {{RpcEndpoint}} in a blocking fashion. Then, it is basically deadlocked because the actor thread never terminates. We have seen this problem with the {{MiniCluster}} where an {{Exception}} is thrown at shut down which blocks the actor's main thread while the {{MiniCluster}} is being shut down waiting for the {{ActorSystem}} to terminate. I think the underlying problem is that one does not know what's happening outside of the {{RpcEndpoint's}} main thread and the idea was to guard against this by making the calls asynchronous. I see the point that one would want to react fast to fatal errors and maybe the problem is that we are abusing the {{FatalErrorHandler}} also for non fatal errors (e.g. more like an uncaught exception handler). Maybe we can introduce different failure cases but then one shouldn't do any blocking operations which require the {{RpcEndpoint}} to be terminated in the fatal error case. > Make external calls non-blocking > > > Key: FLINK-7444 > URL: https://issues.apache.org/jira/browse/FLINK-7444 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > All external calls from a {{RpcEndpoint}} can be potentially blocking, e.g. > calls to the {{FatalErrorHandler}}. Therefore, I propose to make all these > calls coming from the {{RpcEndpoint's}} main thread non-blocking by running > them in an {{Executor}}. That way the main thread will never be blocked.
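The proposal in this thread — hand every external call off to an `Executor` so the endpoint's main thread can never be blocked by a misbehaving handler — can be sketched as follows. This is an illustrative stand-in, not Flink's actual `RpcEndpoint`/`FatalErrorHandler` code; the class and interface names below are invented:

```java
import java.util.concurrent.Executor;

/** Sketch: dispatch potentially-blocking external callbacks off the main thread. */
public class NonBlockingErrorNotifier {

    /** Hypothetical stand-in for Flink's FatalErrorHandler. */
    public interface ErrorHandler {
        void onFatalError(Throwable cause);
    }

    private final ErrorHandler handler;
    private final Executor externalCallExecutor;

    public NonBlockingErrorNotifier(ErrorHandler handler, Executor externalCallExecutor) {
        this.handler = handler;
        this.externalCallExecutor = externalCallExecutor;
    }

    /**
     * Called from the endpoint's main thread. The handler may block (e.g. wait
     * for the endpoint itself to terminate, as in the MiniCluster deadlock
     * described above), so it is submitted to a separate executor instead of
     * being invoked directly; the main thread returns immediately.
     */
    public void notifyFatalError(Throwable cause) {
        externalCallExecutor.execute(() -> handler.onFatalError(cause));
    }
}
```

The trade-off Till mentions shows up here directly: the hand-off adds latency before the handler reacts, which is why one might want a separate, synchronous path for truly fatal errors — but only if that path is guaranteed not to block on the endpoint's own termination.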
[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks
[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126307#comment-16126307 ] ASF GitHub Bot commented on FLINK-7245: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4530 Good proposal @xccui! I'd prefer the plural: `org.apache.flink.table.runtime.operators` > Enhance the operators to support holding back watermarks > > > Key: FLINK-7245 > URL: https://issues.apache.org/jira/browse/FLINK-7245 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Xingcan Cui >Assignee: Xingcan Cui > > Currently the watermarks are applied and emitted by the > {{AbstractStreamOperator}} instantly. > {code:java} > public void processWatermark(Watermark mark) throws Exception { > if (timeServiceManager != null) { > timeServiceManager.advanceWatermark(mark); > } > output.emitWatermark(mark); > } > {code} > Some calculation results (with timestamp fields) triggered by these > watermarks (e.g., join or aggregate results) may be regarded as delayed by > the downstream operators since their timestamps must be less than or equal to > the corresponding triggers. > This issue aims to add another "working mode", which supports holding back > watermarks, to current operators. These watermarks should be blocked and > stored by the operators until all the corresponding new generated results are > emitted.
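The proposed "working mode" — buffer an incoming watermark instead of forwarding it immediately (as the quoted `processWatermark` does), and release it only once all results it triggered have been emitted — can be sketched with a small helper. Names and structure here are illustrative only, not the actual Flink API:

```java
import java.util.function.LongConsumer;

/**
 * Sketch of the "holding back" mode: the operator records the highest watermark
 * seen so far, but forwards it downstream only when no triggered results are
 * still outstanding. This keeps result timestamps from appearing "late" to
 * downstream operators.
 */
public class WatermarkHolder {

    private long heldBackWatermark = Long.MIN_VALUE;
    private int pendingResults = 0;
    private final LongConsumer emitWatermark; // downstream emission callback

    public WatermarkHolder(LongConsumer emitWatermark) {
        this.emitWatermark = emitWatermark;
    }

    /** Replaces the immediate output.emitWatermark(mark) call. */
    public void onWatermark(long timestamp) {
        heldBackWatermark = Math.max(heldBackWatermark, timestamp);
        tryEmit();
    }

    /** The operator announces a result that this watermark will trigger. */
    public void resultPending() {
        pendingResults++;
    }

    /** The operator has emitted one of the pending results. */
    public void resultEmitted() {
        pendingResults--;
        tryEmit();
    }

    private void tryEmit() {
        // Forward the watermark only once no triggered results are outstanding.
        if (pendingResults == 0 && heldBackWatermark > Long.MIN_VALUE) {
            emitWatermark.accept(heldBackWatermark);
        }
    }
}
```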
[GitHub] flink issue #4541: Small typo fix in the scope section
Github user AjayTripathy commented on the issue: https://github.com/apache/flink/pull/4541 There is no JIRA ticket, as I reviewed in the documentation guidelines, trivial fixes and typos require no ticket.
[jira] [Commented] (FLINK-1234) Make Hadoop2 profile default
[ https://issues.apache.org/jira/browse/FLINK-1234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126266#comment-16126266 ] ASF GitHub Bot commented on FLINK-1234: --- GitHub user AjayTripathy opened a pull request: https://github.com/apache/flink/pull/4541 Small typo fix in the scope section *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-1234] [component] Title of the pull request", where *FLINK-1234* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. 
- Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository
[GitHub] flink pull request #4541: Small typo fix in the scope section
GitHub user AjayTripathy opened a pull request: https://github.com/apache/flink/pull/4541 Small typo fix in the scope section *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-1234] [component] Title of the pull request", where *FLINK-1234* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). 
- Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/AjayTripathy/flink patch-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4541.patch To close this pull request, make a
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126263#comment-16126263 ] ASF GitHub Bot commented on FLINK-7337: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r133037759 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment( // get CRow plan val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig) +val rowtimeFields = logicalType + .getFieldList.asScala + .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + +// convert the input type for the conversion mapper +// the input will be changed in the OutputRowtimeProcessFunction later +val convType = if (rowtimeFields.size > 1) { + throw new TableException( --- End diff -- What would happen if the left most table does not have a time attribute (or if it is projected out)? I just think that the semantics of the `StreamRecord` timestamps are too important to have an implicit behavior that is hard to explain and reason about for users. IMO, an exception that asks for explicit user input is the better choice compared to a behavior that depends on non-obvious query characteristics and is hard to predict. > Refactor handling of time indicator attributes > -- > > Key: FLINK-7337 > URL: https://issues.apache.org/jira/browse/FLINK-7337 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > After a [discussion on the dev mailing > list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E] > I propose the following changes to the current handling of time indicator > attributes: > * Remove the separation of logical and physical row type. 
> ** Hold the event-time timestamp as regular Long field in Row > ** Represent the processing-time indicator type as a null-valued field in Row > (1 bit overhead) > * Remove materialization of event-time timestamps because timestamp is > already accessible in Row. > * Add {{ProcessFunction}} to set timestamp into the timestamp field of a > {{StreamRecord}}. 
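The behavior Fabian argues for in the review comment — fail fast when a result type has more than one rowtime attribute, rather than silently picking one (e.g. the left-most) as the `StreamRecord` timestamp — can be sketched like this. The class and message below are illustrative, not the actual `StreamTableEnvironment` code:

```java
import java.util.List;

/**
 * Sketch: when converting a Table back to a DataStream, exactly one rowtime
 * attribute may become the StreamRecord timestamp. With several candidates,
 * throw and ask the user to choose, instead of applying an implicit rule
 * that is hard to predict from the query.
 */
public class RowtimeFieldSelector {

    /** Returns the single rowtime field, null if there is none, or throws if ambiguous. */
    public static String selectSingleRowtime(List<String> rowtimeFields) {
        if (rowtimeFields.size() > 1) {
            throw new IllegalStateException(
                "Found more than one rowtime field: " + rowtimeFields
                    + ". Please select the single field that should act as the"
                    + " event-time timestamp of the resulting stream.");
        }
        return rowtimeFields.isEmpty() ? null : rowtimeFields.get(0);
    }
}
```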
[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager
[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126188#comment-16126188 ] ASF GitHub Bot commented on FLINK-7213: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133027325 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can + * register their state under their operator id. 
Each operator instance is a physical execution responsible for + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize + * execution of logical operators, e.g. distributing a map function. + * + * One instance of this class contains the information that one task will send to acknowledge a checkpoint request by + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole + * state of a job at the time of the checkpoint. + * + * This class should be called TaskState once the old class with this name that we keep for backwards + * compatibility goes away. + */ +public class TaskStateSnapshot implements CompositeStateHandle { --- End diff -- This class is totally intended to be immutable. So beyond what it is currently enforcing, do you suggest using immutable collections inside? > Introduce state management by OperatorID in TaskManager > --- > > Key: FLINK-7213 > URL: https://issues.apache.org/jira/browse/FLINK-7213 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > > Flink-5892 introduced the job manager / checkpoint coordinator part of > managing state on the operator level instead of the task level by introducing > explicit operator_id -> state mappings. However, this explicit mapping was > not introduced in the task manager side, so the explicit mapping is still > converted into a mapping that suits the implicit operator chain order. > We should also introduce explicit operator ids to state management on the > task manager.
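The core idea of the diff under review — each chained operator registers its snapshot under its own operator id inside a task-level container, so restore no longer depends on the implicit chain order — can be sketched with simplified stand-in types (Strings instead of Flink's `OperatorID` and state-handle classes; this is not the real `TaskStateSnapshot`):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of an explicit operatorId -> state mapping held per task. The task
 * reports one such container when acknowledging a checkpoint; the union of
 * the containers from all tasks describes the whole job state.
 */
public class TaskSnapshotSketch {

    private final Map<String, String> stateByOperatorId = new HashMap<>();

    /** Each operator instance in the task's chain registers its state under its id. */
    public void putSubtaskStateByOperatorId(String operatorId, String stateHandle) {
        stateByOperatorId.put(operatorId, stateHandle);
    }

    /** On restore, state is looked up by operator id, independent of chain order. */
    public String getSubtaskStateByOperatorId(String operatorId) {
        return stateByOperatorId.get(operatorId);
    }

    public boolean hasState() {
        return !stateByOperatorId.isEmpty();
    }
}
```

The review thread's immutability question would apply here too: handing out or wrapping the map as an unmodifiable view would enforce the intended read-only use after the snapshot is assembled.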
[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager
[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126176#comment-16126176 ] ASF GitHub Bot commented on FLINK-7213: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133026125 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can + * register their state under their operator id. 
Each operator instance is a physical execution responsible for + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize + * execution of logical operators, e.g. distributing a map function. + * + * One instance of this class contains the information that one task will send to acknowledge a checkpoint request by + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole + * state of a job at the time of the checkpoint. + * + * This class should be called TaskState once the old class with this name that we keep for backwards + * compatibility goes away. + */ +public class TaskStateSnapshot implements CompositeStateHandle { + + private static final long serialVersionUID = 1L; + + /** Mapping from an operator id to the state of one subtask of this operator */ + private final Map&lt;OperatorID, OperatorSubtaskState&gt; subtaskStatesByOperatorID; --- End diff -- Hmm, I think if we consider default load factors and for large sizes, I would pick a min >30% hit rate linear array scan over 100% hit rate random access iteration. For all expected sizes (in cache) in this class, it should not matter. LHM also consumes a bit more memory. I would tend to keep it this way.
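The trade-off discussed above (linked-list traversal vs. scanning a sparse table array) is easy to see in a small standalone sketch; this is illustrative only, using hypothetical toy data rather than the Flink classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IterationOrderDemo {

    // Insertion order of a LinkedHashMap's keys, as seen by an iterator.
    static List<String> insertionOrder() {
        Map<String, Integer> linked = new LinkedHashMap<>();
        for (String k : new String[] {"c", "a", "b"}) {
            linked.put(k, k.length());
        }
        return new ArrayList<>(linked.keySet());
    }

    public static void main(String[] args) {
        // LinkedHashMap iterates by walking an internal linked list of entries,
        // so traversal cost is proportional to size() and order is predictable.
        System.out.println(insertionOrder()); // [c, a, b]

        // HashMap iterates by scanning its (possibly sparse) table array, so
        // iteration order is unspecified and traversal touches empty slots too.
        Map<String, Integer> hashed = new HashMap<>(Map.of("c", 1, "a", 1, "b", 1));
        System.out.println(hashed.size()); // 3
    }
}
```

As the comment notes, for small, nearly full tables the difference is unlikely to matter; the sketch only makes the iteration-order guarantee visible.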
[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager
[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126154#comment-16126154 ] ASF GitHub Bot commented on FLINK-7213: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133021720 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java --- @@ -850,18 +843,20 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() { OperatorID opID2 = OperatorID.fromJobVertexID(ackVertex2.getJobvertexId()); OperatorID opID3 = OperatorID.fromJobVertexID(ackVertex3.getJobvertexId()); - Map&lt;OperatorID, OperatorState&gt; operatorStates1 = pending1.getOperatorStates(); + TaskStateSnapshot taskOperatorSubtaskStates1_1 = spy(new TaskStateSnapshot()); --- End diff -- Is spying necessary here? There seem to be no `verify()` calls on this type...
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133021771 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java --- @@ -878,14 +873,17 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() { } long checkpointId2 = pending2.getCheckpointId(); - Map&lt;OperatorID, OperatorState&gt; operatorStates2 = pending2.getOperatorStates(); + TaskStateSnapshot taskOperatorSubtaskStates2_1 = spy(new TaskStateSnapshot()); --- End diff -- Same as above, spying necessary?
[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager
[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126150#comment-16126150 ] ASF GitHub Bot commented on FLINK-7213: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133010887 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can + * register their state under their operator id. 
Each operator instance is a physical execution responsible for + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize + * execution of logical operators, e.g. distributing a map function. + * + * One instance of this class contains the information that one task will send to acknowledge a checkpoint request by + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole + * state of a job at the time of the checkpoint. + * + * This class should be called TaskState once the old class with this name that we keep for backwards + * compatibility goes away. + */ +public class TaskStateSnapshot implements CompositeStateHandle { --- End diff -- Would it make sense to make this immutable? It looks like this should not be modified any more after fully constructing it. This would also make it clear that methods iterating over the state, or returning sets / iterables can never fail with concurrent modifications. For example the `size` method is considered a "best effort" method for info purposes only, and should not fail with an exception (it currently could fail with a `ConcurrentModificationException`).
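One way to realize the immutability suggested above — sketched here with hypothetical placeholder types, not the real Flink API — is a defensive copy in the constructor plus `Collections.unmodifiableMap`, so no iteration can ever race with a mutation:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ImmutableSnapshotSketch {

    // Hypothetical stand-in for a snapshot class; not the actual Flink change.
    static final class Snapshot {
        private final Map<String, String> statesByOperatorId;

        Snapshot(Map<String, String> states) {
            // Defensive copy plus unmodifiable view: once constructed, the map can
            // never change, so iterating methods (size, stats, ...) cannot fail
            // with a ConcurrentModificationException.
            this.statesByOperatorId = Collections.unmodifiableMap(new HashMap<>(states));
        }

        Map<String, String> states() {
            return statesByOperatorId;
        }
    }

    public static void main(String[] args) {
        Map<String, String> input = new HashMap<>();
        input.put("op-1", "handle-1");
        Snapshot snapshot = new Snapshot(input);

        input.put("op-2", "handle-2"); // does not leak into the snapshot
        System.out.println(snapshot.states().size()); // 1

        try {
            snapshot.states().put("op-3", "handle-3");
        } catch (UnsupportedOperationException e) {
            System.out.println("immutable"); // mutation through the view is rejected
        }
    }
}
```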
[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager
[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126152#comment-16126152 ] ASF GitHub Bot commented on FLINK-7213: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133022095 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java --- @@ -553,31 +551,29 @@ public void testTriggerAndConfirmSimpleCheckpoint() { assertFalse(checkpoint.isDiscarded()); assertFalse(checkpoint.isFullyAcknowledged()); - OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId()); - OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId()); - - Map&lt;OperatorID, OperatorState&gt; operatorStates = checkpoint.getOperatorStates(); - - operatorStates.put(opID1, new SpyInjectingOperatorState( - opID1, vertex1.getTotalNumberOfParallelSubtasks(), vertex1.getMaxParallelism())); - operatorStates.put(opID2, new SpyInjectingOperatorState( - opID2, vertex2.getTotalNumberOfParallelSubtasks(), vertex2.getMaxParallelism())); - // check that the vertices received the trigger checkpoint message { verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class)); verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class)); } + OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId()); + OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId()); + TaskStateSnapshot taskOperatorSubtaskStates1 = mock(TaskStateSnapshot.class); --- End diff -- Why not create a proper `TaskStateSnapshot` with one entry, rather than mocking?
[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager
[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126151#comment-16126151 ] ASF GitHub Bot commented on FLINK-7213: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133013315 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java --- @@ -164,8 +168,16 @@ public void acknowledgeCheckpoint( throw new RuntimeException(e); } + boolean hasManagedKeyedState = false; + for (Map.Entry&lt;OperatorID, OperatorSubtaskState&gt; entry : checkpointStateHandles.getSubtaskStateMappings()) { + OperatorSubtaskState state = entry.getValue(); + if (state != null) { + hasManagedKeyedState |= state.getManagedKeyedState() != null; + } + } + // should be one k/v state --- End diff -- "should be **at least** one k/v state"?
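The `|=` accumulation pattern in the quoted test diff can be sketched standalone (hypothetical `String`-valued toy types in place of the Flink state classes):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyedStateFlagSketch {

    /** Returns true if at least one non-null value carries (toy) keyed state. */
    static boolean hasManagedKeyedState(Map<String, String> subtaskStates) {
        boolean has = false;
        for (Map.Entry<String, String> entry : subtaskStates.entrySet()) {
            String state = entry.getValue();
            if (state != null) {
                // |= latches the flag: once any entry matches, it stays true,
                // which is why the check is "at least one", not "exactly one".
                has |= !state.isEmpty();
            }
        }
        return has;
    }

    public static void main(String[] args) {
        Map<String, String> states = new LinkedHashMap<>();
        states.put("op-1", null);                  // subtask with no state
        states.put("op-2", "keyed-state-handle");  // subtask with keyed state
        System.out.println(hasManagedKeyedState(states)); // true
    }
}
```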
[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager
[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126155#comment-16126155 ] ASF GitHub Bot commented on FLINK-7213: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133018189 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java --- @@ -185,44 +184,66 @@ private void assignAttemptState(ExecutionJobVertex executionJobVertex, List
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133009796 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can + * register their state under their operator id. Each operator instance is a physical execution responsible for + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize + * execution of logical operators, e.g. 
distributing a map function. + * + * One instance of this class contains the information that one task will send to acknowledge a checkpoint request by + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole + * state of a job at the time of the checkpoint. + * + * This class should be called TaskState once the old class with this name that we keep for backwards + * compatibility goes away. + */ +public class TaskStateSnapshot implements CompositeStateHandle { + + private static final long serialVersionUID = 1L; + + /** Mapping from an operator id to the state of one subtask of this operator */ + private final Map&lt;OperatorID, OperatorSubtaskState&gt; subtaskStatesByOperatorID; --- End diff -- A `LinkedHashMap` has a slightly more predictable iteration performance (list traversal) compared to a `HashMap` (search through sparse table array). There are a lot of value iterations done in this class, but we also should have pretty full hash tables (since we never delete), so not sure how much difference it makes...
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133018189 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java --- @@ -185,44 +184,66 @@ private void assignAttemptState(ExecutionJobVertex executionJobVertex, List
[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager
[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126149#comment-16126149 ] ASF GitHub Bot commented on FLINK-7213: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133009796 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can + * register their state under their operator id. 
Each operator instance is a physical execution responsible for + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize + * execution of logical operators, e.g. distributing a map function. + * + * One instance of this class contains the information that one task will send to acknowledge a checkpoint request by + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole + * state of a job at the time of the checkpoint. + * + * This class should be called TaskState once the old class with this name that we keep for backwards + * compatibility goes away. + */ +public class TaskStateSnapshot implements CompositeStateHandle { + + private static final long serialVersionUID = 1L; + + /** Mapping from an operator id to the state of one subtask of this operator */ + private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID; --- End diff -- A `LinkedHashMap` has a slightly more predictable iteration performance (list traversal) compared to a `HashMap` (search through sparse table array). There are a lot of value iterations done in this class, but we also should have pretty full hash tables (since we never delete), so not sure how much difference it makes... > Introduce state management by OperatorID in TaskManager > --- > > Key: FLINK-7213 > URL: https://issues.apache.org/jira/browse/FLINK-7213 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > > Flink-5892 introduced the job manager / checkpoint coordinator part of > managing state on the operator level instead of the task level by introducing > explicit operator_id -> state mappings.
However, this explicit mapping was > not introduced in the task manager side, so the explicit mapping is still > converted into a mapping that suits the implicit operator chain order. > We should also introduce explicit operator ids to state management on the > task manager. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133016663 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java --- @@ -75,31 +103,79 @@ */ private final long stateSize; + @VisibleForTesting + public OperatorSubtaskState(StreamStateHandle legacyOperatorState) { + + this(legacyOperatorState, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList()); + } + + /** +* Empty state. +*/ + public OperatorSubtaskState() { --- End diff -- Minor optimization: One could make this constructor `private` and have a field `OperatorSubtaskState.EMPTY` as a placeholder for the empty states. I'd leave this to you whether you think it's worth doing...
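The suggested private-constructor-plus-shared-`EMPTY` pattern can be sketched on a simplified stand-in class (the real `OperatorSubtaskState` has more fields; all names and methods below are illustrative, not Flink's actual API):

```java
import java.util.Collections;
import java.util.List;

// Simplified stand-in, only to illustrate the singleton-for-empty-state idea.
final class SubtaskStateSketch {
    /** Shared placeholder for "no state"; safe to share because the class is immutable. */
    public static final SubtaskStateSketch EMPTY =
            new SubtaskStateSketch(Collections.emptyList());

    private final List<String> stateHandles; // hypothetical field

    // Private: callers cannot allocate fresh empty instances; they reuse EMPTY.
    private SubtaskStateSketch(List<String> stateHandles) {
        this.stateHandles = stateHandles;
    }

    public boolean hasState() {
        return !stateHandles.isEmpty();
    }

    public static void main(String[] args) {
        SubtaskStateSketch s = SubtaskStateSketch.EMPTY;
        System.out.println(s.hasState()); // false
    }
}
```

This avoids allocating a new object every time an empty state is reported, at the cost of requiring the class to stay immutable.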
[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager
[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126153#comment-16126153 ] ASF GitHub Bot commented on FLINK-7213: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133016663 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java --- @@ -75,31 +103,79 @@ */ private final long stateSize; + @VisibleForTesting + public OperatorSubtaskState(StreamStateHandle legacyOperatorState) { + + this(legacyOperatorState, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList()); + } + + /** +* Empty state. +*/ + public OperatorSubtaskState() { --- End diff -- Minor optimization: One could make this constructor `private` and have a field `OperatorSubtaskState.EMPTY` as a placeholder for the empty states. I'd leave this to you whether you think it worth doing... > Introduce state management by OperatorID in TaskManager > --- > > Key: FLINK-7213 > URL: https://issues.apache.org/jira/browse/FLINK-7213 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > > Flink-5892 introduced the job manager / checkpoint coordinator part of > managing state on the operator level instead of the task level by introducing > explicit operator_id -> state mappings. However, this explicit mapping was > not introduced in the task manager side, so the explicit mapping is still > converted into a mapping that suits the implicit operator chain order. > We should also introduce explicit operator ids to state management on the > task manager. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager
[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126156#comment-16126156 ] ASF GitHub Bot commented on FLINK-7213: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133021771 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java --- @@ -878,14 +873,17 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() { } long checkpointId2 = pending2.getCheckpointId(); - Map<OperatorID, OperatorState> operatorStates2 = pending2.getOperatorStates(); + TaskStateSnapshot taskOperatorSubtaskStates2_1 = spy(new TaskStateSnapshot()); --- End diff -- Same as above, spying necessary? > Introduce state management by OperatorID in TaskManager > --- > > Key: FLINK-7213 > URL: https://issues.apache.org/jira/browse/FLINK-7213 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > > Flink-5892 introduced the job manager / checkpoint coordinator part of > managing state on the operator level instead of the task level by introducing > explicit operator_id -> state mappings. However, this explicit mapping was > not introduced in the task manager side, so the explicit mapping is still > converted into a mapping that suits the implicit operator chain order. > We should also introduce explicit operator ids to state management on the > task manager. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133022095 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java --- @@ -553,31 +551,29 @@ public void testTriggerAndConfirmSimpleCheckpoint() { assertFalse(checkpoint.isDiscarded()); assertFalse(checkpoint.isFullyAcknowledged()); - OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId()); - OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId()); - - Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates(); - - operatorStates.put(opID1, new SpyInjectingOperatorState( - opID1, vertex1.getTotalNumberOfParallelSubtasks(), vertex1.getMaxParallelism())); - operatorStates.put(opID2, new SpyInjectingOperatorState( - opID2, vertex2.getTotalNumberOfParallelSubtasks(), vertex2.getMaxParallelism())); - // check that the vertices received the trigger checkpoint message { verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class)); verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class)); } + OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId()); + OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId()); + TaskStateSnapshot taskOperatorSubtaskStates1 = mock(TaskStateSnapshot.class); --- End diff -- Why not create a proper `TaskStateSnapshot` with one entry, rather than mocking?
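The reviewer's point, sketched with a simplified stand-in (the real `TaskStateSnapshot` API may differ; names here are illustrative): a real instance with one entry exercises the class's actual logic, whereas a mock only replays stubbed answers that can silently drift from the real behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal, hypothetical stand-in for TaskStateSnapshot.
class SnapshotSketch {
    private final Map<String, String> subtaskStatesByOperatorID = new LinkedHashMap<>();

    void putSubtaskState(String operatorId, String stateHandle) {
        subtaskStatesByOperatorID.put(operatorId, stateHandle);
    }

    boolean hasState() {
        return !subtaskStatesByOperatorID.isEmpty();
    }
}

class RealObjectOverMockDemo {
    public static void main(String[] args) {
        // A real instance with one entry: the assertion below runs against the
        // actual hasState() implementation, not a stubbed mock answer.
        SnapshotSketch snapshot = new SnapshotSketch();
        snapshot.putSubtaskState("op-1", "state-handle");
        System.out.println(snapshot.hasState()); // true
    }
}
```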
[jira] [Updated] (FLINK-7394) Implement basic InputChannel for credit-based logic
[ https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-7394: Description: This is a part of work for credit-based network flow control. The basic works are: * Propose the {{BufferListener}} interface for notifying buffer availability and buffer destroyed. * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage the exclusive buffers itself. * {{RemoteInputChannel}} implements {{BufferListener}} interface to be notified repeatedly. * {{RemoteInputChannel}} maintains and notifies of unannounced credit. * {{RemoteInputChannel}} maintains current sender backlog to trigger requests of floating buffers. was: This is a part of work for credit-based network flow control. The basic works are: * The exclusive buffers per channel are assigned to {{RemoteInputChannel}} directly during registering task. * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage the exclusive buffers itself. * {{RemoteInputChannel}} implements {{BufferPoolListener}} interface to be notified available floating buffers from buffer pool. * {{RemoteInputChannel}} maintains unannounced credit and current sender backlog. > Implement basic InputChannel for credit-based logic > --- > > Key: FLINK-7394 > URL: https://issues.apache.org/jira/browse/FLINK-7394 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > The basic works are: > * Propose the {{BufferListener}} interface for notifying buffer availability > and buffer destroyed. > * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage > the exclusive buffers itself. > * {{RemoteInputChannel}} implements {{BufferListener}} interface to be > notified repeatedly. > * {{RemoteInputChannel}} maintains and notifies of unannounced credit.
> * {{RemoteInputChannel}} maintains current sender backlog to trigger requests > of floating buffers. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
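The responsibilities listed in the issue can be sketched as a hypothetical listener interface plus a channel that tracks unannounced credit and sender backlog. This is an illustration of the described design only; the actual Flink `BufferListener` and `RemoteInputChannel` may use different names and signatures:

```java
// Hypothetical sketch of the notification interface described above.
interface BufferListenerSketch {
    /** Called when a buffer becomes available; returns true to keep listening. */
    boolean notifyBufferAvailable(Object buffer);

    /** Called when the buffer pool is destroyed. */
    void notifyBufferDestroyed();
}

// Hypothetical input channel maintaining unannounced credit and sender backlog.
class InputChannelSketch implements BufferListenerSketch {
    private int unannouncedCredit;
    private int senderBacklog;

    @Override
    public boolean notifyBufferAvailable(Object buffer) {
        unannouncedCredit++;        // one more buffer the sender may fill
        return senderBacklog > 0;   // keep listening while backlog remains
    }

    @Override
    public void notifyBufferDestroyed() {
        unannouncedCredit = 0;
    }

    void onSenderBacklog(int backlog) {
        // A growing backlog would trigger requests for floating buffers.
        senderBacklog = backlog;
    }

    int getUnannouncedCredit() {
        return unannouncedCredit;
    }
}

class CreditSketchDemo {
    public static void main(String[] args) {
        InputChannelSketch ch = new InputChannelSketch();
        ch.onSenderBacklog(2);
        ch.notifyBufferAvailable(new Object());
        System.out.println(ch.getUnannouncedCredit()); // 1
    }
}
```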
[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager
[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125979#comment-16125979 ] ASF GitHub Bot commented on FLINK-7213: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4353 Concerning the suggestion about the `MultiStreamStateHandle` - I am not sure that this can always work. Different physical files may have headers, so it may be important to recognize them as different chunks of state in the general case. > Introduce state management by OperatorID in TaskManager > --- > > Key: FLINK-7213 > URL: https://issues.apache.org/jira/browse/FLINK-7213 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > > Flink-5892 introduced the job manager / checkpoint coordinator part of > managing state on the operator level instead of the task level by introducing > explicit operator_id -> state mappings. However, this explicit mapping was > not introduced in the task manager side, so the explicit mapping is still > converted into a mapping that suits the implicit operator chain order. > We should also introduce explicit operator ids to state management on the > task manager. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4353 Concerning the suggestion about the `MultiStreamStateHandle` - I am not sure that this can always work. Different physical files may have headers, so it may be important to recognize them as different chunks of state in the general case.
[jira] [Commented] (FLINK-7440) Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer
[ https://issues.apache.org/jira/browse/FLINK-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125951#comment-16125951 ] ASF GitHub Bot commented on FLINK-7440: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4537#discussion_r13250 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java --- @@ -1062,4 +1104,28 @@ public boolean isClearCalled() { return fakeRestoredState; } + + private final class NonSerializableDeserializationSchema implements KinesisDeserializationSchema { --- End diff -- Can you please add a comment explaining why this is not serializable and why the following one is serializable? Basically pointing out this class is not static. > Add eager serializable checks on provided de-/serialization schemas for > Kinesis consumer / producer > --- > > Key: FLINK-7440 > URL: https://issues.apache.org/jira/browse/FLINK-7440 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.4.0, 1.3.3 > > > For better user experience, we should add eager serializable checks on the > provided {{KinesisDeserializationSchema}} / {{KinesisSerializationSchema}}, > with better error messages pointing out exactly that the serialization schema > isn't serializable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4537: [FLINK-7440] [kinesis] Add various eager serializability ...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4537 @tzulitai LGTM!
[GitHub] flink pull request #4537: [FLINK-7440] [kinesis] Add various eager serializa...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4537#discussion_r13250 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java --- @@ -1062,4 +1104,28 @@ public boolean isClearCalled() { return fakeRestoredState; } + + private final class NonSerializableDeserializationSchema implements KinesisDeserializationSchema { --- End diff -- Can you please add a comment explaining why this is not serializable and why the following one is serializable? Basically pointing out this class is not static.
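The reviewer's point in miniature: a non-static inner class carries an implicit reference to its enclosing instance, so serializing it drags the outer object along and fails when the outer class is not serializable. A static nested class has no such hidden reference. Class names below are illustrative, not the ones from the Kinesis test:

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class InnerClassSerializationDemo {
    // Non-static inner class: holds a hidden reference to the enclosing
    // InnerClassSerializationDemo instance, which is not Serializable.
    class NonSerializableSchema implements Serializable {}

    // Static nested class: no reference to any enclosing instance.
    static class SerializableSchema implements Serializable {}

    public static void main(String[] args) throws Exception {
        InnerClassSerializationDemo outer = new InnerClassSerializationDemo();
        try {
            serialize(outer.new NonSerializableSchema());
        } catch (NotSerializableException e) {
            // Fails because the implicit outer reference is not serializable.
            System.out.println("inner class failed: " + e.getMessage());
        }
        serialize(new SerializableSchema()); // succeeds
        System.out.println("static nested class ok");
    }

    static void serialize(Object o) throws Exception {
        new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
    }
}
```

This is exactly why eager serializability checks (as in FLINK-7440) catch non-static inner schema classes at job construction time instead of at runtime.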
[jira] [Commented] (FLINK-7428) avoid one additional buffer copy when receiving messages
[ https://issues.apache.org/jira/browse/FLINK-7428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125913#comment-16125913 ] ASF GitHub Bot commented on FLINK-7428: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4529 we also need to double-check this against the previous issue of d92e422ec7089376583a8f57043274d236c340a4 which may be solved by the way I am using the `LengthFieldBasedFrameDecoder` now compared to back then, or the changes that happened since then > avoid one additional buffer copy when receiving messages > > > Key: FLINK-7428 > URL: https://issues.apache.org/jira/browse/FLINK-7428 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > By using {{LengthFieldBasedFrameDecoder}}, we create one unnecessary (netty) > buffer copy in this class which could be easily avoided since we can ensure > that the buffer is free to be released after decoding it in the > {{NettyMessageDecoder}} and into our own buffer and/or events. > The solution would be to make {{NettyMessageDecoder}} extend from > {{LengthFieldBasedFrameDecoder}} and handle the decoding of the frames and > the objects in there. In the frame creation otherwise done by > {{LengthFieldBasedFrameDecoder}}, we could use a sliced buffer instead. This > solution also makes the channel pipelines a bit simpler. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4529: [FLINK-7428][network] avoid buffer copies when receiving ...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4529 we also need to double-check this against the previous issue of d92e422ec7089376583a8f57043274d236c340a4 which may be solved by the way I am using the `LengthFieldBasedFrameDecoder` now compared to back then, or the changes that happened since then
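The copy-avoidance idea from FLINK-7428, reduced to plain `ByteBuffer` operations (no netty involved; this is an illustration of the slicing technique, not Flink's actual decoder): a length-prefixed frame can be exposed as a slice that shares the receive buffer's storage, rather than being copied into a fresh buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SliceFramingDemo {
    /**
     * Returns the next length-prefixed frame as a slice sharing the backing
     * storage of {@code in} -- no byte copy, unlike allocating a new array
     * and copying the frame's bytes into it.
     */
    static ByteBuffer nextFrame(ByteBuffer in) {
        int length = in.getInt();            // 4-byte length prefix
        ByteBuffer frame = in.slice();       // shares content: zero copy
        frame.limit(length);                 // restrict the view to this frame
        in.position(in.position() + length); // advance past the frame
        return frame;
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer in = ByteBuffer.allocate(4 + payload.length);
        in.putInt(payload.length).put(payload).flip();

        ByteBuffer frame = nextFrame(in);
        byte[] out = new byte[frame.remaining()];
        frame.get(out);
        System.out.println(new String(out, StandardCharsets.UTF_8)); // hello
    }
}
```

The catch, as with netty buffers, is lifetime management: the slice is only valid while the underlying buffer has not been released or reused, which is the bookkeeping the PR discussion is about.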
[jira] [Comment Edited] (FLINK-6692) The flink-dist jar contains unshaded netty jar
[ https://issues.apache.org/jira/browse/FLINK-6692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125770#comment-16125770 ] Stephan Ewen edited comment on FLINK-6692 at 8/14/17 4:17 PM: -- I think we cannot avoid the {{org.jboss.netty}} dependency as long as we use Akka underneath the hood. I think we can close this, now that we have all {{io.netty.xxx}} out of the way. [~wheat9] what do you think? was (Author: stephanewen): I think we cannot avoid the {{org.jboss.netty}} dependency as long as we use Akka underneath the hood. I think we can close this, now that we have all {{io.netty.xxx} out of the way. [~wheat9] what do you think? > The flink-dist jar contains unshaded netty jar > -- > > Key: FLINK-6692 > URL: https://issues.apache.org/jira/browse/FLINK-6692 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Haohui Mai >Assignee: Haohui Mai > Fix For: 1.4.0 > > > The {{flink-dist}} jar contains unshaded netty 3 and netty 4 classes: > {noformat} > io/netty/handler/codec/http/router/ > io/netty/handler/codec/http/router/BadClientSilencer.class > io/netty/handler/codec/http/router/MethodRouted.class > io/netty/handler/codec/http/router/Handler.class > io/netty/handler/codec/http/router/Router.class > io/netty/handler/codec/http/router/DualMethodRouter.class > io/netty/handler/codec/http/router/Routed.class > io/netty/handler/codec/http/router/AbstractHandler.class > io/netty/handler/codec/http/router/KeepAliveWrite.class > io/netty/handler/codec/http/router/DualAbstractHandler.class > io/netty/handler/codec/http/router/MethodRouter.class > {noformat} > {noformat} > org/jboss/netty/util/internal/jzlib/InfBlocks.class > org/jboss/netty/util/internal/jzlib/InfCodes.class > org/jboss/netty/util/internal/jzlib/InfTree.class > org/jboss/netty/util/internal/jzlib/Inflate$1.class > org/jboss/netty/util/internal/jzlib/Inflate.class > org/jboss/netty/util/internal/jzlib/JZlib$WrapperType.class > 
org/jboss/netty/util/internal/jzlib/JZlib.class > org/jboss/netty/util/internal/jzlib/StaticTree.class > org/jboss/netty/util/internal/jzlib/Tree.class > org/jboss/netty/util/internal/jzlib/ZStream$1.class > org/jboss/netty/util/internal/jzlib/ZStream.class > {noformat} > Is it an expected behavior? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7417) Create flink-shaded-jackson
[ https://issues.apache.org/jira/browse/FLINK-7417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-7417: --- Assignee: Chesnay Schepler > Create flink-shaded-jackson > --- > > Key: FLINK-7417 > URL: https://issues.apache.org/jira/browse/FLINK-7417 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Stephan Ewen >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > The {{com.fasterml:jackson}} library is another culprit of frequent conflicts > that we need to shade away. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-6982) Replace guava dependencies
[ https://issues.apache.org/jira/browse/FLINK-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-6982. --- Resolution: Fixed 1.4: 8dfb9d00653271ea4adbeb752da8f62d7647b6d8 > Replace guava dependencies > -- > > Key: FLINK-6982 > URL: https://issues.apache.org/jira/browse/FLINK-6982 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7399) Add checkstyle rule to forbid codehaus jackson imports
[ https://issues.apache.org/jira/browse/FLINK-7399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7399. --- Resolution: Fixed 1.4: e30eb13c4a84517ffa502b2c9c7da4b5b8541c9c > Add checkstyle rule to forbid codehaus jackson imports > -- > > Key: FLINK-7399 > URL: https://issues.apache.org/jira/browse/FLINK-7399 > Project: Flink > Issue Type: Improvement > Components: Checkstyle >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
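A rule of this kind is typically expressed with Checkstyle's `IllegalImport` module. A sketch of what such a rule can look like (the exact rule added in the Flink commit may differ):

```xml
<!-- Sketch of a Checkstyle rule forbidding org.codehaus.jackson imports;
     the rule actually committed to Flink's checkstyle config may differ. -->
<module name="TreeWalker">
  <module name="IllegalImport">
    <property name="illegalPkgs" value="org.codehaus.jackson"/>
  </module>
</module>
```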
[jira] [Commented] (FLINK-7441) Double quote string literals is not supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-7441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125900#comment-16125900 ] ASF GitHub Bot commented on FLINK-7441: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4538 @twalthr @wuchong Please make sure that the commit messages describes what was changed, and not the state before the change. We now have a commit that adds support for quoted strings, with the commit message saying that this isn't supported. > Double quote string literals is not supported in Table API and SQL > -- > > Key: FLINK-7441 > URL: https://issues.apache.org/jira/browse/FLINK-7441 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.2 >Reporter: Jark Wu >Assignee: Jark Wu > Fix For: 1.4.0, 1.3.3 > > > Code generation doesn't handle double quote string literals and some control > characters which leads to compile error. > {code} > Caused by: org.codehaus.commons.compiler.CompileException: Line 50, Column > 48: Expression "hello" is not an rvalue > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4538: [FLINK-7441] [table] Double quote string literals is not ...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4538 @twalthr @wuchong Please make sure that the commit message describes what was changed, and not the state before the change. We now have a commit that adds support for quoted strings, with the commit message saying that this isn't supported.
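The class of compile failure described in FLINK-7441, sketched generically (this is not the actual Flink/Calcite code-generation path): when emitting generated source code, string literal contents must have quotes and control characters escaped, otherwise the generated code fails to compile with errors like `Expression "hello" is not an rvalue`.

```java
class LiteralEscapeDemo {
    /** Escapes a value so it can be embedded as a Java string literal in generated code. */
    static String toJavaLiteral(String value) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break; // unescaped quote would end the literal early
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:   sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        // Naively wrapping the value in quotes would emit ""hello"" -- invalid
        // Java, and exactly the kind of codegen compile error reported above.
        System.out.println(toJavaLiteral("\"hello\"")); // "\"hello\""
        System.out.println(toJavaLiteral("tab\tend"));  // "tab\tend"
    }
}
```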
[jira] [Commented] (FLINK-7399) Add checkstyle rule to forbid codehaus jackson imports
[ https://issues.apache.org/jira/browse/FLINK-7399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125894#comment-16125894 ] ASF GitHub Bot commented on FLINK-7399: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4505 > Add checkstyle rule to forbid codehaus jackson imports > -- > > Key: FLINK-7399 > URL: https://issues.apache.org/jira/browse/FLINK-7399 > Project: Flink > Issue Type: Improvement > Components: Checkstyle >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6982) Replace guava dependencies
[ https://issues.apache.org/jira/browse/FLINK-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125895#comment-16125895 ] ASF GitHub Bot commented on FLINK-6982: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4503 > Replace guava dependencies > -- > > Key: FLINK-6982 > URL: https://issues.apache.org/jira/browse/FLINK-6982 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4505: [FLINK-7399] [checkstyle] Forbid imports from org....
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4505
[GitHub] flink pull request #4503: [FLINK-6982] [guava] Integrate flink-shaded-guava-...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4503
[jira] [Commented] (FLINK-6692) The flink-dist jar contains unshaded netty jar
[ https://issues.apache.org/jira/browse/FLINK-6692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125856#comment-16125856 ] Haohui Mai commented on FLINK-6692: --- Sounds good. Thanks for the effort! > The flink-dist jar contains unshaded netty jar > -- > > Key: FLINK-6692 > URL: https://issues.apache.org/jira/browse/FLINK-6692 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Haohui Mai >Assignee: Haohui Mai > Fix For: 1.4.0 > > > The {{flink-dist}} jar contains unshaded netty 3 and netty 4 classes: > {noformat} > io/netty/handler/codec/http/router/ > io/netty/handler/codec/http/router/BadClientSilencer.class > io/netty/handler/codec/http/router/MethodRouted.class > io/netty/handler/codec/http/router/Handler.class > io/netty/handler/codec/http/router/Router.class > io/netty/handler/codec/http/router/DualMethodRouter.class > io/netty/handler/codec/http/router/Routed.class > io/netty/handler/codec/http/router/AbstractHandler.class > io/netty/handler/codec/http/router/KeepAliveWrite.class > io/netty/handler/codec/http/router/DualAbstractHandler.class > io/netty/handler/codec/http/router/MethodRouter.class > {noformat} > {noformat} > org/jboss/netty/util/internal/jzlib/InfBlocks.class > org/jboss/netty/util/internal/jzlib/InfCodes.class > org/jboss/netty/util/internal/jzlib/InfTree.class > org/jboss/netty/util/internal/jzlib/Inflate$1.class > org/jboss/netty/util/internal/jzlib/Inflate.class > org/jboss/netty/util/internal/jzlib/JZlib$WrapperType.class > org/jboss/netty/util/internal/jzlib/JZlib.class > org/jboss/netty/util/internal/jzlib/StaticTree.class > org/jboss/netty/util/internal/jzlib/Tree.class > org/jboss/netty/util/internal/jzlib/ZStream$1.class > org/jboss/netty/util/internal/jzlib/ZStream.class > {noformat} > Is it an expected behavior? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-6692) The flink-dist jar contains unshaded netty jar
[ https://issues.apache.org/jira/browse/FLINK-6692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haohui Mai resolved FLINK-6692. --- Resolution: Fixed
[jira] [Commented] (FLINK-7223) Increase DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS for Flink-kinesis-connector
[ https://issues.apache.org/jira/browse/FLINK-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125845#comment-16125845 ] Stephan Ewen commented on FLINK-7223: - [~phoenixjiangnan] Do you think it is worthwhile filing an issue at Amazon for that? It seems crazy that the number of describe requests is so low and shared across all apps of one account. > Increase DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS for Flink-kinesis-connector > > > Key: FLINK-7223 > URL: https://issues.apache.org/jira/browse/FLINK-7223 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0 > > > Background: {{DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS}} in {{org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants}} is the default interval at which Flink calls Kinesis's {{describeStream()}} API. > Problem: Right now, its value is 10,000 ms (10 sec), which is too short. We ran into problems where the Flink-kinesis-connector's calls of {{describeStream()}} exceeded the Kinesis rate limit and broke the Flink TaskManager. > According to http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html, "This operation has a limit of 10 transactions per second per account." What this means is that the 10 transactions/sec is a limit on a single organization's entire AWS account. We contacted AWS Support and confirmed this. If you have more applications (either other Flink apps or non-Flink apps) competing aggressively with your Flink app on this API, your Flink app breaks. > I propose increasing the value of DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS from 10,000 ms (10 sec) to preferably 300,000 (5 min), or at least 60,000 (1 min) if anyone has a solid reason arguing that 5 min is too long. > This is also related to https://issues.apache.org/jira/browse/FLINK-6365
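On the user side, the workaround amounts to raising the discovery interval in the properties passed to the Kinesis consumer. Below is a minimal sketch using plain JDK `Properties`; the key string is assumed to be what `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` resolves to, and the class name here is purely illustrative:

```java
import java.util.Properties;

public class KinesisConsumerConfig {

    // Assumed key string behind ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS
    // (copied here so the sketch runs without the connector on the classpath).
    static final String SHARD_DISCOVERY_INTERVAL_MILLIS =
            "flink.shard.discovery.intervalmillis";

    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("aws.region", "us-east-1");
        // Raise shard discovery from the 10 s default to 5 min so that
        // describeStream() calls stay well under the account-wide limit
        // of 10 transactions per second shared by all applications.
        props.setProperty(SHARD_DISCOVERY_INTERVAL_MILLIS, "300000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProperties());
    }
}
```

In a real job these properties would then be handed to the `FlinkKinesisConsumer` constructor alongside the stream name and deserialization schema.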
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125817#comment-16125817 ] ASF GitHub Bot commented on FLINK-7337: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4532 @wuchong @fhueske I hope I addressed all code-related issues. Is it ok to merge this for now? I will create a follow-up issue for the Table to DataStream/TableSink conversion case. > whether we should change the rowtime type when it is an existing field I think this is a very special case. But it is just a nice addition to make the user's life easier. We could also remove the replacing feature as a whole to avoid confusion due to the data type conversion. In general, we should get rid of `TIMESTAMP` and work on longs as much as possible. In the near future, we might also extend the API to use Java 8 `java.time.` equivalents. > Refactor handling of time indicator attributes > -- > > Key: FLINK-7337 > URL: https://issues.apache.org/jira/browse/FLINK-7337 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > After a [discussion on the dev mailing list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E] I propose the following changes to the current handling of time indicator attributes: > * Remove the separation of logical and physical row type. > ** Hold the event-time timestamp as a regular Long field in Row > ** Represent the processing-time indicator type as a null-valued field in Row (1 bit overhead) > * Remove materialization of event-time timestamps because the timestamp is already accessible in Row. > * Add a {{ProcessFunction}} to set the timestamp into the timestamp field of a {{StreamRecord}}.
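The proposal above can be illustrated with a toy model. The `Row` and `StreamRecord` classes below are simplified stand-ins, not Flink's actual types: the event-time value is just a `Long` field of the row, so "setting the timestamp into the record" is a plain field copy and no separate materialization step is needed.

```java
public class TimeIndicatorSketch {

    // Simplified stand-in for Flink's Row: a bag of fields.
    static class Row {
        final Object[] fields;
        Row(Object... fields) { this.fields = fields; }
    }

    // Simplified stand-in for a timestamped StreamRecord.
    static class StreamRecord {
        final Row value;
        final long timestamp;
        StreamRecord(Row value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // The event-time indicator is a plain Long field, so emitting a record
    // only copies it into the record's timestamp slot.
    static StreamRecord emitWithRowtime(Row row, int rowtimeIndex) {
        return new StreamRecord(row, (Long) row.fields[rowtimeIndex]);
    }

    public static void main(String[] args) {
        // A processing-time indicator would occupy a field holding null
        // (the "1 bit overhead" mentioned in the proposal).
        Row row = new Row("order-1", 1502000000000L, null);
        StreamRecord rec = emitWithRowtime(row, 1);
        System.out.println(rec.timestamp); // prints 1502000000000
    }
}
```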
[GitHub] flink issue #4532: [FLINK-7337] [table] Refactor internal handling of time i...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4532 @wuchong @fhueske I hope I addressed all code-related issues. Is it ok to merge this for now? I will create a follow-up issue for the Table to DataStream/TableSink conversion case. > whether we should change the rowtime type when it is an existing field I think this is a very special case. But it is just a nice addition to make the user's life easier. We could also remove the replacing feature as a whole to avoid confusion due to the data type conversion. In general, we should get rid of `TIMESTAMP` and work on longs as much as possible. In the near future, we might also extend the API to use Java 8 `java.time.` equivalents.
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125801#comment-16125801 ] ASF GitHub Bot commented on FLINK-7337: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4488#discussion_r132974743 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala --- @@ -46,29 +47,29 @@ class StreamTableSourceScan( val fieldNames = TableEnvironment.getFieldNames(tableSource).toList val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList -val fieldCnt = fieldNames.length +val fields = fieldNames.zip(fieldTypes) -val rowtime = tableSource match { +val withRowtime = tableSource match { case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null => --- End diff -- Can you open an issue for this? We can discuss this after merging this PR.
[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4488#discussion_r132974743 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala --- @@ -46,29 +47,29 @@ class StreamTableSourceScan( val fieldNames = TableEnvironment.getFieldNames(tableSource).toList val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList -val fieldCnt = fieldNames.length +val fields = fieldNames.zip(fieldTypes) -val rowtime = tableSource match { +val withRowtime = tableSource match { case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null => --- End diff -- Can you open an issue for this? We can discuss this after merging this PR.
[jira] [Commented] (FLINK-7444) Make external calls non-blocking
[ https://issues.apache.org/jira/browse/FLINK-7444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125791#comment-16125791 ] Stephan Ewen commented on FLINK-7444: - Is that possibly premature optimization? A fatal error handler blocking the main thread sounds permissible, because the process will not continue normally after that anyway. Also, direct calls have the advantage of succeeding better in fatal cases. An OOM that cannot spawn a thread will not be reported if passed through an executor. Especially for the fatal error handler, I would actually suggest to explicitly NOT send it through an executor. > Make external calls non-blocking > > > Key: FLINK-7444 > URL: https://issues.apache.org/jira/browse/FLINK-7444 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > All external calls from a {{RpcEndpoint}} can be potentially blocking, e.g. calls to the {{FatalErrorHandler}}. Therefore, I propose to make all these calls coming from the {{RpcEndpoint's}} main thread non-blocking by running them in an {{Executor}}. That way the main thread will never be blocked.
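The trade-off in this discussion can be sketched in plain Java; the interface and method names below are illustrative, not Flink's actual API. Dispatching through an executor keeps the endpoint's main thread free, while a direct call still works when the JVM can no longer spawn threads:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ErrorHandlerDispatch {

    interface FatalErrorHandler {
        void onFatalError(Throwable cause);
    }

    // Non-blocking variant: the caller returns immediately, but the report
    // is lost if no task/thread can be scheduled anymore (e.g. after an OOM).
    static void reportViaExecutor(ExecutorService executor,
                                  FatalErrorHandler handler, Throwable cause) {
        executor.execute(() -> handler.onFatalError(cause));
    }

    // Direct variant: may block the caller, but needs no extra resources,
    // which is the argument for calling the fatal error handler this way.
    static void reportDirectly(FatalErrorHandler handler, Throwable cause) {
        handler.onFatalError(cause);
    }

    static boolean demo() throws InterruptedException {
        AtomicBoolean handled = new AtomicBoolean(false);
        FatalErrorHandler handler = cause -> handled.set(true);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        reportViaExecutor(executor, handler, new RuntimeException("boom"));
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);

        reportDirectly(handler, new RuntimeException("boom"));
        return handled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("handled=" + demo());
    }
}
```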
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125788#comment-16125788 ] ASF GitHub Bot commented on FLINK-7337: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132971979 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment( // get CRow plan val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig) +val rowtimeFields = logicalType + .getFieldList.asScala + .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + +// convert the input type for the conversion mapper +// the input will be changed in the OutputRowtimeProcessFunction later +val convType = if (rowtimeFields.size > 1) { + throw new TableException( --- End diff -- @fhueske With left-most table I mean the first time indicator in the select statement (from left). I think even join reordering does not change the column ordering. I agree that at least `TableSink`s should deal with it implicitly.
[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132971979 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment( // get CRow plan val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig) +val rowtimeFields = logicalType + .getFieldList.asScala + .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + +// convert the input type for the conversion mapper +// the input will be changed in the OutputRowtimeProcessFunction later +val convType = if (rowtimeFields.size > 1) { + throw new TableException( --- End diff -- @fhueske With left-most table I mean the first time indicator in the select statement (from left). I think even join reordering does not change the column ordering. I agree that at least `TableSink`s should deal with it implicitly.
[jira] [Commented] (FLINK-7365) excessive warning logs of attempt to override final parameter: fs.s3.buffer.dir
[ https://issues.apache.org/jira/browse/FLINK-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125774#comment-16125774 ] Stephan Ewen commented on FLINK-7365: - One issue is that Flink instantiates Hadoop's file systems a bit too naively. That results in repeated config loading / parsing. In the course of improving Flink's handling of file systems and the handling/loading of Hadoop's file systems, we should make sure that the Hadoop file systems are instantiated only once, which ensures that the config is parsed once and not per checkpoint (that actually seems a bit too wasteful anyways).
> excessive warning logs of attempt to override final parameter: fs.s3.buffer.dir
> ---
>
> Key: FLINK-7365
> URL: https://issues.apache.org/jira/browse/FLINK-7365
> Project: Flink
> Issue Type: Bug
> Components: Configuration
> Affects Versions: 1.3.0
> Reporter: Bowen Li
>
> I'm seeing hundreds of lines of the following log in my JobManager log file:
> {code}
> 2017-08-03 19:48:45,330 WARN org.apache.hadoop.conf.Configuration - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to override final parameter: fs.s3.buffer.dir; Ignoring.
> 2017-08-03 19:48:45,485 WARN org.apache.hadoop.conf.Configuration - /etc/hadoop/conf/core-site.xml:an attempt to override final parameter: fs.s3.buffer.dir; Ignoring.
> 2017-08-03 19:48:45,486 WARN org.apache.hadoop.conf.Configuration - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to override final parameter: fs.s3.buffer.dir; Ignoring.
> 2017-08-03 19:48:45,626 WARN org.apache.hadoop.conf.Configuration - /etc/hadoop/conf/core-site.xml:an attempt to override final parameter: fs.s3.buffer.dir; Ignoring
> ..
> {code}
> Info of my Flink cluster:
> - Running on EMR with emr-5.6.0
> - Using FSStateBackend, writing checkpointing data files to s3
> - Configured s3 with S3AFileSystem according to https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/aws.html#set-s3-filesystem
> - AWS forbids resetting 'fs.s3.buffer.dir' value (it has a {{final}} tag on this property in core-site.xml), so I set 'fs.s3a.buffer.dir' as '/tmp'
> Here's my core-site.xml file:
> {code:xml}
> <configuration>
>   <property>
>     <name>fs.s3.buffer.dir</name>
>     <value>/mnt/s3,/mnt1/s3</value>
>     <final>true</final>
>   </property>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>   <property>
>     <name>fs.s3n.impl</name>
>     <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
>   </property>
>   <property>
>     <name>ipc.client.connect.max.retries.on.timeouts</name>
>     <value>5</value>
>   </property>
>   <property>
>     <name>hadoop.security.key.default.bitlength</name>
>     <value>256</value>
>   </property>
>   <property>
>     <name>hadoop.proxyuser.hadoop.groups</name>
>     <value>*</value>
>   </property>
>   <property>
>     <name>hadoop.tmp.dir</name>
>     <value>/mnt/var/lib/hadoop/tmp</value>
>   </property>
>   <property>
>     <name>hadoop.proxyuser.hadoop.hosts</name>
>     <value>*</value>
>   </property>
>   <property>
>     <name>io.file.buffer.size</name>
>     <value>65536</value>
>   </property>
>   <property>
>     <name>fs.AbstractFileSystem.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3.EMRFSDelegate</value>
>   </property>
>   <property>
>     <name>fs.s3a.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
>   <property>
>     <name>fs.s3bfs.impl</name>
>     <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
>   </property>
> </configuration>
> {code}
> This bug is about excessive logging.
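The suggestion to instantiate each Hadoop file system once, so the configuration is parsed once rather than on every checkpoint, boils down to a per-scheme cache. Below is a minimal sketch with hypothetical types; Flink's actual file system loading code is more involved, and the `CachedFileSystem` class here merely stands in for an expensive-to-create Hadoop `FileSystem`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class FileSystemCache {

    // Counts how often the (expensive) config parse happens; in the real
    // scenario each parse emits the "attempt to override final parameter"
    // warnings seen in the log excerpt above.
    static final AtomicInteger configParses = new AtomicInteger();

    // Stand-in for a Hadoop FileSystem whose construction parses core-site.xml.
    static class CachedFileSystem {
        final String scheme;
        CachedFileSystem(String scheme) {
            this.scheme = scheme;
            configParses.incrementAndGet(); // config parsed exactly here
        }
    }

    static final Map<String, CachedFileSystem> CACHE = new ConcurrentHashMap<>();

    // One instance per scheme: repeated checkpoints reuse it instead of
    // re-instantiating the file system (and re-parsing the config) each time.
    static CachedFileSystem get(String scheme) {
        return CACHE.computeIfAbsent(scheme, CachedFileSystem::new);
    }

    public static void main(String[] args) {
        for (int checkpoint = 0; checkpoint < 100; checkpoint++) {
            get("s3a");
        }
        System.out.println("config parses: " + configParses.get());
    }
}
```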
[jira] [Commented] (FLINK-6692) The flink-dist jar contains unshaded netty jar
[ https://issues.apache.org/jira/browse/FLINK-6692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125770#comment-16125770 ] Stephan Ewen commented on FLINK-6692: - I think we cannot avoid the {{org.jboss.netty}} dependency as long as we use Akka underneath the hood. I think we can close this, now that we have all {{io.netty.xxx}} out of the way. [~wheat9] what do you think?
[jira] [Updated] (FLINK-7441) Double quote string literals is not supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-7441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-7441: Affects Version/s: 1.3.2 > Double quote string literals is not supported in Table API and SQL > -- > > Key: FLINK-7441 > URL: https://issues.apache.org/jira/browse/FLINK-7441 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.2 >Reporter: Jark Wu >Assignee: Jark Wu > Fix For: 1.4.0, 1.3.3 > > > Code generation doesn't handle double-quote string literals and some control > characters, which leads to compile errors. > {code} > Caused by: org.codehaus.commons.compiler.CompileException: Line 50, Column > 48: Expression "hello" is not an rvalue > {code}
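The underlying fix is to escape string values before splicing them into generated Java source; unescaped, a value like `hello "world"` becomes a bare expression and Janino rejects it with the `CompileException` quoted above. A hedged sketch of such an escaper follows — this is not the actual Flink code-generation utility, just an illustration of the technique:

```java
public class StringLiteralEscaper {

    // Turn a runtime string into a valid Java source literal, escaping
    // double quotes, backslashes, and common control characters.
    static String toJavaLiteral(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters as unicode escapes.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        // Without escaping, codegen would emit the invalid: he said "hi"
        System.out.println(toJavaLiteral("he said \"hi\""));
    }
}
```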
[jira] [Commented] (FLINK-7413) Release Hadoop 2.8.x convenience binaries for 1.3.x
[ https://issues.apache.org/jira/browse/FLINK-7413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125766#comment-16125766 ] Stephan Ewen commented on FLINK-7413: - I think we should add these for the 1.4 release. The concern was only to not introduce this directly before the 1.3 release, because new Hadoop versions inevitably introduce new dependency conflicts. > Release Hadoop 2.8.x convenience binaries for 1.3.x > > > Key: FLINK-7413 > URL: https://issues.apache.org/jira/browse/FLINK-7413 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.2 >Reporter: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.3 > > > At least one user on the mailing lists had an issue because Hadoop 2.8.x > binaries are not available: > https://lists.apache.org/thread.html/c8badc66778144d9d6c3ee5cb23dd732a66cb6690c6867f47f4bd456@%3Cuser.flink.apache.org%3E > It should be as easy as adding Hadoop 2.8.x to the list of created binaries > in the release files.