[GitHub] flink pull request #5280: Fix typo in AbstractMetricGroup.java
GitHub user maqingxiang opened a pull request:

    https://github.com/apache/flink/pull/5280

    Fix typo in AbstractMetricGroup.java

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maqingxiang/flink fix-typo-in-AbstractMetricGroup

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5280.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5280

commit 3c568e20bba53bf7ff675d230a3076e495d70639
Author: maqingxiang-it
Date:   2018-01-11T05:44:04Z

    Fix typo in AbstractMetricGroup.java

---
[jira] [Comment Edited] (FLINK-8327) ClassLoader resolution of child-first does not appear to work
[ https://issues.apache.org/jira/browse/FLINK-8327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321642#comment-16321642 ]

Raymond Tay edited comment on FLINK-8327 at 1/11/18 3:34 AM:
-------------------------------------------------------------

Closing this, as I have to do dynamic classloading with the {{http4s}} client for Scala, and I think it's going to be a problem for implementers of _rich_ sources, mappers and sinks, which are likely to leverage 3rd-party libraries, if the developer has to manage the dependencies such that they are always found in the {{/lib}} directory.

was (Author: raymondtay):
Closing this, as I have to do dynamic classloading with the {{http4s}} client for Scala, and I think it's going to be a problem for implementers of _rich_ sources, mappers and sinks if the developer has to manage the dependencies such that they are always found in the {{/lib}} directory.

> ClassLoader resolution of child-first does not appear to work
> --------------------------------------------------------------
>
>                 Key: FLINK-8327
>                 URL: https://issues.apache.org/jira/browse/FLINK-8327
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.4.0
>         Environment: h3. Environment
> * Local flink cluster version 1.4.0
> * {{classloader.resolve-order: child-first}} in {{conf/flink-conf.yaml}}
> * Scala 2.11.11
> * Oracle JDK 1.8.0
> h3. Library versions
> * akka actors 2.4.20
> * akka http 10.0.10
>            Reporter: Raymond Tay
>
> h2. Description
> I was trying out {{Queryable State}} and ran into a problem where the submitted job starts regular Akka actors and makes external HTTP calls via the {{akka-http}} libraries, and the Flink runtime was complaining that it was not able to read the key {{akka.http}} (this key is held in the configuration file for {{akka-http}}).
> When I ran our app in the {{sbt}} shell locally, it was able to see the key {{akka.http}}, but when we submitted the fat jar (built via {{sbt-assembly}}) to Flink, it threw the error message below. It looks like a classloader issue, but I'm not sure.
> Any help is much appreciated!
> h2. Error message
> {noformat}
> Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.http'
>     at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152)
>     at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
>     at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
>     at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:184)
>     at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
>     at com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:258)
>     at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:264)
>     at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:37)
>     at akka.http.scaladsl.Http$.createExtension(Http.scala:843)
>     at akka.http.scaladsl.Http$.createExtension(Http.scala:719)
>     at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:917)
>     at akka.actor.ExtensionId$class.apply(Extension.scala:79)
>     at akka.http.scaladsl.Http$.apply(Http.scala:838)
>     at akka.http.scaladsl.Http$.apply(Http.scala:719)
>     at org.example.state.A.<init>(Simple.scala:158)
>     ... 18 more
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Resolved] (FLINK-8327) ClassLoader resolution of child-first does not appear to work
[ https://issues.apache.org/jira/browse/FLINK-8327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raymond Tay resolved FLINK-8327.
--------------------------------
    Resolution: Information Provided

> ClassLoader resolution of child-first does not appear to work
> --------------------------------------------------------------
>
>                 Key: FLINK-8327
>                 URL: https://issues.apache.org/jira/browse/FLINK-8327
>
> (Issue description quoted in full in the first FLINK-8327 message above.)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8327) ClassLoader resolution of child-first does not appear to work
[ https://issues.apache.org/jira/browse/FLINK-8327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321642#comment-16321642 ]

Raymond Tay commented on FLINK-8327:
------------------------------------

Closing this, as I have to do dynamic classloading with the {{http4s}} client for Scala, and I think it's going to be a problem for implementers of _rich_ sources, mappers and sinks if the developer has to manage the dependencies such that they are always found in the {{/lib}} directory.

> ClassLoader resolution of child-first does not appear to work
> --------------------------------------------------------------
>
>                 Key: FLINK-8327
>                 URL: https://issues.apache.org/jira/browse/FLINK-8327
>
> (Issue description quoted in full in the first FLINK-8327 message above.)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
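For readers hitting the same problem, a minimal sketch of a commonly suggested workaround, assuming the akka-http {{reference.conf}} actually survives the {{sbt-assembly}} merge (the assembly must concatenate {{reference.conf}} files rather than keep only one): resolve the Typesafe config against Flink's user-code classloader and hand both the config and the classloader to Akka explicitly. The factory class below is hypothetical, not part of any Flink or Akka API.

{code:java}
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public final class UserCodeActorSystems {

    /**
     * Call this from open()/run() of a rich function, where the thread's
     * context classloader is Flink's user-code classloader and can therefore
     * see the reference.conf entries bundled inside the fat jar.
     */
    public static ActorSystem create(String name) {
        ClassLoader userLoader = Thread.currentThread().getContextClassLoader();
        // Resolve application.conf/reference.conf against the user-code
        // classloader instead of the loader that loaded the config library.
        Config config = ConfigFactory.load(userLoader);
        return ActorSystem.create(name, config, userLoader);
    }
}
{code}

If the fat jar's merge strategy drops or overwrites {{reference.conf}}, no classloader arrangement will help; checking the assembled jar for the {{akka.http}} key is a useful first step.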
[GitHub] flink issue #4150: [FLINK-6951] Incompatible versions of httpcomponents jars...
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/4150

    @casidiablo @tzulitai Sorry, I had a long day yesterday and misunderstood what you meant. And thank you @casidiablo and @xiatao123 for confirming that it works.

---
[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321616#comment-16321616 ]

ASF GitHub Bot commented on FLINK-6951:
---------------------------------------

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/4150

    @casidiablo @tzulitai Sorry, I had a long day yesterday and misunderstood what you meant. And thank you @casidiablo and @xiatao123 for confirming that it works.

> Incompatible versions of httpcomponents jars for Flink kinesis connector
> -------------------------------------------------------------------------
>
>                 Key: FLINK-6951
>                 URL: https://issues.apache.org/jira/browse/FLINK-6951
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.3.0
>            Reporter: Ted Yu
>            Assignee: Bowen Li
>            Priority: Critical
>             Fix For: 1.3.3
>
> In the following thread, Bowen reported incompatible versions of httpcomponents jars for the Flink kinesis connector:
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency version(s) themselves when building the Flink kinesis connector.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321558#comment-16321558 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160849525

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -250,10 +304,12 @@ private void handleException(Channel channel, Throwable cause) throws IOException
     	private void releaseAllResources() throws IOException {
     		SequenceNumberingViewReader reader;
    -		while ((reader = nonEmptyReader.poll()) != null) {
    +		while ((reader = availableReaders.poll()) != null) {
    --- End diff --

    Yes, it should release all readers here.

> Implement Netty sender incoming pipeline for credit-based
> ----------------------------------------------------------
>
>                 Key: FLINK-7456
>                 URL: https://issues.apache.org/jira/browse/FLINK-7456
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.5.0
>
> This is a part of the work for credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer {{currentCredit}} from the receiver. Once the {{PartitionRequest}} and {{AddCredit}} messages are received, the {{currentCredit}} is increased by the given deltas.
> Each view also maintains an atomic boolean field to mark it as registered available for transfer, to make sure it is enqueued in the handler only once. If the {{currentCredit}} increases from zero and there are available buffers in the subpartition, the corresponding view will be enqueued for transferring data.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160849525

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -250,10 +304,12 @@ private void handleException(Channel channel, Throwable cause) throws IOException
     	private void releaseAllResources() throws IOException {
     		SequenceNumberingViewReader reader;
    -		while ((reader = nonEmptyReader.poll()) != null) {
    +		while ((reader = availableReaders.poll()) != null) {
    --- End diff --

    Yes, it should release all readers here.

---
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321557#comment-16321557 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160849478

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +94,37 @@ public void run() {
     		});
     	}

    +	/**
    +	 * Try to enqueue the reader once receiving credit notification from the consumer or receiving
    +	 * non-empty reader notification from the producer. Only one thread would trigger the actual
    +	 * enqueue after checking the reader's availability, so there is no race condition here.
    +	 */
    +	@VisibleForTesting
    +	void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    +		if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
    +			enqueueAvailableReader(reader);
    +		}
    +	}
    +
    +	@VisibleForTesting
    +	void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --

    Agree.

> Implement Netty sender incoming pipeline for credit-based
> ----------------------------------------------------------
>
>                 Key: FLINK-7456
>                 URL: https://issues.apache.org/jira/browse/FLINK-7456
>
> (Issue description quoted in full in the previous FLINK-7456 message.)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160849478

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +94,37 @@ public void run() {
     		});
     	}

    +	/**
    +	 * Try to enqueue the reader once receiving credit notification from the consumer or receiving
    +	 * non-empty reader notification from the producer. Only one thread would trigger the actual
    +	 * enqueue after checking the reader's availability, so there is no race condition here.
    +	 */
    +	@VisibleForTesting
    +	void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    +		if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
    +			enqueueAvailableReader(reader);
    +		}
    +	}
    +
    +	@VisibleForTesting
    +	void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --

    Agree.

---
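The pattern being reviewed above, reduced to a self-contained sketch: an atomic credit counter plus an atomic "registered" flag, so a reader is enqueued in the handler at most once. This illustrates the design described in FLINK-7456; it is not the actual {{PartitionRequestQueue}} or {{SequenceNumberingViewReader}} code, and all class and method names are made up.

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative reader mirroring the pattern from the issue description. */
class CreditBasedReader {
    private final AtomicInteger currentCredit = new AtomicInteger();
    private final AtomicBoolean registeredAvailable = new AtomicBoolean();

    void addCredit(int delta) {
        currentCredit.addAndGet(delta);
    }

    /**
     * Returns true for exactly one caller when the reader becomes available,
     * so the reader is enqueued at most once until it is drained again.
     */
    boolean tryMarkAvailable(boolean hasBufferedData) {
        return hasBufferedData
            && currentCredit.get() > 0
            && registeredAvailable.compareAndSet(false, true);
    }

    /** Called by the handler once the reader has been polled and drained. */
    void markUnavailable() {
        registeredAvailable.set(false);
    }
}

/** Queue used from a single thread (the Netty event loop in Flink's case). */
class AvailableReaderQueue {
    private final Queue<CreditBasedReader> availableReaders = new ArrayDeque<>();

    void triggerEnqueue(CreditBasedReader reader, boolean hasBufferedData) {
        // The compare-and-set inside tryMarkAvailable() gates the enqueue.
        if (reader.tryMarkAvailable(hasBufferedData)) {
            availableReaders.add(reader);
        }
    }
}
{code}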
[jira] [Commented] (FLINK-8324) Expose another offsets metrics by using new metric API to specify user defined variables
[ https://issues.apache.org/jira/browse/FLINK-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321536#comment-16321536 ]

ASF GitHub Bot commented on FLINK-8324:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5214

    Thanks for the PR @tony810430. The changes LGTM, will merge this a bit later.

> Expose another offsets metrics by using new metric API to specify user defined variables
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-8324
>                 URL: https://issues.apache.org/jira/browse/FLINK-8324
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Wei-Che Wei
>            Assignee: Wei-Che Wei
>            Priority: Trivial
>             Fix For: 1.5.0
>
> The {{current-offsets}} and {{committed-offsets}} metrics currently have the topic name and partition id attached to the metric identity.
> This is not convenient to use in Prometheus, because users usually use one metric name to group metrics that have the same meaning and rely on tags to select an individual metric.
> For example, I would prefer to access the {{current-offsets}} metric and use a {{partition-x}} tag to get the offset of partition x, instead of reading the {{current-offsets-partition-x}} metric directly.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink issue #5214: [FLINK-8324] [kafka] Expose another offsets metrics by us...
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5214

    Thanks for the PR @tony810430. The changes LGTM, will merge this a bit later.

---
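For context on the API being merged here: Flink's {{MetricGroup#addGroup(String key, String value)}} registers a user-defined variable, which reporters such as the Prometheus reporter can expose as a label instead of concatenating it into the metric name. A small sketch, with illustrative topic, partition, and metric names:

{code:java}
import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

class OffsetMetrics {

    /**
     * Registers one "currentOffsets" gauge per partition; topic and partition
     * become user-defined variables (reporter tags) rather than parts of the
     * metric name, so all partitions share the same metric identity.
     */
    static void register(MetricGroup group, String topic, int partition, AtomicLong offset) {
        Gauge<Long> gauge = offset::get;
        group.addGroup("topic", topic)
            .addGroup("partition", String.valueOf(partition))
            .gauge("currentOffsets", gauge);
    }
}
{code}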
[jira] [Commented] (FLINK-6109) Add "consumer lag" report metric to FlinkKafkaConsumer
[ https://issues.apache.org/jira/browse/FLINK-6109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321533#comment-16321533 ]

Tzu-Li (Gordon) Tai commented on FLINK-6109:
--------------------------------------------

Thanks for picking the issue up [~aitozi]! Of course, contributions are always welcome.

I have a few points I would like to discuss:

1. I've thought a bit about exposing "lag" as a metric directly in the Kafka consumer, but it seems like it could be a bit out of scope for the consumer itself. For the consumer to calculate this lag, it would essentially need to query Kafka for the current head offset, which can be very expensive.

2. Because of that, I suggest that we only add a `checkpointedOffset` metric, scoped by topic and partition id. We already have `currentOffsets` and `committedOffsets` metrics, so adding that would be a good addition overall, IMO. The "lag" should be determined by the user in conjunction with other Kafka tools.

3. You can perhaps build your work on top of https://github.com/apache/flink/pull/5214, which I'll be merging soon, to avoid any conflicts.

I would like this new metric to be added for the 1.5.0 release, if possible. How are you doing currently with the feature?

> Add "consumer lag" report metric to FlinkKafkaConsumer
> ------------------------------------------------------
>
>                 Key: FLINK-6109
>                 URL: https://issues.apache.org/jira/browse/FLINK-6109
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: aitozi
>
> This is a feature discussed on this ML thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html
> As discussed, we can expose two kinds of "consumer lag" metrics for this:
> - *Current consumer lag per partition:* the current difference between the latest offset and the last collected record. This metric is calculated and updated at a configurable interval. It basically serves as an indicator of how well the consumer is keeping up with the head of the partitions. I propose to name this {{currentOffsetLag}}.
> - *Consumer lag of the last checkpoint per partition:* the difference between the latest offset and the offset stored in the checkpoint. This metric is only updated when checkpoints are completed. It serves as an indicator of how much data may need to be replayed in case of a failure. I propose to name this {{lastCheckpointedOffsetLag}}.
> I don't think it is reasonable to define a metric of whether or not a consumer has "caught up" with the HEAD, as that would imply a threshold for the offset difference. We should leave this "caught up" logic for users to determine themselves when they query this metric.
> The granularity of the metric is per-FlinkKafkaConsumer and independent of the consumer group.id used (the offset used to calculate consumer lag is the internal offset state of the FlinkKafkaConsumer, not the consumer group's committed offsets in Kafka).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
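A sketch of what point 2 above implies for users: derive the lag outside the Flink consumer by combining Flink's per-partition offset metrics with the head offsets from a plain Kafka client ({{endOffsets}} requires kafka-clients 0.10.1+). All names here are illustrative, not part of any Flink API.

{code:java}
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class ConsumerLag {

    /**
     * currentOffsets would be scraped from Flink's per-partition
     * currentOffsets metrics; the head offsets come straight from Kafka.
     */
    static long totalLag(KafkaConsumer<?, ?> client, Map<TopicPartition, Long> currentOffsets) {
        Set<TopicPartition> partitions = currentOffsets.keySet();
        Map<TopicPartition, Long> headOffsets = client.endOffsets(partitions);
        long lag = 0;
        for (TopicPartition tp : partitions) {
            // Clamp at zero in case the scraped metric is momentarily ahead.
            lag += Math.max(0, headOffsets.get(tp) - currentOffsets.get(tp));
        }
        return lag;
    }
}
{code}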
[jira] [Created] (FLINK-8410) Kafka consumer's committedOffsets gauge metric is prematurely set
Tzu-Li (Gordon) Tai created FLINK-8410:
---------------------------------------

             Summary: Kafka consumer's committedOffsets gauge metric is prematurely set
                 Key: FLINK-8410
                 URL: https://issues.apache.org/jira/browse/FLINK-8410
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector, Metrics
    Affects Versions: 1.3.2, 1.4.0, 1.5.0
            Reporter: Tzu-Li (Gordon) Tai
            Assignee: Tzu-Li (Gordon) Tai
             Fix For: 1.3.3, 1.5.0, 1.4.1

The {{committedOffsets}} metric gauge value is set too early. It is set here:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L236

This sets the committed offset before the actual commit happens; the actual commit time varies depending on whether the commit mode is periodic automatic committing or committing on checkpoints. Moreover, in the 0.9+ consumers, the {{KafkaConsumerThread}} may choose to supersede some commit attempts if a commit takes longer than the commit interval.

While the offset committed back to Kafka is not a critical value used by the consumer, it would be best for a Flink-shipped metric to report values as accurately as possible.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
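One plausible shape for the fix, sketched here as an assumption rather than the actual patch: only update the per-partition gauge values from Kafka's commit callback, i.e. after the broker acknowledges the commit. The backing map of gauge values is hypothetical.

{code:java}
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

class CommittedOffsetGauges {

    /** Builds a callback that moves the gauge values only on a successful commit. */
    static OffsetCommitCallback onCommit(Map<TopicPartition, AtomicLong> gaugeValues) {
        return (offsets, exception) -> {
            if (exception != null) {
                return; // failed or superseded commit: leave the gauges untouched
            }
            for (Map.Entry<TopicPartition, OffsetAndMetadata> e : offsets.entrySet()) {
                AtomicLong value = gaugeValues.get(e.getKey());
                if (value != null) {
                    value.set(e.getValue().offset());
                }
            }
        };
    }
}
{code}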
[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...
Github user casidiablo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160841610

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
     		}
     	}

    +	/**
    +	 * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
    +	 */
    +	private MetricGroup registerMetricGroupForShard(KinesisStreamShardState shardState) {
    +		return runtimeContext
    +			.getMetricGroup()
    +			.addGroup("Kinesis")
    +			.addGroup("shardId", shardState.getStreamShardHandle().getShard().getShardId());
    --- End diff --

    Great idea.

---
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321497#comment-16321497 ]

ASF GitHub Bot commented on FLINK-8162:
---------------------------------------

Github user casidiablo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160841610

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    (Same hunk as in the GitHub message above.)
    --- End diff --

    Great idea.

> Kinesis Connector to report millisBehindLatest metric
> ------------------------------------------------------
>
>                 Key: FLINK-8162
>                 URL: https://issues.apache.org/jira/browse/FLINK-8162
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>            Reporter: Cristian
>            Priority: Minor
>              Labels: kinesis
>             Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is "MillisBehindLatest" (see https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, tagging it with the shard id.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321495#comment-16321495 ]

ASF GitHub Bot commented on FLINK-8162:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160841457

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
     		}
     	}

    +	/**
    +	 * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
    +	 */
    +	private MetricGroup buildMetricGroupForShard(KinesisStreamShardState shardState) {
    +		return runtimeContext
    +			.getMetricGroup()
    +			.addGroup("Kinesis")
    --- End diff --

    Ah I see, that makes sense then. Could you then name it as `KinesisConsumer`, to be more consistent?

> Kinesis Connector to report millisBehindLatest metric
> ------------------------------------------------------
>
>                 Key: FLINK-8162
>                 URL: https://issues.apache.org/jira/browse/FLINK-8162
>
> (Issue description quoted in full in the first FLINK-8162 message above.)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321494#comment-16321494 ]

ASF GitHub Bot commented on FLINK-8162:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160841254

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
     		}
     	}

    +	/**
    +	 * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
    +	 */
    +	private MetricGroup registerMetricGroupForShard(KinesisStreamShardState shardState) {
    +		return runtimeContext
    +			.getMetricGroup()
    +			.addGroup("Kinesis")
    +			.addGroup("shardId", shardState.getStreamShardHandle().getShard().getShardId());
    --- End diff --

    We probably should also add a `stream` group before `shardId`, since the Kinesis consumer allows subscribing to multiple streams.

> Kinesis Connector to report millisBehindLatest metric
> ------------------------------------------------------
>
>                 Key: FLINK-8162
>                 URL: https://issues.apache.org/jira/browse/FLINK-8162
>
> (Issue description quoted in full in the first FLINK-8162 message above.)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160841457

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
     		}
     	}

    +	/**
    +	 * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
    +	 */
    +	private MetricGroup buildMetricGroupForShard(KinesisStreamShardState shardState) {
    +		return runtimeContext
    +			.getMetricGroup()
    +			.addGroup("Kinesis")
    --- End diff --

    Ah I see, that makes sense then. Could you then name it as `KinesisConsumer`, to be more consistent?

---
[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160841254

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
     		}
     	}

    +	/**
    +	 * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
    +	 */
    +	private MetricGroup registerMetricGroupForShard(KinesisStreamShardState shardState) {
    +		return runtimeContext
    +			.getMetricGroup()
    +			.addGroup("Kinesis")
    +			.addGroup("shardId", shardState.getStreamShardHandle().getShard().getShardId());
    --- End diff --

    We probably should also add a `stream` group before `shardId`, since the Kinesis consumer allows subscribing to multiple streams.

---
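Putting both review suggestions together, the registration might end up looking like the sketch below: a {{KinesisConsumer}} top-level group plus {{stream}} and {{shardId}} as key/value groups, so reporters can expose them as tags. This is an assumption about where the review is heading, not the merged code, and the parameter names are illustrative.

{code:java}
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;

class ShardMetrics {

    static MetricGroup shardMetricGroup(RuntimeContext ctx, String streamName, String shardId) {
        return ctx.getMetricGroup()
            .addGroup("KinesisConsumer")       // consistent top-level group
            .addGroup("stream", streamName)    // key/value group -> reporter tag
            .addGroup("shardId", shardId);     // one group per subscribed shard
    }
}
{code}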
[jira] [Comment Edited] (FLINK-7883) Stop fetching source before a cancel with savepoint
[ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321463#comment-16321463 ]

Eron Wright edited comment on FLINK-7883 at 1/11/18 12:45 AM:
--------------------------------------------------------------

Can we restate the problem that we're trying to solve?

To my understanding, problem one is that cancel-with-savepoint is not atomic; cancellation happens some time after the checkpoint state is collected, causing undesirable at-least-once behavior or rollback behavior. Maybe this could be solved by enhancing the checkpoint barrier with a termination flag, which would cause the {{cancel}} function to be invoked while the checkpoint synchronization lock is still held. I don't know whether this would play nice with {{CheckpointListener}}.

Problem two is the existence of two similar operations, cancel and stop. Stop seems to be aimed at turning an unbounded source into a bounded source. I think it would be awesome to be able to parameterize the stop call w/ connector specifics, e.g. "stop at such-and-such offset". I would hesitate to co-opt the 'stop' functionality / {{StoppableFunction}} to solve problem one.

was (Author: eronwright):
Can we restate the problem that we're trying to solve?

To my understanding, problem one is that cancel-with-savepoint is not atomic; cancellation happens some time after the checkpoint state is collected, causing undesirable at-least-once behavior or rollback behavior. Maybe this could be solved by enhancing the checkpoint barrier with a termination flag, which would cause the cancel function to be invoked while the checkpoint synchronization lock is still held. I don't know whether this would play nice with CheckpointListener.

Problem two is the existence of two similar operations, cancel and stop. Stop seems to be aimed at turning an unbounded source into a bounded source. I think it would be awesome to be able to parameterize the stop call w/ connector specifics, e.g. "stop at such-and-such offset". I would hesitate to co-opt the 'stop' functionality / StoppableFunction to solve problem one.

> Stop fetching source before a cancel with savepoint
> ----------------------------------------------------
>
>                 Key: FLINK-7883
>                 URL: https://issues.apache.org/jira/browse/FLINK-7883
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API, Kafka Connector, State Backends, Checkpointing
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Antoine Philippot
>
> For a cancel-with-savepoint command, the JobManager triggers the cancel call once the savepoint is finished, but during the savepoint execution the Kafka source continues to poll new messages, which will not be part of the savepoint and will be replayed on the next application start.
> A solution could be to stop fetching from the source stream task before triggering the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a method {{stopFetching}} that existing SourceFunction implementations could implement.
> We can add a {{stopFetchingSource}} property in the {{CheckpointOptions}} class to pass the desired behaviour from {{JobManager.handleMessage(CancelJobWithSavepoint)}} to {{SourceStreamTask.triggerCheckpoint}}.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint
[ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321463#comment-16321463 ]

Eron Wright commented on FLINK-7883:
------------------------------------

Can we restate the problem that we're trying to solve?

To my understanding, problem one is that cancel-with-savepoint is not atomic; cancellation happens some time after the checkpoint state is collected, causing undesirable at-least-once behavior or rollback behavior. Maybe this could be solved by enhancing the checkpoint barrier with a termination flag, which would cause the cancel function to be invoked while the checkpoint synchronization lock is still held. I don't know whether this would play nice with CheckpointListener.

Problem two is the existence of two similar operations, cancel and stop. Stop seems to be aimed at turning an unbounded source into a bounded source. I think it would be awesome to be able to parameterize the stop call w/ connector specifics, e.g. "stop at such-and-such offset". I would hesitate to co-opt the 'stop' functionality / StoppableFunction to solve problem one.

> Stop fetching source before a cancel with savepoint
> ----------------------------------------------------
>
>                 Key: FLINK-7883
>                 URL: https://issues.apache.org/jira/browse/FLINK-7883
>
> (Issue description quoted in full in the previous FLINK-7883 message.)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
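For reference, the interface proposed in the quoted issue description might look like the following; this is a sketch of the reporter's suggestion, not an API that exists in Flink:

{code:java}
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** Sketch of the interface proposed in FLINK-7883. */
public interface StoppableFetchingSourceFunction<T> extends SourceFunction<T> {

    /**
     * Invoked before a savepoint that will be followed by a cancel, so the
     * source stops emitting records that would not be part of the savepoint.
     */
    void stopFetching();
}
{code}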
[jira] [Commented] (FLINK-5627) Allow job specific externalized checkpoint dir
[ https://issues.apache.org/jira/browse/FLINK-5627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321239#comment-16321239 ]

Elias Levy commented on FLINK-5627:
-----------------------------------

It would be nice to fix this issue.

> Allow job specific externalized checkpoint dir
> -----------------------------------------------
>
>                 Key: FLINK-5627
>                 URL: https://issues.apache.org/jira/browse/FLINK-5627
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Gyula Fora
>
> Currently the externalized checkpoint directory can only be configured at the cluster level. This is not in sync with the way checkpoint directories are generally configured, which is on a job-specific level.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
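For context, the behaviour the ticket wants to change, sketched with the API available at the time: a job can opt in to externalized checkpoints, but the target directory is read from the cluster-wide {{state.checkpoints.dir}} entry in {{flink-conf.yaml}} rather than being settable per job.

{code:java}
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExternalizedCheckpoints {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        // Per-job opt-in; the directory itself still comes from the
        // cluster-level "state.checkpoints.dir" configuration.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
{code}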
[jira] [Commented] (FLINK-8082) Bump version compatibility check to 1.4
[ https://issues.apache.org/jira/browse/FLINK-8082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321235#comment-16321235 ]

ASF GitHub Bot commented on FLINK-8082:
---------------------------------------

Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/5262

    I'm working on a `CHECKLIST` file for `tools/releasing` to codify these several issues that have come up recently.

> Bump version compatibility check to 1.4
> ----------------------------------------
>
>                 Key: FLINK-8082
>                 URL: https://issues.apache.org/jira/browse/FLINK-8082
>             Project: Flink
>          Issue Type: Improvement
>          Components: Build System
>    Affects Versions: 1.5.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>             Fix For: 1.5.0
>
> Similar to FLINK-7977, we must bump the version of the compatibility check to compare 1.5 against 1.4, once it is released.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink issue #5262: [FLINK-8082][build] Bump flink version for japicmp plugin
Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/5262

    I'm working on a `CHECKLIST` file for `tools/releasing` to codify these several issues that have come up recently.

---
[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321159#comment-16321159 ]

ASF GitHub Bot commented on FLINK-6951:
---------------------------------------

Github user xiatao123 commented on the issue:

    https://github.com/apache/flink/pull/4150

    Got the issue fixed after applying this patch on:
    Release label: emr-5.11.0
    Hadoop distribution: Amazon 2.7.3
    Applications: Flink 1.3.2

> Incompatible versions of httpcomponents jars for Flink kinesis connector
> -------------------------------------------------------------------------
>
>                 Key: FLINK-6951
>                 URL: https://issues.apache.org/jira/browse/FLINK-6951
>
> (Issue description quoted in full in the first FLINK-6951 message above.)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink issue #4150: [FLINK-6951] Incompatible versions of httpcomponents jars...
Github user xiatao123 commented on the issue:

    https://github.com/apache/flink/pull/4150

    Got the issue fixed after applying this patch on:
    Release label: emr-5.11.0
    Hadoop distribution: Amazon 2.7.3
    Applications: Flink 1.3.2

---
[GitHub] flink issue #4150: [FLINK-6951] Incompatible versions of httpcomponents jars...
Github user xiatao123 commented on the issue:

    https://github.com/apache/flink/pull/4150

    I ran into a similar issue in EMR. Any suggestions on how to fix it?

    Release label: emr-5.11.0
    Hadoop distribution: Amazon 2.7.3
    Applications: Flink 1.3.2

    java.lang.IllegalStateException: Socket not created by this factory
        at org.apache.http.util.Asserts.check(Asserts.java:34)
        at org.apache.http.conn.ssl.SSLSocketFactory.isSecure(SSLSocketFactory.java:435)
        at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:186)
        at org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:326)
        at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610)
        at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445)
        at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:835)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
        at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
        at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
        at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
        at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
        at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
        at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1940)
        at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1910)
        at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:656)
        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.describeStream(KinesisProxy.java:361)
        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:323)
        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:231)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:430)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:202)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)

---
[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321104#comment-16321104 ]

ASF GitHub Bot commented on FLINK-6951:
---------------------------------------

Github user xiatao123 commented on the issue:

    https://github.com/apache/flink/pull/4150

    (Same comment as in the GitHub message above, "I ran into a similar issue in EMR...", including the full stack trace.)

> Incompatible versions of httpcomponents jars for Flink kinesis connector
> -------------------------------------------------------------------------
>
>                 Key: FLINK-6951
>                 URL: https://issues.apache.org/jira/browse/FLINK-6951
>
> (Issue description quoted in full in the first FLINK-6951 message above.)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5277#discussion_r160795814

    --- Diff: docs/concepts/runtime.md ---
    @@ -88,40 +94,36 @@ By default, Flink allows subtasks to share slots even if they are subtasks of di
     they are from the same job. The result is that one slot may hold an entire pipeline of the
     job. Allowing this *slot sharing* has two main benefits:

    -  - A Flink cluster needs exactly as many task slots as the highest parallelism used in the job.
    -    No need to calculate how many tasks (with varying parallelism) a program contains in total.
    +  - A Flink cluster needs as many task slots as the highest parallelism used in the job.
    +    There's no need to calculate how many tasks (with varying parallelism) a program contains in total.

       - It is easier to get better resource utilization. Without slot sharing, the non-intensive
    -    *source/map()* subtasks would block as many resources as the resource intensive *window* subtasks.
    +    *source/map()* subtasks would block as many resources as the resource-intensive *window* subtasks.
         With slot sharing, increasing the base parallelism in our example from two to six yields full utilization of the
    -    slotted resources, while making sure that the heavy subtasks are fairly distributed among the TaskManagers.
    +    slotted resources, while making sure that the heavy subtasks are evenly distributed among the TaskManagers.

    -The APIs also include a *[resource group](../dev/datastream_api.html#task-chaining-and-resource-groups)* mechanism which can be used to prevent undesirable slot sharing.
    +The APIs also include a *[resource group](../dev/datastream_api.html#task-chaining-and-resource-groups)* mechanism which you can use to prevent undesirable slot sharing.

    -As a rule-of-thumb, a good default number of task slots would be the number of CPU cores.
    -With hyper-threading, each slot then takes 2 or more hardware thread contexts.
    +As a rule-of-thumb, a reasonable default number of task slots would be the number of CPU cores. With hyper-threading, each slot then takes 2 or more hardware thread contexts.

     {% top %}

     ## State Backends

    -The exact data structures in which the key/values indexes are stored depends on the chosen [state backend](../ops/state/state_backends.html). One state backend
    -stores data in an in-memory hash map, another state backend uses [RocksDB](http://rocksdb.org) as the key/value store.
    -In addition to defining the data structure that holds the state, the state backends also implement the logic to
    -take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint.
    +The exact data structures which store the key/values indexes depends on the chosen [state backend](../ops/state/state_backends.html). One state backend stores data in an in-memory hash map, another state backend uses [RocksDB](http://rocksdb.org) as the key/value store. In addition to defining the data structure that holds the state, the state backends also implement the logic to take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint.
    --- End diff --

    Preserve the line splits? The conversion to HTML ignores single newlines.

---
[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5277#discussion_r160793612

    --- Diff: docs/concepts/runtime.md ---
    @@ -88,40 +94,36 @@ By default, Flink allows subtasks to share slots even if they are subtasks of di
     they are from the same job. The result is that one slot may hold an entire pipeline of the
     job. Allowing this *slot sharing* has two main benefits:

    -  - A Flink cluster needs exactly as many task slots as the highest parallelism used in the job.
    -    No need to calculate how many tasks (with varying parallelism) a program contains in total.
    +  - A Flink cluster needs as many task slots as the highest parallelism used in the job.
    +    There's no need to calculate how many tasks (with varying parallelism) a program contains in total.

       - It is easier to get better resource utilization. Without slot sharing, the non-intensive
    -    *source/map()* subtasks would block as many resources as the resource intensive *window* subtasks.
    +    *source/map()* subtasks would block as many resources as the resource-intensive *window* subtasks.
         With slot sharing, increasing the base parallelism in our example from two to six yields full utilization of the
    -    slotted resources, while making sure that the heavy subtasks are fairly distributed among the TaskManagers.
    +    slotted resources, while making sure that the heavy subtasks are evenly distributed among the TaskManagers.

    -The APIs also include a *[resource group](../dev/datastream_api.html#task-chaining-and-resource-groups)* mechanism which can be used to prevent undesirable slot sharing.
    +The APIs also include a *[resource group](../dev/datastream_api.html#task-chaining-and-resource-groups)* mechanism which you can use to prevent undesirable slot sharing.

    -As a rule-of-thumb, a good default number of task slots would be the number of CPU cores.
    -With hyper-threading, each slot then takes 2 or more hardware thread contexts.
    +As a rule-of-thumb, a reasonable default number of task slots would be the number of CPU cores. With hyper-threading, each slot then takes 2 or more hardware thread contexts.

     {% top %}

     ## State Backends

    -The exact data structures in which the key/values indexes are stored depends on the chosen [state backend](../ops/state/state_backends.html). One state backend
    -stores data in an in-memory hash map, another state backend uses [RocksDB](http://rocksdb.org) as the key/value store.
    -In addition to defining the data structure that holds the state, the state backends also implement the logic to
    -take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint.
    +The exact data structures which store the key/values indexes depends on the chosen [state backend](../ops/state/state_backends.html). One state backend stores data in an in-memory hash map, another state backend uses [RocksDB](http://rocksdb.org) as the key/value store. In addition to defining the data structure that holds the state, the state backends also implement the logic to take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint.

     {% top %}

     ## Savepoints

    -Programs written in the Data Stream API can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without losing any state. 
    +Programs written in the Data Stream API can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without losing any state.

    -[Savepoints](../ops/state/savepoints.html) are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed.
    +[Savepoints](../ops/state/savepoints.html) are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution, programs are periodically snapshotted on the worker nodes and produce checkpoints. You only need the last completed checkpoint for recovery, and you can safely discard older checkpoints as soon as a new one is completed.

    -Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed. Savepoints can be created from the [command line](../ops/cli.html#savepoints) or when canc
[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5277#discussion_r160794201

    --- Diff: docs/concepts/runtime.md ---
    @@ -46,19 +46,23 @@ The Flink runtime consists of two types of processes:

       - The **JobManagers** (also called *masters*) coordinate the distributed execution. They schedule tasks, coordinate
         checkpoints, coordinate recovery on failures, etc.

    -    There is always at least one Job Manager. A high-availability setup will have multiple JobManagers, one of
    -    which one is always the *leader*, and the others are *standby*.
    +    There is always at least one Job Manager. A high-availability setup should have multiple JobManagers, one of
    --- End diff --

    "JobManager"?

---
[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5277#discussion_r160795540 --- Diff: docs/concepts/runtime.md --- @@ -88,40 +94,36 @@ By default, Flink allows subtasks to share slots even if they are subtasks of di they are from the same job. The result is that one slot may hold an entire pipeline of the job. Allowing this *slot sharing* has two main benefits: - - A Flink cluster needs exactly as many task slots as the highest parallelism used in the job. -No need to calculate how many tasks (with varying parallelism) a program contains in total. + - A Flink cluster needs as many task slots as the highest parallelism used in the job. +There's no need to calculate how many tasks (with varying parallelism) a program contains in total. - It is easier to get better resource utilization. Without slot sharing, the non-intensive -*source/map()* subtasks would block as many resources as the resource intensive *window* subtasks. +*source/map()* subtasks would block as many resources as the resource-intensive *window* subtasks. With slot sharing, increasing the base parallelism in our example from two to six yields full utilization of the -slotted resources, while making sure that the heavy subtasks are fairly distributed among the TaskManagers. +slotted resources, while making sure that the heavy subtasks are evenly distributed among the TaskManagers. -The APIs also include a *[resource group](../dev/datastream_api.html#task-chaining-and-resource-groups)* mechanism which can be used to prevent undesirable slot sharing. +The APIs also include a *[resource group](../dev/datastream_api.html#task-chaining-and-resource-groups)* mechanism which you can use to prevent undesirable slot sharing. -As a rule-of-thumb, a good default number of task slots would be the number of CPU cores. -With hyper-threading, each slot then takes 2 or more hardware thread contexts. +As a rule-of-thumb, a reasonable default number of task slots would be the number of CPU cores. With hyper-threading, each slot then takes 2 or more hardware thread contexts. --- End diff -- Preserve the line split? I'm not sure what the second sentence is saying. Typically hyper-threads are reported as separate cores so would not each slot take a single hardware thread context? ---
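The rule of thumb debated here corresponds to a single setting; a sketch, with an illustrative value matching a 4-core machine:
{code}
taskmanager.numberOfTaskSlots: 4
{code}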
[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5277#discussion_r160794027 --- Diff: docs/concepts/runtime.md --- @@ -28,12 +28,12 @@ under the License. ## Tasks and Operator Chains -For distributed execution, Flink *chains* operator subtasks together into *tasks*. Each task is executed by one thread. +For distributed execution, Flink *chains* operator subtasks together into *tasks*, with one thread executing each task. Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread -handover and buffering, and increases overall throughput while decreasing latency. -The chaining behavior can be configured; see the [chaining docs](../dev/datastream_api.html#task-chaining-and-resource-groups) for details. +handover and buffering and increases overall throughput while decreasing latency. +You can configure the chaining behavior, read the [chaining docs](../dev/datastream_api.html#task-chaining-and-resource-groups) for details. -The sample dataflow in the figure below is executed with five subtasks, and hence with five parallel threads. +Five subtasks execute the sample data flow in the figure below with five parallel threads. --- End diff -- The community as well as other projects refer to "dataflow" extensively (also below). ---
[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5277#discussion_r160793754 --- Diff: docs/concepts/runtime.md --- @@ -28,12 +28,12 @@ under the License. ## Tasks and Operator Chains -For distributed execution, Flink *chains* operator subtasks together into *tasks*. Each task is executed by one thread. +For distributed execution, Flink *chains* operator subtasks together into *tasks*, with one thread executing each task. Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread -handover and buffering, and increases overall throughput while decreasing latency. -The chaining behavior can be configured; see the [chaining docs](../dev/datastream_api.html#task-chaining-and-resource-groups) for details. +handover and buffering and increases overall throughput while decreasing latency. +You can configure the chaining behavior, read the [chaining docs](../dev/datastream_api.html#task-chaining-and-resource-groups) for details. --- End diff -- Preserve the semi-colon? ---
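The chaining controls behind the linked docs are per-operator calls on the DataStream API; a minimal sketch, where `stream`, `MyMapper`, and `MyFilter` are hypothetical:
{code}
// Assumes stream is a DataStream<String>; MyMapper and MyFilter are placeholders.
stream.map(new MyMapper())
      .startNewChain()        // begin a new chain at the mapper
      .filter(new MyFilter())
      .disableChaining();     // keep the filter out of any chain
{code}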
[jira] [Commented] (FLINK-8403) Flink Gelly examples hanging without returning result
[ https://issues.apache.org/jira/browse/FLINK-8403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321032#comment-16321032 ] Greg Hogan commented on FLINK-8403: --- Could this be due to the [{{akka.framesize}}|https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka] limit (default: 10 MiB)? Does the job succeed when you write the output to CSV? > Flink Gelly examples hanging without returning result > - > > Key: FLINK-8403 > URL: https://issues.apache.org/jira/browse/FLINK-8403 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.3.2 > Environment: CentOS Linux release 7.3.1611 >Reporter: flora karniav > Labels: examples, gelly, performance > Original Estimate: 72h > Remaining Estimate: 72h > > Hello, I am currently running and measuring Flink Gelly examples (Connected > components and PageRank algorithms) with different SNAP datasets. When > running with the Twitter dataset for example > (https://snap.stanford.edu/data/egonets-Twitter.html), which has 81,306 > vertices, everything executes and finishes OK and I get the reported job > runtime. On the other hand, executions with datasets having a bigger number > of vertices, e.g. https://snap.stanford.edu/data/com-Youtube.html with > 1,134,890 vertices, hang with no result and no reported time, while at the same > time I get "Job execution switched to status FINISHED." > I thought that this could be a memory issue so I increased the RAM > assigned to my taskmanagers (and jobmanager) to 125GB, but still no luck. > The exact command I am running is: > ./bin/flink run examples/gelly/flink-gelly-examples_*.jar --algorithm > PageRank --directed false --input_filename hdfs://sith0:9000/user/xx.txt > --input CSV --type integer --input_field_delimiter $' ' --output print -- This message was sent by Atlassian JIRA (v6.4.14#64029)
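If the frame size is indeed the culprit, the limit Greg points at is raised in flink-conf.yaml; a sketch with an illustrative 100 MiB value (the setting takes a size with a unit suffix):
{code}
akka.framesize: 104857600b
{code}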
[GitHub] flink pull request #5279: [hotfix] [build] Print cache info
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/5279 [hotfix] [build] Print cache info Print the size of the Maven cache copied for each TravisCI job. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 20180110a_print_cache_info Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5279.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5279 commit e16005a55c4b4b59e8ba6859fc273af1e48ba8b7 Author: Greg Hogan Date: 2018-01-10T19:07:57Z [hotfix] [build] Print cache info Print the size of the Maven cache copied for each TravisCI job. ---
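A hedged sketch, not the PR's exact change, of the kind of step this adds to the TravisCI scripts:
{code}
# Report how large the copied Maven cache is for this job.
du -sh "$HOME/.m2/repository"
{code}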
[jira] [Updated] (FLINK-8405) Keyed State in broadcasted data stream.
[ https://issues.apache.org/jira/browse/FLINK-8405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marek Barak updated FLINK-8405: --- Description: Hi guys, I am trying to join 2 streams, where the second stream is a codelist used by the first stream for enrichment. I followed the guide described here: https://www.safaribooksonline.com/library/view/stream-processing-with/9781491974285/ch04.html With the distinction that instead of having a local HashMap, I used MapState. This part is actually important since I want my state properly checkpointed in case of a failure. I managed to reproduce the issue with the following code:
{code}
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.junit.{Test, Assert}

class SimpleTest extends StreamingMultipleProgramsTestBase {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  case object StateMap extends RichCoFlatMapFunction[String, (String, Int), Int] {

    var codeList: MapState[String, Int] = _

    override def open(parameters: Configuration): Unit = {
      codeList = getRuntimeContext.getMapState(
        new MapStateDescriptor[String, Int]("test", classOf[String], classOf[Int])
      )
    }

    override def flatMap1(value: String, out: Collector[Int]): Unit = {
      val res = if (codeList.contains(value)) codeList.get(value) else 0
      out.collect(res)
    }

    override def flatMap2(value: (String, Int), out: Collector[Int]): Unit = {
      codeList.put(value._1, value._2)
      out.close()
    }
  }

  @Test
  def job() = {
    val inputStream = env.fromCollection(List("Some", "Some2", "Some3"))
    val dictStream = env.fromCollection(List("Some" -> 1, "Some2" -> 2, "Some3" -> 3))

    inputStream
      .connect(dictStream.broadcast)
      .flatMap(StateMap)

    env.execute()
    Assert.assertEquals(1, 1)
  }
}
{code}
I always get the following issue:
{code}
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:161)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:153)
	at com.triviadata.sherlog.streaming.job.SimpleTest$StateMap$.open(SimpleTest.scala:23)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:46)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:745)
{code}
I guess the main problem is:
{code}
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
{code}
I also tried:
{code}
inputStream
  .keyBy(a => a
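A minimal sketch of the direction that exception points in, assuming the goal is keyed enrichment state; the key choice is illustrative and this is not the reporter's exact fix:
{code}
// MapState is only available on a keyed stream, so key both inputs of the
// connect. Note that .broadcast is dropped: broadcast partitioning and
// keyBy() cannot be combined.
inputStream
  .connect(dictStream)
  .keyBy(s => s, kv => kv._1)  // main stream by its value, codelist by its key
  .flatMap(StateMap)
{code}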
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320816#comment-16320816 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5248 Thank you for the thorough review. Here is a summary of follow-ups from my side:
- Investigate further whether we can find a better way for configuration than the `ConfigurableStateBackend`.
- Adjust some method names.
- Possibly look at a different name for `CheckpointStorage` to avoid confusion with `CompletedCheckpointStore`. We might actually decide to rename `CompletedCheckpointStore` to `CompletedCheckpointsRegistry`, because that is really what it is: ZK registers the completed checkpoints, it does not store them (after this PR that is even more true than before).
One of the bigger comments (to replace String by a more strongly typed Pointer class) is something I think will not easily work, because Strings are really what we get from the command line, the REST interface, and ZK. Any extra wrapping would not add any more safety at this point. When we see that we actually can add more safety, we should refactor this. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5248: [FLINK-5823] [checkpoints] State backends now also handle...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5248 Thank you for the thorough review. Here is a summary of follow-ups from my side:
- Investigate further whether we can find a better way for configuration than the `ConfigurableStateBackend`.
- Adjust some method names.
- Possibly look at a different name for `CheckpointStorage` to avoid confusion with `CompletedCheckpointStore`. We might actually decide to rename `CompletedCheckpointStore` to `CompletedCheckpointsRegistry`, because that is really what it is: ZK registers the completed checkpoints, it does not store them (after this PR that is even more true than before).
One of the bigger comments (to replace String by a more strongly typed Pointer class) is something I think will not easily work, because Strings are really what we get from the command line, the REST interface, and ZK. Any extra wrapping would not add any more safety at this point. When we see that we actually can add more safety, we should refactor this. ---
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320811#comment-16320811 ] ASF GitHub Bot commented on FLINK-8162: --- Github user casidiablo commented on a diff in the pull request: https://github.com/apache/flink/pull/5182#discussion_r160763163 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed } } + /** +* Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}. +*/ + private MetricGroup buildMetricGroupForShard(KinesisStreamShardState shardState) { + return runtimeContext + .getMetricGroup() + .addGroup("Kinesis") --- End diff -- I think so. Just following the convention of other components (e.g. [kafka consumer](https://github.com/apache/flink/blob/00ad0eb120026817c79a5fed2c71dd6aa10e9ba6/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L101)) > Kinesis Connector to report millisBehindLatest metric > - > > Key: FLINK-8162 > URL: https://issues.apache.org/jira/browse/FLINK-8162 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Cristian >Priority: Minor > Labels: kinesis > Fix For: 1.5.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > When reading from Kinesis streams, one of the most valuable metrics is > "MillisBehindLatest" (see > https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201). > Flink should use its metrics mechanism to report this value as a gauge, > tagging it with the shard id. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...
Github user casidiablo commented on a diff in the pull request: https://github.com/apache/flink/pull/5182#discussion_r160763163 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed } } + /** +* Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}. +*/ + private MetricGroup buildMetricGroupForShard(KinesisStreamShardState shardState) { + return runtimeContext + .getMetricGroup() + .addGroup("Kinesis") --- End diff -- I think so. Just following the convention of other components (e.g. [kafka consumer](https://github.com/apache/flink/blob/00ad0eb120026817c79a5fed2c71dd6aa10e9ba6/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L101)) ---
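A hedged sketch of how the per-shard group built above could expose the gauge; the `lastMillisBehindLatest` field is hypothetical, not the PR's exact code:
{code}
MetricGroup shardGroup = buildMetricGroupForShard(shardState);
shardGroup.gauge("millisBehindLatest", new Gauge<Long>() {
    @Override
    public Long getValue() {
        // Hypothetical field updated after each GetRecords call.
        return lastMillisBehindLatest;
    }
});
{code}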
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320799#comment-16320799 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160761745 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. 
+* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. +* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the State
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160761745 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. +* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. 
+* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the StateBackendFactory when creating / configuring the state +* backend in the factory +* @throws IOException +* May be thrown by the StateBackendFactory when instantiating the state backend +*/ + pub
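As a worked example of the loading rules this javadoc spells out, a flink-conf.yaml sketch; the factory class name is hypothetical:
{code}
# Either a shortcut name...
state.backend: rocksdb
# ...or the class name of a StateBackendFactory (hypothetical example):
# state.backend: com.example.checkpointing.MyStateBackendFactory
{code}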
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320792#comment-16320792 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160761001 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointStorageLocation; + +import javax.annotation.Nullable; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * An implementation of durable checkpoint storage to file systems. + */ +public class FsCheckpointStorage extends AbstractFsCheckpointStorage { + + private final FileSystem fileSystem; + + private final Path checkpointsDirectory; + + private final Path sharedStateDirectory; + + private final Path taskOwnedStateDirectory; --- End diff -- We may be able to remove this in the future, but because we need to pull task owned state out of the "exclusive state" directory soon, and the rework to make this use shared state handles is probably more delicate, I would actually have this for now. Should be trivial rework to remove it once we don't need it any more. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320793#comment-16320793 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160761075 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java --- @@ -18,92 +18,272 @@ package org.apache.flink.runtime.state.memory; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import javax.annotation.Nullable; + import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkArgument; + /** - * A {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no - * capabilities to spill to disk. Checkpoints are serialized and the serialized data is - * transferred + * This state backend holds the working state in the memory (JVM heap) of the TaskManagers. + * The state backend checkpoints state directly to the JobManager's memory (hence the backend's name), + * but the checkpoints will be persisted to a file system for high-availability setups and savepoints. + * The MemoryStateBackend is consequently a FileSystem-based backend that can work without a + * file system dependency in simple setups. + * + * This state backend should be used only for experimentation, quick local setups, + * or for streaming applications that have very small state: Because it requires checkpoints to + * go through the JobManager's memory, larger state will occupy larger portions of the JobManager's + * main memory, reducing operational stability. + * For any other setup, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend} + * should be used. The {@code FsStateBackend} holds the working state on the TaskManagers in the same way, but + * checkpoints state directly to files rather then to the JobManager's memory, thus supporting + * large state sizes. + * + * State Size Considerations + * + * State checkpointing with this state backend is subject to the following conditions: + * + * Each individual state must not exceed the configured maximum state size + * (see {@link #getMaxStateSize()}. + * + * All state from one task (i.e., the sum of all operator states and keyed states from all + * chained operators of the task) must not exceed what the RPC system supports, which is + * be default < 10 MB. That limit can be configured up, but that is typically not advised. 
+ * + * The sum of all states in the application times all retained checkpoints must comfortably + * fit into the JobManager's JVM heap space. + * + * + * Persistence Guarantees + * + * For the use cases where the state sizes can be handled by this backend, the backend does guarantee + * persistence for savepoints, externalized checkpoints (of configured), and checkpoints + * (when high-availability is configured). + * + * Configuration + * + * As for all state backends, this backend can either be configured within the application (by creating + * the backend with the respective constructor parameters and setting it on the execution environment) + * or by specifying it in the Flink configuration. + * + * If the state backend was specified in the application, it may pick up additional configuration + * parameters from the Flink configuration. For example, if the backend if configured in the application + * without a default savepoint directory, it will pick up a default savep
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160761001 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointStorageLocation; + +import javax.annotation.Nullable; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * An implementation of durable checkpoint storage to file systems. + */ +public class FsCheckpointStorage extends AbstractFsCheckpointStorage { + + private final FileSystem fileSystem; + + private final Path checkpointsDirectory; + + private final Path sharedStateDirectory; + + private final Path taskOwnedStateDirectory; --- End diff -- We may be able to remove this in the future, but because we need to pull task owned state out of the "exclusive state" directory soon, and the rework to make this use shared state handles is probably more delicate, I would actually have this for now. Should be trivial rework to remove it once we don't need it any more. ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160761075 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java --- @@ -18,92 +18,272 @@ package org.apache.flink.runtime.state.memory; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import javax.annotation.Nullable; + import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkArgument; + /** - * A {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no - * capabilities to spill to disk. Checkpoints are serialized and the serialized data is - * transferred + * This state backend holds the working state in the memory (JVM heap) of the TaskManagers. + * The state backend checkpoints state directly to the JobManager's memory (hence the backend's name), + * but the checkpoints will be persisted to a file system for high-availability setups and savepoints. + * The MemoryStateBackend is consequently a FileSystem-based backend that can work without a + * file system dependency in simple setups. + * + * This state backend should be used only for experimentation, quick local setups, + * or for streaming applications that have very small state: Because it requires checkpoints to + * go through the JobManager's memory, larger state will occupy larger portions of the JobManager's + * main memory, reducing operational stability. + * For any other setup, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend} + * should be used. The {@code FsStateBackend} holds the working state on the TaskManagers in the same way, but + * checkpoints state directly to files rather then to the JobManager's memory, thus supporting + * large state sizes. + * + * State Size Considerations + * + * State checkpointing with this state backend is subject to the following conditions: + * + * Each individual state must not exceed the configured maximum state size + * (see {@link #getMaxStateSize()}. + * + * All state from one task (i.e., the sum of all operator states and keyed states from all + * chained operators of the task) must not exceed what the RPC system supports, which is + * be default < 10 MB. That limit can be configured up, but that is typically not advised. + * + * The sum of all states in the application times all retained checkpoints must comfortably + * fit into the JobManager's JVM heap space. 
+ * + * + * Persistence Guarantees + * + * For the use cases where the state sizes can be handled by this backend, the backend does guarantee + * persistence for savepoints, externalized checkpoints (of configured), and checkpoints + * (when high-availability is configured). + * + * Configuration + * + * As for all state backends, this backend can either be configured within the application (by creating + * the backend with the respective constructor parameters and setting it on the execution environment) + * or by specifying it in the Flink configuration. + * + * If the state backend was specified in the application, it may pick up additional configuration + * parameters from the Flink configuration. For example, if the backend if configured in the application + * without a default savepoint directory, it will pick up a default savepoint directory specified in the + * Flink configuration of the running job/cluster. That behavior is implemented via the + * {@link #configure(Configuration)} method. */ -public class MemoryStateBackend extends AbstractStateBackend {
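A minimal sketch of the "configured within the application" path this javadoc describes; the class name and the 5 MB maximum state size are illustrative:
{code}
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MemoryBackendExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 5 MB maximum size per individual state (illustrative value).
        env.setStateBackend(new MemoryStateBackend(5 * 1024 * 1024));
    }
}
{code}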
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160760708 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StreamStateHandle; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.net.URI; + +/** + * A base class for all state backends that store their metadata (and data) in files. + * Examples that inherit from this are the {@link FsStateBackend}, the + * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend MemoryStateBackend}, or the + * {@code RocksDBStateBackend}. + * + * This class takes the base checkpoint- and savepoint directory paths, but also accepts null + * for both of then, in which case creating externalized checkpoint is not possible, and it is not + * possible to create a savepoint with a default path. Null is accepted to enable implementations + * that only optionally support default savepoints and externalized checkpoints. + * + * Checkpoint Layout + * + * The state backend is configured with a base directory and persists the checkpoint data of specific + * checkpoints in specific subdirectories. For example, if the base directory was set to + * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory with + * the job's ID that will contain the actual checkpoints: + * ({@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b}) + * + * Each checkpoint individually will store all its files in a subdirectory that includes the + * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}. + * + * Savepoint Layout + * + * A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/}, will create + * a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all savepoint data. + * The random digits are added as "entropy" to avoid directory collisions. + * + * Metadata and Success Files + * + * A completed checkpoint writes its metadata into a file '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}'. 
+ * After that is complete (i.e., the file complete), it writes an additional file + * '{@value AbstractFsCheckpointStorage#SUCCESS_FILE_NAME}'. + * + * Ideally that would not be necessary, and one would write the metadata file to a temporary file and + * then issue a atomic (or at least constant time) rename. But some of the file systems (like S3) do + * not support that: A rename is a copy process which, when failing, leaves corrupt half written + * files/objects. The success file is hence needed as a signal that the + * '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}'file is complete. + */ +@PublicEvolving +public abstract class AbstractFileStateBackend extends AbstractStateBackend { + + private static final long serialVersionUID = 1L; + + // + // State Backend Properties + // + + /** The path where checkpoints will be stored, or null, if none has been configured. */ + @Nullable + private final Path baseCheckpoin
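By contrast with the memory-backed variant, a file-based backend pointed at the layout this javadoc describes is a one-liner (reusing the `env` from the earlier MemoryStateBackend sketch); the URI is illustrative:
{code}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

// Checkpoints then land under hdfs://namenode:port/flink-checkpoints/<jobId>/chk-<n>/
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints"));
{code}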
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320789#comment-16320789 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160760708 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StreamStateHandle; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.net.URI; + +/** + * A base class for all state backends that store their metadata (and data) in files. + * Examples that inherit from this are the {@link FsStateBackend}, the + * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend MemoryStateBackend}, or the + * {@code RocksDBStateBackend}. + * + * This class takes the base checkpoint- and savepoint directory paths, but also accepts null + * for both of then, in which case creating externalized checkpoint is not possible, and it is not + * possible to create a savepoint with a default path. Null is accepted to enable implementations + * that only optionally support default savepoints and externalized checkpoints. + * + * Checkpoint Layout + * + * The state backend is configured with a base directory and persists the checkpoint data of specific + * checkpoints in specific subdirectories. For example, if the base directory was set to + * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory with + * the job's ID that will contain the actual checkpoints: + * ({@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b}) + * + * Each checkpoint individually will store all its files in a subdirectory that includes the + * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}. + * + * Savepoint Layout + * + * A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/}, will create + * a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all savepoint data. 
+ * The random digits are added as "entropy" to avoid directory collisions. + * + * Metadata and Success Files + * + * A completed checkpoint writes its metadata into a file '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}'. + * After that is complete (i.e., the file is complete), it writes an additional file + * '{@value AbstractFsCheckpointStorage#SUCCESS_FILE_NAME}'. + * + * Ideally that would not be necessary, and one would write the metadata file to a temporary file and + * then issue an atomic (or at least constant time) rename. But some of the file systems (like S3) do + * not support that: A rename is a copy process which, when failing, leaves corrupt, half-written + * files/objects. The success file is hence needed as a signal that the + * '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}' file is complete. + */ +@PublicEvolving +public abstract class AbstractFileStateBackend extends AbstractStateBackend { + + private static final long serialVersionUID = 1L; + + // + // State Backend P
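To make the directory layout described above concrete, here is a minimal sketch of pointing a job at such a base directory. It assumes the {{FsStateBackend(String checkpointDataUri)}} constructor and a standard streaming job setup; the HDFS address is the placeholder from the javadoc, not a real cluster.

{code:java}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointLayoutExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Base directory: per the javadoc above, the backend then writes each checkpoint
        // under <base>/<jobId>/chk-<n>/, plus the metadata and success files.
        env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints/"));

        // Checkpoints are only written if checkpointing is enabled.
        env.enableCheckpointing(10_000L); // every 10 seconds
    }
}
{code}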
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160759604 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. +* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. 
+* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the StateBackendFactory when creating / configuring the state +* backend in the factory +* @throws IOException +* May be thrown by the StateBackendFactory when instantiating the state backend +*/ + pub
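As a quick illustration of the loading path this javadoc describes, the sketch below drives it from a {{Configuration}}. The method name {{loadStateBackendFromConfig}} is inferred from the truncated signature in the diff, and the {{state.checkpoints.dir}} key is an assumption about how the filesystem backend finds its directory.

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;

public class BackendLoadingExample {

    public static StateBackend loadConfiguredBackend() throws Exception {
        Configuration config = new Configuration();

        // One of the shortcut names from the javadoc ("jobmanager", "filesystem",
        // "rocksdb"), or the class name of a StateBackendFactory.
        config.setString("state.backend", "filesystem");
        config.setString("state.checkpoints.dir", "hdfs://namenode:port/flink-checkpoints"); // assumed key

        return StateBackendLoader.loadStateBackendFromConfig(
                config,                                         // where 'state.backend' is read
                Thread.currentThread().getContextClassLoader(), // resolves factory class names
                null);                                          // optional Logger, may be null
    }
}
{code}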
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160758820 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- @@ -84,13 +84,44 @@ public interface StateBackend extends java.io.Serializable { // - // Persistent Bytes Storage + // Checkpoint storage - the durable persistence of checkpoint data + // + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handler from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; --- End diff -- See above, I don't think we get around String. ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160758717 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- @@ -84,13 +84,44 @@ public interface StateBackend extends java.io.Serializable { // - // Persistent Bytes Storage + // Checkpoint storage - the durable persistence of checkpoint data --- End diff -- Agreed. In the end we have: - `CheckpointStorage`: Persisting bytes and Metadata. The Stream factories should go there in the next part of the rework. - `KeyedStateBackend`: Holding / snapshotting keyed state - `OperatorStateBackend`: Holding / snapshotting operator state These should be three completely independent interfaces, and this PR moves towards that by introducing the `CheckpointStorage`, even though it does not yet move the Stream Factories there. Once the rework is done, the `StateBackend` just bundles a combination of the three above components (should have only three methods, maybe convenience overloads). ---
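Spelled out as code, the target split described in that comment might look like the hypothetical interface below. The method names and parameters are a paraphrase of the comment and of existing Flink types, not the PR's final API.

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.OperatorStateBackend;

// Hypothetical end state: a backend that merely bundles the three independent components.
public interface BundledStateBackend extends java.io.Serializable {

    // 1) Persisting bytes and metadata (the CheckpointStorage introduced by this PR).
    CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;

    // 2) Holding / snapshotting keyed state.
    <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobId,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer) throws IOException;

    // 3) Holding / snapshotting operator state.
    OperatorStateBackend createOperatorStateBackend(
            Environment env,
            String operatorIdentifier) throws IOException;
}
{code}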
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160757943 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; + +import java.io.IOException; + +/** + * A storage location for one particular checkpoint. This location is typically + * created and initialized via {@link CheckpointStorage#initializeCheckpoint(long)} or + * {@link CheckpointStorage#initializeSavepoint(long, String)}. + */ +public interface CheckpointStorageLocation { + + /** +* Creates the output stream to persist the checkpoint metadata to. +* +* @return The output stream to persist the checkpoint metadata to. +* @throws IOException Thrown, if the stream cannot be opened due to an I/O error. +*/ + CheckpointStateOutputStream createMetadataOutputStream() throws IOException; + + /** +* Finalizes the checkpoint, marking the location as a finished checkpoint. +* This method returns the external checkpoint pointer that can be used to resolve +* the checkpoint upon recovery. +* +* @return The external pointer to the checkpoint at this location. +* @throws IOException Thrown, if finalizing / marking as finished fails due to an I/O error. +*/ + String markCheckpointAsFinished() throws IOException; --- End diff -- I have gone back and forth on this. I felt like keeping it, because we do need to get to some form of external pointer (a String) in the end (for ZK and for external resume). This couples the finalization and obtaining that pointer, which makes sense to me (the pointer may not be producible before finalization). Pushing that into the Metadata's OutputStream needs another interface that creates a "handle and pointer" on closing. I had that in a previous version, it felt much more clumsy. This variant seems nicer to me. ---
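Taken together, the intended life cycle of a {{CheckpointStorageLocation}} reads roughly as follows. This is a sketch built from the quoted interface; {{closeAndGetHandle()}} is assumed from Flink's existing {{CheckpointStateOutputStream}} contract, where a plain {{close()}} discards the data.

{code:java}
import java.io.IOException;

import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;

public final class MetadataWriteSketch {

    static String writeAndFinalize(CheckpointStorage storage, long checkpointId, byte[] metadata)
            throws IOException {

        CheckpointStorageLocation location = storage.initializeCheckpoint(checkpointId);

        // Persist the metadata first; close() alone would discard the stream.
        CheckpointStateOutputStream out = location.createMetadataOutputStream();
        out.write(metadata);
        out.closeAndGetHandle();

        // Only now can the external pointer exist; the returned String is what
        // goes to ZooKeeper and to users for external resume.
        return location.markCheckpointAsFinished();
    }
}
{code}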
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160756970 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); + + /** +* Checks whether the storage has a default savepoint location configured. +*/ + boolean hasDefaultSavepointLocation(); + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handler from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; + + /** +* Initializes a storage location for new checkpoint with the given ID. +* +* The returned storage location can be used to write the checkpoint data and metadata +* to and to obtain the pointers for the location(s) where the actual checkpoint data should be +* stored. +* +* @param checkpointId The ID (logical timestamp) of the checkpoint that should be persisted. +* @return A storage location for the data and metadata of the given checkpoint. +* +* @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception. +*/ + CheckpointStorageLocation initializeCheckpoint(long checkpointId) throws IOException; --- End diff -- How about `initializeLocationForCheckpoint`? ---
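For contrast with the savepoint variant mentioned in the same interface ({{initializeSavepoint(long, String)}}), a caller might branch as below. The guard on {{hasDefaultSavepointLocation()}} is an assumption about how a missing target directory would be handled.

{code:java}
import java.io.IOException;

import javax.annotation.Nullable;

import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocation;

public final class LocationInitSketch {

    static CheckpointStorageLocation locationFor(
            CheckpointStorage storage,
            long checkpointId,
            boolean isSavepoint,
            @Nullable String targetDirectory) throws IOException {

        if (isSavepoint) {
            // Without an explicit target, fall back to the configured default, if any.
            if (targetDirectory == null && !storage.hasDefaultSavepointLocation()) {
                throw new IOException("No savepoint directory configured and none was passed");
            }
            return storage.initializeSavepoint(checkpointId, targetDirectory);
        }
        return storage.initializeCheckpoint(checkpointId);
    }
}
{code}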
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160756860 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); + + /** +* Checks whether the storage has a default savepoint location configured. +*/ + boolean hasDefaultSavepointLocation(); + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handler from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; --- End diff -- I was wondering about that as well. My thinking was that in the end, what we get is just Strings (from the command line, from ZooKeeper, from the REST interface) and there is no point before that method that could parse/convert the string into something state backend specific. So we have to deal with Strings anyways. Wrapping them in another class felt not too useful to me here. ---
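As an illustration of why a raw {{String}} suffices, a file-based backend could resolve such a pointer along these lines; this is a minimal sketch, not the PR's actual implementation.

{code:java}
import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;

public final class ResolveSketch {

    static StreamStateHandle resolveCheckpoint(String pointer) throws IOException {
        final Path path;
        try {
            // For this backend the pointer is simply a file URI.
            path = new Path(pointer);
        } catch (IllegalArgumentException e) {
            throw new IOException("Not a valid file system checkpoint pointer: " + pointer, e);
        }

        FileSystem fs = path.getFileSystem();
        try {
            FileStatus status = fs.getFileStatus(path);
            return new FileStateHandle(path, status.getLen());
        } catch (FileNotFoundException e) {
            throw new IOException("Cannot resolve checkpoint pointer: " + pointer, e);
        }
    }
}
{code}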
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160756347 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); --- End diff -- Yes, this is part of future work. The high-level description says that: > To make sure there is no extra persisting of the checkpoint metadata by the HA store (it simply references the regular persisted checkpoint metadata) we need some changes to the ZooKeeperCompletedCheckpointStore. The method helps as a sanity check, to allow the checkpoint coordinator to validate that if the `CompletedCheckpointStore` requires durable persistence, then the `CheckpointStore` must provide that. It is used to catch a weird corner case where the ZkCompletedCheckpointStore is used, but the StateStore (from the MemoryStateBackend) does not support actually persisting any data durably. We could remove that, because currently this can never happen (The MemoryStateBackend automatically fills in durable persistence in HA setups) ---
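The sanity check described there could be wired into the coordinator roughly as follows; {{requiresExternalizedCheckpoints()}} is an assumed name for the completed-checkpoint store's side of the contract.

{code:java}
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.state.CheckpointStorage;

public final class StorageSanityCheck {

    static void validate(CompletedCheckpointStore store, CheckpointStorage storage) {
        // An HA store only references the persisted metadata, so the storage
        // itself must be able to persist data durably.
        if (store.requiresExternalizedCheckpoints() && !storage.supportsHighlyAvailableStorage()) {
            throw new IllegalStateException(
                    "The completed checkpoint store requires durably persisted checkpoint "
                            + "metadata, but the configured checkpoint storage cannot provide it.");
        }
    }
}
{code}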
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160755282 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { --- End diff -- The storage will create the stream factories later; that is the plan. I simply did not want to push that into this pull request as well. Think of this pull request as the "JobManager" side of things; the "TaskManager" side of things is to come next. ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160754932 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A utility class with the methods to write/load/dispose the checkpoint and savepoint metadata. + * + * Stored checkpoint metadata files have the following format: + * [MagicNumber (int) | Format Version (int) | Checkpoint Metadata (variable)] + * + * The actual savepoint serialization is version-specific via the {@link SavepointSerializer}. + */ +public class Checkpoints { --- End diff -- This class mimics somewhat the way this was done previously, scattered over the `SavepointLoader` and `SavepointStore` classes. I changed it to `Checkpoints` because the code was relevant not only to savepoints, but to checkpoints in general. It was mainly a bit of name-fixing work as part of the adjustment. Making this pluggable is another orthogonal effort in my opinion, and probably also a more involved one. ---
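The metadata file format quoted in that javadoc, [MagicNumber (int) | Format Version (int) | Checkpoint Metadata (variable)], boils down to a small fixed header. The sketch below uses an illustrative magic number, since the real constant is not shown in the diff.

{code:java}
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public final class MetadataHeaderSketch {

    // Illustrative value only; Flink defines its own header magic number.
    static final int HEADER_MAGIC_NUMBER = 0xCAFE0000;

    static void writeHeader(DataOutputStream out, int formatVersion) throws IOException {
        out.writeInt(HEADER_MAGIC_NUMBER); // MagicNumber (int)
        out.writeInt(formatVersion);       // Format Version (int)
        // ... followed by the version-specific metadata, via a SavepointSerializer
    }

    static int readAndCheckHeader(DataInputStream in) throws IOException {
        if (in.readInt() != HEADER_MAGIC_NUMBER) {
            throw new IOException("Unexpected magic number; not a checkpoint metadata file");
        }
        return in.readInt(); // the format version selects the matching SavepointSerializer
    }
}
{code}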
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320723#comment-16320723 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160752558 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -1185,6 +1153,10 @@ public int getNumberOfRetainedSuccessfulCheckpoints() { } } + public CheckpointStorage getCheckpointStorage() { --- End diff -- What names would you suggest to use? > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320720#comment-16320720 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160752496 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java --- @@ -251,77 +225,66 @@ public String toString() { false, false); - private static final CheckpointProperties STANDARD_CHECKPOINT = new CheckpointProperties( - false, + private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = new CheckpointProperties( false, false, true, - true, - true, - true, - true); + true, // Delete on success + true, // Delete on cancellation + true, // Delete on failure + true); // Delete on suspension - private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties( + private static final CheckpointProperties CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties( false, - true, false, true, - true, - false, // Retain on cancellation - false, - false); // Retain on suspension + true, // Delete on success + true, // Delete on cancellation + false, // Retain on failure + true); // Delete on suspension - private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties( + private static final CheckpointProperties CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties( false, - true, false, true, - true, - true, // Delete on cancellation - false, - true); // Delete on suspension + true, // Delete on success + false, // Retain on cancellation + false, // Retain on failure + false); // Retain on suspension + /** * Creates the checkpoint properties for a (manually triggered) savepoint. * -* Savepoints are forced and persisted externally. They have to be +* Savepoints are not queued due to time trigger limits. They have to be * garbage collected manually. * * @return Checkpoint properties for a (manually triggered) savepoint. */ - public static CheckpointProperties forStandardSavepoint() { - return STANDARD_SAVEPOINT; - } - - /** -* Creates the checkpoint properties for a regular checkpoint. -* -* Regular checkpoints are not forced and not persisted externally. They -* are garbage collected automatically. -* -* @return Checkpoint properties for a regular checkpoint. -*/ - public static CheckpointProperties forStandardCheckpoint() { - return STANDARD_CHECKPOINT; + public static CheckpointProperties forSavepoint() { + return SAVEPOINT; } /** -* Creates the checkpoint properties for an external checkpoint. +* Creates the checkpoint properties for a checkpoint. * -* External checkpoints are not forced, but persisted externally. They -* are garbage collected automatically, except when the owning job +* Checkpoints may be queued in case too many other checkpoints are currently happening. +* They are garbage collected automatically, except when the owning job * terminates in state {@link JobStatus#FAILED}. The user is required to * configure the clean up behaviour on job cancellation. * -* @param deleteOnCancellation Flag indicating whether to discard on cancellation. -* * @return Checkpoint properties for an external checkpoint. 
*/ - public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) { - if (deleteOnCancellation) { - return EXTERNALIZED_CHECKPOINT_DELETED; - } else { - return EXTERNALIZED_CHECKPOINT_RETAINED; + public static CheckpointProperties forCheckpoint(CheckpointRetentionPolicy policy) { --- End diff -- There is actually a `forSavepoint(...)` method, that's why I think keeping the name makes sense. We could change it to `forCheckpointWithPolicy(...)`. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160752496 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java --- @@ -251,77 +225,66 @@ public String toString() { false, false); - private static final CheckpointProperties STANDARD_CHECKPOINT = new CheckpointProperties( - false, + private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = new CheckpointProperties( false, false, true, - true, - true, - true, - true); + true, // Delete on success + true, // Delete on cancellation + true, // Delete on failure + true); // Delete on suspension - private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties( + private static final CheckpointProperties CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties( false, - true, false, true, - true, - false, // Retain on cancellation - false, - false); // Retain on suspension + true, // Delete on success + true, // Delete on cancellation + false, // Retain on failure + true); // Delete on suspension - private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties( + private static final CheckpointProperties CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties( false, - true, false, true, - true, - true, // Delete on cancellation - false, - true); // Delete on suspension + true, // Delete on success + false, // Retain on cancellation + false, // Retain on failure + false); // Retain on suspension + /** * Creates the checkpoint properties for a (manually triggered) savepoint. * -* Savepoints are forced and persisted externally. They have to be +* Savepoints are not queued due to time trigger limits. They have to be * garbage collected manually. * * @return Checkpoint properties for a (manually triggered) savepoint. */ - public static CheckpointProperties forStandardSavepoint() { - return STANDARD_SAVEPOINT; - } - - /** -* Creates the checkpoint properties for a regular checkpoint. -* -* Regular checkpoints are not forced and not persisted externally. They -* are garbage collected automatically. -* -* @return Checkpoint properties for a regular checkpoint. -*/ - public static CheckpointProperties forStandardCheckpoint() { - return STANDARD_CHECKPOINT; + public static CheckpointProperties forSavepoint() { + return SAVEPOINT; } /** -* Creates the checkpoint properties for an external checkpoint. +* Creates the checkpoint properties for a checkpoint. * -* External checkpoints are not forced, but persisted externally. They -* are garbage collected automatically, except when the owning job +* Checkpoints may be queued in case too many other checkpoints are currently happening. +* They are garbage collected automatically, except when the owning job * terminates in state {@link JobStatus#FAILED}. The user is required to * configure the clean up behaviour on job cancellation. * -* @param deleteOnCancellation Flag indicating whether to discard on cancellation. -* * @return Checkpoint properties for an external checkpoint. */ - public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) { - if (deleteOnCancellation) { - return EXTERNALIZED_CHECKPOINT_DELETED; - } else { - return EXTERNALIZED_CHECKPOINT_RETAINED; + public static CheckpointProperties forCheckpoint(CheckpointRetentionPolicy policy) { --- End diff -- There is actually a `forSavepoint(...)` method, that's why I think keeping the name makes sense. 
We could change it to `forCheckpointWithPolicy(...)`. ---
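A minimal sketch of how callers would move from the removed boolean-based factory to the policy-based one discussed here; the `CheckpointRetentionPolicy` constant names are assumptions inferred from the "Retain on ..." comments in the diff, not necessarily the merged API.
{noformat}
// Hypothetical migration sketch; the enum constants are inferred from the
// retention comments in the diff and may not match the merged names exactly.

// before: a boolean flag chose between two externalized-checkpoint variants
// CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true);

// after: one factory, parameterized by an explicit retention policy
CheckpointProperties props =
    CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);

// savepoints keep their own factory, which is the argument for keeping the name
CheckpointProperties savepointProps = CheckpointProperties.forSavepoint();
{noformat}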
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160752558 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -1185,6 +1153,10 @@ public int getNumberOfRetainedSuccessfulCheckpoints() { } } + public CheckpointStorage getCheckpointStorage() { --- End diff -- What names would you suggest to use? ---
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160751897 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption SAVEPOINT_DIRECTORY = ConfigOptions + .key("state.savepoints.dir") + .noDefaultValue() + .withDeprecatedKeys("savepoints.state.backend.fs.dir"); + + /** The default directory used for checkpoints. Used by the state backends that write +* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption CHECKPOINTS_DIRECTORY = ConfigOptions + .key("state.checkpoints.dir") + .noDefaultValue(); + + /** Option whether the heap-based key/value data structures should use an asynchronous +* snapshot method. Used by MemoryStateBackend and FsStateBackend. */ + public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions + .key("state.backend.heap.async") --- End diff -- Makes sense, let's scope them more generally. ---
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320717#comment-16320717 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160751897 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption SAVEPOINT_DIRECTORY = ConfigOptions + .key("state.savepoints.dir") + .noDefaultValue() + .withDeprecatedKeys("savepoints.state.backend.fs.dir"); + + /** The default directory used for checkpoints. Used by the state backends that write +* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption CHECKPOINTS_DIRECTORY = ConfigOptions + .key("state.checkpoints.dir") + .noDefaultValue(); + + /** Option whether the heap-based key/value data structures should use an asynchronous +* snapshot method. Used by MemoryStateBackend and FsStateBackend. */ + public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions + .key("state.backend.heap.async") --- End diff -- Makes sense, let's scope them more generally. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
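A sketch of setting the options from this diff programmatically; the generic parameters of the `ConfigOption` fields (e.g. `ConfigOption<String>`, `ConfigOption<Integer>`) were stripped by the archive and are assumed here, and the values are illustrative.
{noformat}
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;

// Option keys are verbatim from the diff; directories are placeholders.
Configuration config = new Configuration();
config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs://namenode:port/flink-checkpoints");
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, "hdfs://namenode:port/flink-savepoints");
config.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 3);
{noformat}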
[jira] [Resolved] (FLINK-6974) Add BIN supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-6974. - Resolution: Fixed Fix Version/s: 1.5.0 It was implemented as part of FLINK-6893. > Add BIN supported in TableAPI > - > > Key: FLINK-6974 > URL: https://issues.apache.org/jira/browse/FLINK-6974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > Labels: starter > Fix For: 1.5.0 > > > See FLINK-6893 for details. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6893) Add BIN supported in SQL & Table API
[ https://issues.apache.org/jira/browse/FLINK-6893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-6893: Summary: Add BIN supported in SQL & Table API (was: Add BIN supported in SQL) > Add BIN supported in SQL & Table API > > > Key: FLINK-6893 > URL: https://issues.apache.org/jira/browse/FLINK-6893 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.5.0 > > > BIN(N) Returns a string representation of the binary value of N, where N is a > longlong (BIGINT) number. This is equivalent to CONV(N,10,2). Returns NULL if > N is NULL. > * Syntax: > BIN(num) > * Arguments > **num: a long/bigint value > * Return Types > String > * Example: > BIN(12) -> '1100' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_bin] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6893) Add BIN supported in SQL & Table API
[ https://issues.apache.org/jira/browse/FLINK-6893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320614#comment-16320614 ] ASF GitHub Bot commented on FLINK-6893: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4128 > Add BIN supported in SQL & Table API > > > Key: FLINK-6893 > URL: https://issues.apache.org/jira/browse/FLINK-6893 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.5.0 > > > BIN(N) Returns a string representation of the binary value of N, where N is a > longlong (BIGINT) number. This is equivalent to CONV(N,10,2). Returns NULL if > N is NULL. > * Syntax: > BIN(num) > * Arguments > **num: a long/bigint value > * Return Types > String > * Example: > BIN(12) -> '1100' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_bin] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-6893) Add BIN supported in SQL & Table API
[ https://issues.apache.org/jira/browse/FLINK-6893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-6893. - Resolution: Fixed Fix Version/s: 1.5.0 Fixed in 1.5: 2914e596bf2af968197b5241aa40840e2e9408ce & 00ad0eb120026817c79a5fed2c71dd6aa10e9ba6 > Add BIN supported in SQL & Table API > > > Key: FLINK-6893 > URL: https://issues.apache.org/jira/browse/FLINK-6893 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.5.0 > > > BIN(N) Returns a string representation of the binary value of N, where N is a > longlong (BIGINT) number. This is equivalent to CONV(N,10,2). Returns NULL if > N is NULL. > * Syntax: > BIN(num) > * Arguments > **num: a long/bigint value > * Return Types > String > * Example: > BIN(12) -> '1100' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_bin] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4128: [FLINK-6893][table]Add BIN supported in SQL
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4128 ---
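A short usage sketch for the BIN function described in the ticket; the table environment, table, and column names are illustrative only, and `sqlQuery` is assumed as the query entry point.
{noformat}
import org.apache.flink.table.api.Table;

// Assumes a TableEnvironment `tEnv` with a registered table `T` holding a
// BIGINT column `num`; all names here are illustrative.
Table result = tEnv.sqlQuery("SELECT num, BIN(num) AS bin_num FROM T");
// Per the ticket: BIN(12) -> '1100', BIN(NULL) -> NULL, same as CONV(num, 10, 2).
{noformat}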
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320601#comment-16320601 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160727075 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- @@ -84,13 +84,44 @@ public interface StateBackend extends java.io.Serializable { // - // Persistent Bytes Storage + // Checkpoint storage - the durable persistence of checkpoint data + // + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handler from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; --- End diff -- Similar to the `String` vs `StoragePointer`, this could also be a class that, essentially , is a `StreamStateHandle` but gives a name to the concept, e.g. `CheckpointMetaDataHandle`. But it is not as bad as the other case, because this is never passed down very deep (yet). > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
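To make the naming discussion concrete, a rough sketch of what a file-based implementation of this method could look like; the `_metadata` file name and the `FileStateHandle` usage are assumptions drawn from other parts of this thread, not the merged code.
{noformat}
// Sketch only, under the assumptions named above.
public StreamStateHandle resolveCheckpoint(String pointer) throws IOException {
    // interpret the pointer as a checkpoint directory containing a metadata file
    Path metadataPath = new Path(pointer, "_metadata");
    FileSystem fs = metadataPath.getFileSystem();

    if (!fs.exists(metadataPath)) {
        throw new IOException("Cannot resolve checkpoint pointer: " + pointer);
    }
    // hand back a handle from which the coordinator can read the metadata stream
    return new FileStateHandle(metadataPath, fs.getFileStatus(metadataPath).getLen());
}
{noformat}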
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320596#comment-16320596 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160721205 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); + + /** +* Checks whether the storage has a default savepoint location configured. +*/ + boolean hasDefaultSavepointLocation(); + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handler from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; + + /** +* Initializes a storage location for new checkpoint with the given ID. +* +* The returned storage location can be used to write the checkpoint data and metadata +* to and to obtain the pointers for the location(s) where the actual checkpoint data should be +* stored. +* +* @param checkpointId The ID (logical timestamp) of the checkpoint that should be persisted. +* @return A storage location for the data and metadata of the given checkpoint. +* +* @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception. 
+*/ + CheckpointStorageLocation initializeCheckpoint(long checkpointId) throws IOException; --- End diff -- I would consider renaming this to something like `initializeStorageLocationForCheckpoint(...)` > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
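The `supportsHighlyAvailableStorage()` check above suggests a gate roughly like the following when high availability is enabled; the surrounding variable and the error text are illustrative, not from the PR.
{noformat}
// Illustrative gate; `highAvailabilityEnabled` and the message are assumed.
if (highAvailabilityEnabled && !checkpointStorage.supportsHighlyAvailableStorage()) {
    throw new IllegalConfigurationException(
        "The configured state backend cannot store checkpoint metadata in a highly " +
        "available way. Configure a durable checkpoint directory, for example via " +
        "'state.checkpoints.dir'.");
}
{noformat}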
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320588#comment-16320588 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160719709 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; + +import java.io.IOException; + +/** + * A storage location for one particular checkpoint. This location is typically + * created and initialized via {@link CheckpointStorage#initializeCheckpoint(long)} or + * {@link CheckpointStorage#initializeSavepoint(long, String)}. + */ +public interface CheckpointStorageLocation { + + /** +* Creates the output stream to persist the checkpoint metadata to. +* +* @return The output stream to persist the checkpoint metadata to. +* @throws IOException Thrown, if the stream cannot be opened due to an I/O error. +*/ + CheckpointStateOutputStream createMetadataOutputStream() throws IOException; + + /** +* Finalizes the checkpoint, marking the location as a finished checkpoint. +* This method returns the external checkpoint pointer that can be used to resolve +* the checkpoint upon recovery. +* +* @return The external pointer to the checkpoint at this location. +* @throws IOException Thrown, if finalizing / marking as finished fails due to an I/O error. +*/ + String markCheckpointAsFinished() throws IOException; --- End diff -- As discussed, you might reconsider if this method is really needed. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
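The interface implies a write-then-finalize lifecycle, roughly as below; how the `CheckpointStorage` instance is obtained is not shown in the diff and is assumed.
{noformat}
// Assumed lifecycle sketch, built only from the methods quoted in this thread.
CheckpointStorageLocation location = checkpointStorage.initializeCheckpoint(checkpointId);

CheckpointStreamFactory.CheckpointStateOutputStream out =
    location.createMetadataOutputStream();
// ... serialize the checkpoint metadata into `out` ...
StreamStateHandle metadataHandle = out.closeAndGetHandle(); // persists the metadata stream

// finalizing yields the external pointer later consumed by resolveCheckpoint(...)
String externalPointer = location.markCheckpointAsFinished();
{noformat}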
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320589#comment-16320589 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160690515 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. 
+* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. +* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the St
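A sketch of loading a backend through this class; the loader method name is an assumption (the javadoc above is cut off in this archive), while the shortcut-name constants come verbatim from the diff.
{noformat}
// Method name `loadStateBackendFromConfig` is assumed; checked exceptions
// (e.g. DynamicCodeLoadingException) are omitted from this fragment.
Configuration config = new Configuration();
config.setString(CheckpointingOptions.STATE_BACKEND, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME);

StateBackend backend = StateBackendLoader.loadStateBackendFromConfig(
    config, Thread.currentThread().getContextClassLoader(), null); // logger may be null
{noformat}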
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320593#comment-16320593 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160716512 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -1185,6 +1153,10 @@ public int getNumberOfRetainedSuccessfulCheckpoints() { } } + public CheckpointStorage getCheckpointStorage() { --- End diff -- Here I noticed how similar the names `CheckpointStorage` and `CompletedCheckpointStore` are. Do you think there are names that make their difference more obvious when reading the code? For example, the second could also go well under `CompletedCheckpoints`. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320598#comment-16320598 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160730627 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointStorageLocation; + +import javax.annotation.Nullable; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * An implementation of durable checkpoint storage to file systems. + */ +public class FsCheckpointStorage extends AbstractFsCheckpointStorage { + + private final FileSystem fileSystem; + + private final Path checkpointsDirectory; + + private final Path sharedStateDirectory; + + private final Path taskOwnedStateDirectory; --- End diff -- As discussed, we might not need this if we map task-owned state to shared state. Idea: as long as the task still requires the state, it will resend the handle to this file (a placeholder to be precise) and keep the shared registry's reference count above zero. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
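For orientation, the three directory fields correspond roughly to the layout below; the `shared`/`taskowned` subdirectory names are assumptions for illustration, not taken from the diff.
{noformat}
hdfs://namenode:port/flink-checkpoints/<job-id>/
  chk-17/          // per-checkpoint directory: data plus the checkpoint metadata
  shared/          // state shared across checkpoints (e.g. incremental snapshots)
  taskowned/       // state owned by the tasks, never dropped by the JobManager alone
{noformat}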
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320595#comment-16320595 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160638882 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java --- @@ -18,92 +18,272 @@ package org.apache.flink.runtime.state.memory; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import javax.annotation.Nullable; + import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkArgument; + /** - * A {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no - * capabilities to spill to disk. Checkpoints are serialized and the serialized data is - * transferred + * This state backend holds the working state in the memory (JVM heap) of the TaskManagers. + * The state backend checkpoints state directly to the JobManager's memory (hence the backend's name), + * but the checkpoints will be persisted to a file system for high-availability setups and savepoints. + * The MemoryStateBackend is consequently a FileSystem-based backend that can work without a + * file system dependency in simple setups. + * + * This state backend should be used only for experimentation, quick local setups, + * or for streaming applications that have very small state: Because it requires checkpoints to + * go through the JobManager's memory, larger state will occupy larger portions of the JobManager's + * main memory, reducing operational stability. + * For any other setup, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend} + * should be used. The {@code FsStateBackend} holds the working state on the TaskManagers in the same way, but + * checkpoints state directly to files rather then to the JobManager's memory, thus supporting + * large state sizes. + * + * State Size Considerations + * + * State checkpointing with this state backend is subject to the following conditions: + * + * Each individual state must not exceed the configured maximum state size + * (see {@link #getMaxStateSize()}. + * + * All state from one task (i.e., the sum of all operator states and keyed states from all + * chained operators of the task) must not exceed what the RPC system supports, which is + * be default < 10 MB. That limit can be configured up, but that is typically not advised. 
+ * + * The sum of all states in the application times all retained checkpoints must comfortably + * fit into the JobManager's JVM heap space. + * + * + * Persistence Guarantees + * + * For the use cases where the state sizes can be handled by this backend, the backend does guarantee + * persistence for savepoints, externalized checkpoints (if configured), and checkpoints + * (when high-availability is configured). + * + * Configuration + * + * As for all state backends, this backend can either be configured within the application (by creating + * the backend with the respective constructor parameters and setting it on the execution environment) + * or by specifying it in the Flink configuration. + * + * If the state backend was specified in the application, it may pick up additional configuration + * parameters from the Flink configuration. For example, if the backend is configured in the application + * without a default savepoint directory, it will pick up a default savepoint directory specified in the Flink configuration.
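A sketch of the application-side configuration path described in this javadoc; the size argument is illustrative, chosen to stay below the default RPC limit mentioned above.
{noformat}
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Illustrative: 5 MB max per individual state, well under the ~10 MB RPC limit.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend(5 * 1024 * 1024));
{noformat}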
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320599#comment-16320599 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160727468 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); + + /** +* Checks whether the storage has a default savepoint location configured. +*/ + boolean hasDefaultSavepointLocation(); + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handler from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; + + /** +* Initializes a storage location for new checkpoint with the given ID. +* +* The returned storage location can be used to write the checkpoint data and metadata +* to and to obtain the pointers for the location(s) where the actual checkpoint data should be +* stored. +* +* @param checkpointId The ID (logical timestamp) of the checkpoint that should be persisted. +* @return A storage location for the data and metadata of the given checkpoint. +* +* @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception. +*/ + CheckpointStorageLocation initializeCheckpoint(long checkpointId) throws IOException; + + /** +* Initializes a storage location for new savepoint with the given ID. 
+* +* If an external location pointer is passed, the savepoint storage location +* will be initialized at the location of that pointer. If the external location pointer is null, +* the default savepoint location will be used. If no default savepoint location is configured, +* this will throw an exception. Whether a default savepoint location is configured can be +* checked via {@link #hasDefaultSavepointLocation()}. +* +* @param checkpointId The ID (logical timestamp) of the savepoint's checkpoint. +* @param externalLocationPointer Optionally, a pointer to the location where the savepoint should +*be stored. May be null. +* +* @return A storage location for the data and metadata of the savepoint. +* +* @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception. +*/
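The default-vs-explicit savepoint location resolution described above amounts to roughly the following at the call site; variable names and the error message are illustrative.
{noformat}
// Illustrative call-site sketch for initializeSavepoint(long, String).
String target = requestedSavepointPath; // may be null

if (target == null && !checkpointStorage.hasDefaultSavepointLocation()) {
    throw new IllegalStateException(
        "No savepoint location given and no default savepoint location configured.");
}

// a null target falls back to the configured default savepoint location
CheckpointStorageLocation savepointLocation =
    checkpointStorage.initializeSavepoint(checkpointId, target);
{noformat}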
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320591#comment-16320591 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160698030 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A utility class with the methods to write/load/dispose the checkpoint and savepoint metadata. + * + * Stored checkpoint metadata files have the following format: + * [MagicNumber (int) | Format Version (int) | Checkpoint Metadata (variable)] + * + * The actual savepoint serialization is version-specific via the {@link SavepointSerializer}. + */ +public class Checkpoints { --- End diff -- This class contains a lot of static methods that somehow look like they want to belong to a checkpoint/savepoint store object. For the sake of keeping those part replaceable for unit tests, I wonder if we should eliminate the static nature of this code? 
> Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
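The metadata framing described in the javadoc ([MagicNumber (int) | Format Version (int) | Checkpoint Metadata (variable)]) corresponds to a write path roughly like the following; the constant name and serializer lookup are assumptions, as they are not shown in the quoted diff.
{noformat}
// Assumed framing sketch; HEADER_MAGIC_NUMBER is a hypothetical constant name.
DataOutputStream dos = new DataOutputStream(out);
dos.writeInt(HEADER_MAGIC_NUMBER);        // magic number
dos.writeInt(savepoint.getVersion());     // format version
SavepointSerializers.getSerializer(savepoint)
    .serialize(savepoint, dos);           // version-specific metadata payload
{noformat}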
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320581#comment-16320581 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160646300 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StreamStateHandle; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.net.URI; + +/** + * A base class for all state backends that store their metadata (and data) in files. + * Examples that inherit from this are the {@link FsStateBackend}, the + * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend MemoryStateBackend}, or the + * {@code RocksDBStateBackend}. + * + * This class takes the base checkpoint- and savepoint directory paths, but also accepts null + * for both of then, in which case creating externalized checkpoint is not possible, and it is not + * possible to create a savepoint with a default path. Null is accepted to enable implementations + * that only optionally support default savepoints and externalized checkpoints. + * + * Checkpoint Layout + * + * The state backend is configured with a base directory and persists the checkpoint data of specific + * checkpoints in specific subdirectories. For example, if the base directory was set to + * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory with + * the job's ID that will contain the actual checkpoints: + * ({@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b}) + * + * Each checkpoint individually will store all its files in a subdirectory that includes the + * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}. + * + * Savepoint Layout + * + * A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/}, will create + * a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all savepoint data. 
+ * The random digits are added as "entropy" to avoid directory collisions. + * + * Metadata and Success Files + * + * A completed checkpoint writes its metadata into a file '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}'. + * After that is complete (i.e., the file complete), it writes an additional file + * '{@value AbstractFsCheckpointStorage#SUCCESS_FILE_NAME}'. + * + * Ideally that would not be necessary, and one would write the metadata file to a temporary file and + * then issue a atomic (or at least constant time) rename. But some of the file systems (like S3) do + * not support that: A rename is a copy process which, when failing, leaves corrupt half written + * files/objects. The success file is hence needed as a signal that the + * '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}'file is complete. + */ +@PublicEvolving +public abstract class AbstractFileStateBackend extends AbstractStateBackend { + + private static final long serialVersionUID = 1L; + + // + // State Backen
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320587#comment-16320587 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160690634 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. 
+* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. +* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the St
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320592#comment-16320592 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160718229 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); + + /** +* Checks whether the storage has a default savepoint location configured. +*/ + boolean hasDefaultSavepointLocation(); + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handler from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; --- End diff -- I think `resolveStoragePointerForCheckpoint(...)` or similar might be a better name. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320583#comment-16320583 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160686745 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. 
+* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. +* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the St
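Concretely, the loading scheme described in that javadoc boils down to a switch over the shortcut names with a reflective fallback. The following is a minimal sketch using the names from the diff above (`MEMORY_STATE_BACKEND_NAME`, `FS_STATE_BACKEND_NAME`, `CheckpointingOptions.STATE_BACKEND`, and the imports shown in the diff); the body is a simplification for illustration, not the code from the PR:

```java
// Minimal sketch of the shortcut-vs-factory resolution described in the
// javadoc above. Constants and imports are those from the diff; the body
// is a simplification for illustration, not the code from the PR.
public static StateBackend loadStateBackendFromConfig(
        Configuration config,
        ClassLoader classLoader,
        @Nullable Logger logger) throws DynamicCodeLoadingException, IOException {

    checkNotNull(config, "config");
    checkNotNull(classLoader, "classLoader");

    final String backendName = config.getString(CheckpointingOptions.STATE_BACKEND);
    if (backendName == null) {
        return null; // nothing configured
    }

    switch (backendName.toLowerCase()) {
        case MEMORY_STATE_BACKEND_NAME:
            return new MemoryStateBackendFactory().createFromConfig(config);

        case FS_STATE_BACKEND_NAME:
            return new FsStateBackendFactory().createFromConfig(config);

        // (the ROCKSDB_STATE_BACKEND_NAME shortcut would be handled here as well)

        default:
            // Anything else is interpreted as the class name of a
            // StateBackendFactory, loaded through the given class loader and
            // instantiated via its zero-argument constructor.
            if (logger != null) {
                logger.info("Loading state backend via factory {}", backendName);
            }
            try {
                StateBackendFactory<?> factory = Class
                        .forName(backendName, false, classLoader)
                        .asSubclass(StateBackendFactory.class)
                        .newInstance();
                return factory.createFromConfig(config);
            } catch (ClassNotFoundException e) {
                throw new DynamicCodeLoadingException(
                        "Cannot find configured state backend factory class: " + backendName, e);
            } catch (ClassCastException | InstantiationException | IllegalAccessException e) {
                throw new DynamicCodeLoadingException(
                        "The class configured under '" + CheckpointingOptions.STATE_BACKEND.key() +
                        "' is not a valid state backend factory (" + backendName + ')', e);
            }
    }
}
```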
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320597#comment-16320597 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160725400 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { --- End diff -- Another point about naming that I found confusing: backends have methods to create `CheckpointStorage` and `CheckpointStreamFactory`. From the names, it sounds like the storage could also do the job of the stream factories. In fact, the storage deals more with the metadata aspect on the checkpoint coordinator (which we call `Checkpoint`), while the factories give us the streams that are used in checkpointing (so far, so good), but to write the snapshots on the task managers. Maybe the problem is rooted in the fact that a checkpoint is both a metadata class for the JM and a general concept, and in the current method names the two get mixed up. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
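To make the distinction in that comment concrete, here is a hypothetical usage sketch. The backend method names (`createCheckpointStorage`, `createStreamFactory`) and the surrounding variables are assumptions for illustration, not necessarily the exact API of the PR:

```java
// Hypothetical sketch of the two call sites; method names and variables are
// illustrative assumptions, not the exact API from this PR.

// JobManager / CheckpointCoordinator side: the "storage" deals with checkpoint
// metadata, e.g. resolving an external pointer back to readable metadata.
CheckpointStorage storage = stateBackend.createCheckpointStorage(jobId);
StreamStateHandle metadataHandle = storage.resolveCheckpoint(externalPointer);

// TaskManager side: the "stream factory" hands out the streams that the
// operators use to write their actual state snapshots.
CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(jobId, operatorIdentifier);
CheckpointStreamFactory.CheckpointStateOutputStream out =
        streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
out.write(serializedState);
StreamStateHandle snapshotHandle = out.closeAndGetHandle();
```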
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320600#comment-16320600 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160731923 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); --- End diff -- The purpose of this did not become fully clear to me from the comments, and the method is also never used. If this is part of some future work, you could consider adding the method in a future PR. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
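For context, a capability flag like `supportsHighlyAvailableStorage()` would presumably be consulted when validating a high-availability setup. A hypothetical check, not taken from the PR, might look like:

```java
// Hypothetical validation illustrating one possible use of the flag;
// this is not code from the PR.
if (HighAvailabilityMode.fromConfig(config) != HighAvailabilityMode.NONE
        && !checkpointStorage.supportsHighlyAvailableStorage()) {
    throw new IllegalConfigurationException(
            "High-availability mode requires checkpoint storage that durably persists " +
            "checkpoint metadata, e.g. a file-system-based state backend.");
}
```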
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320590#comment-16320590 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160687956 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class contains utility methods to load state backends from configurations. + */ +public class StateBackendLoader { + + // + // Configuration shortcut names + // + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // + // Loading the state backend from a configuration + // + + /** +* Loads the state backend from the configuration, from the parameter 'state.backend', as defined +* in {@link CheckpointingOptions#STATE_BACKEND}. +* +* The state backends can be specified either via their shortcut name, or via the class name +* of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory +* is instantiated (via its zero-argument constructor) and its +* {@link StateBackendFactory#createFromConfig(Configuration)} method is called. 
+* +* Recognized shortcut names are '{@value StateBackendLoader#MEMORY_STATE_BACKEND_NAME}', +* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and +* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'. +* +* @param config The configuration to load the state backend from +* @param classLoader The class loader that should be used to load the state backend +* @param logger Optionally, a logger to log actions to (may be null) +* +* @return The instantiated state backend. +* +* @throws DynamicCodeLoadingException +* Thrown if a state backend factory is configured and the factory class was not +* found or the factory could not be instantiated +* @throws IllegalConfigurationException +* May be thrown by the St
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320594#comment-16320594 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160722748 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -475,13 +474,12 @@ public void enableCheckpointing( checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, - externalizeSettings, + retentionPolicy, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, checkpointIDCounter, checkpointStore, --- End diff -- Here is one example where the confusing proximity between the class names `CompletedCheckpointStore` and `CheckpointStorage` becomes obvious. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320584#comment-16320584 ] ASF GitHub Bot commented on FLINK-5823: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160644877 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- @@ -84,13 +84,44 @@ public interface StateBackend extends java.io.Serializable { // - // Persistent Bytes Storage + // Checkpoint storage - the durable persistence of checkpoint data --- End diff -- The interface `StateBackend` is getting more crowded as more functionality is added, and all methods are visible to everybody from `CheckpointCoordinator` down to the `XYZKeyedStateBackend`. You have already divided the interface into 3 segments, and I suggest actually splitting it into 3 parent interfaces, which are all implemented by `StateBackend`. In many places, we can then just pass the interface of the sub-functionality that is relevant, e.g. `XYZKeyedStateBackend` probably only needs to see the part about creating streams and should never call things that create a new backend. This would be a big step for separation of concerns, moving away from "global visibility". The IDE should be able to automatically figure out all the places where the sub-interfaces are sufficient, but as it might be a bigger diff, we could do this in a separate PR. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
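A rough sketch of that suggested split follows. The sub-interface names are invented for illustration and do not correspond to actual Flink classes, and the method signatures are abbreviated:

```java
// Rough sketch of the suggested split; the sub-interface names are made up
// for illustration. Imports of the referenced Flink types (JobID,
// CheckpointStorage, CheckpointStreamFactory, ...) are omitted for brevity.

interface CheckpointStorageAccess {
    // JM-side concern: checkpoint metadata storage.
    CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;
}

interface CheckpointStreamAccess {
    // TM-side concern: streams for writing state snapshots.
    CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier)
            throws IOException;
}

interface KeyedStateAccess {
    // Concern of creating the actual state backends.
    <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobId,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer /* remaining parameters elided */)
            throws IOException;
}

// StateBackend then only aggregates the three concerns; call sites such as a
// keyed state backend would depend on the narrowest interface they actually
// need (here CheckpointStreamAccess) instead of the full StateBackend.
interface StateBackend
        extends CheckpointStorageAccess, CheckpointStreamAccess, KeyedStateAccess,
                java.io.Serializable {
}
```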
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160718388 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. + * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, + * created by this class. + */ +public interface CheckpointStorage { + + /** +* Checks whether this backend supports highly available storage of data. +* +* Some state backends may offer support for that with default settings, which makes them +* suitable for zero-config prototyping, but not for actual production setups. +*/ + boolean supportsHighlyAvailableStorage(); + + /** +* Checks whether the storage has a default savepoint location configured. +*/ + boolean hasDefaultSavepointLocation(); + + /** +* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the +* checkpoint metadata can be read. If the state backend cannot understand the format of +* the pointer (for example because it was created by a different state backend) this method +* should throw an {@code IOException}. +* +* @param pointer The pointer to resolve. +* @return The state handler from which one can read the checkpoint metadata. +* +* @throws IOException Thrown, if the state backend does not understand the pointer, or if +* the pointer could not be resolved due to an I/O error. +*/ + StreamStateHandle resolveCheckpoint(String pointer) throws IOException; --- End diff -- Furthermore, I suggest introducing a separate class like `(Checkpoint)StoragePointer` instead of a plain `String`. Internally, the class can really be just a string, but I find it very helpful when reading the code if there is a meaningful class name attached to a concept instead of raw `String` usage. It also helps to avoid autocomplete mistakes for methods that take a pointer plus other strings. ---
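A minimal sketch of such a wrapper, under the hypothetical name `CheckpointStoragePointer`:

```java
import java.io.Serializable;
import java.util.Objects;

// Sketch of the suggested wrapper type; the class name is hypothetical.
public final class CheckpointStoragePointer implements Serializable {

    private static final long serialVersionUID = 1L;

    /** The raw pointer, e.g. a file system path to the checkpoint metadata. */
    private final String pointer;

    public CheckpointStoragePointer(String pointer) {
        this.pointer = Objects.requireNonNull(pointer, "pointer");
    }

    /** Returns the raw pointer string, e.g. for logging or persistence. */
    public String asString() {
        return pointer;
    }

    @Override
    public boolean equals(Object o) {
        return this == o ||
                (o instanceof CheckpointStoragePointer &&
                        pointer.equals(((CheckpointStoragePointer) o).pointer));
    }

    @Override
    public int hashCode() {
        return pointer.hashCode();
    }

    @Override
    public String toString() {
        return pointer;
    }
}
```

`resolveCheckpoint(...)` could then accept a `CheckpointStoragePointer` instead of a raw `String`, which also prevents the pointer from being accidentally swapped with other `String` arguments at call sites.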