[GitHub] flink pull request #5280: Fix typo in AbstractMetricGroup.java

2018-01-10 Thread maqingxiang
GitHub user maqingxiang opened a pull request:

https://github.com/apache/flink/pull/5280

Fix typo in AbstractMetricGroup.java



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maqingxiang/flink fix-typo-in-AbstractMetricGroup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5280.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5280


commit 3c568e20bba53bf7ff675d230a3076e495d70639
Author: maqingxiang-it 
Date:   2018-01-11T05:44:04Z

Fix typo in AbstractMetricGroup.java




---


[jira] [Comment Edited] (FLINK-8327) ClassLoader resolution of child-first does not appear to work

2018-01-10 Thread Raymond Tay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321642#comment-16321642
 ] 

Raymond Tay edited comment on FLINK-8327 at 1/11/18 3:34 AM:
-

Closing this as I had to do dynamic classloading with the {{http4s}} client
for Scala, and I think it is going to be a problem for implementers of _rich_
sources, mappers, and sinks, which are likely to leverage 3rd-party libraries,
if the developer has to manage the dependencies such that they are always
found in the {{/lib}} directory.


was (Author: raymondtay):
Closing this as I had to do dynamic classloading with the {{http4s}} client
for Scala, and I think it is going to be a problem for implementers of _rich_
sources, mappers, and sinks, if the developer has to manage the dependencies
such that they are always found in the {{/lib}} directory.

> ClassLoader resolution of child-first does not appear to work
> -
>
> Key: FLINK-8327
> URL: https://issues.apache.org/jira/browse/FLINK-8327
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.4.0
> Environment: h3. Environment
> * Local flink cluster version 1.4.0
> * {{classloader.resolve-order: child-first}} in {{conf/flink-conf.yaml}}.
> * scala 2.11.11
> * oracle jdk 1.8.0
> h3. Library version
> * akka actors 2.4.20
> * akka http 10.0.10
>Reporter: Raymond Tay
>
> h2. Description
> Was trying out the {{Queryable State}} and ran into a problem where the
> submitted job starts regular Akka actors and makes external HTTP calls via
> the {{akka-http}} library, and the Flink runtime complained that it was not
> able to read the key {{akka.http}} (this key is held in the configuration
> file for {{akka-http}}).
> When I ran our app in the {{sbt}} shell locally, it was able to see the key
> {{akka.http}}, but when we submitted the fat jar (built via {{sbt-assembly}})
> to Flink, it threw the error message below. Looks like a class loader issue,
> but I'm not sure.
> Any help is much appreciated!
> h2. Error message
> {noformat}
> Caused by: com.typesafe.config.ConfigException$Missing: No configuration 
> setting found for key 'akka.http'
>   at 
> com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152)
>   at 
> com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
>   at 
> com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
>   at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:184)
>   at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
>   at 
> com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:258)
>   at 
> com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:264)
>   at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:37)
>   at akka.http.scaladsl.Http$.createExtension(Http.scala:843)
>   at akka.http.scaladsl.Http$.createExtension(Http.scala:719)
>   at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:917)
>   at akka.actor.ExtensionId$class.apply(Extension.scala:79)
>   at akka.http.scaladsl.Http$.apply(Http.scala:838)
>   at akka.http.scaladsl.Http$.apply(Http.scala:719)
>   at org.example.state.A.<init>(Simple.scala:158)
>   ... 18 more
> {noformat}
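The {{ConfigException}} above is the classic symptom of the fat jar's merged
{{reference.conf}} not being resolved through the expected classloader. A
hedged sketch of one workaround (names are illustrative, and it assumes the
configuration actually made it into the jar): hand the user-code classloader
explicitly to both Typesafe Config and the ActorSystem.

{code:java}
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ExplicitClassLoaderSystem {
    public static ActorSystem create() {
        // Resolve reference.conf through the classloader that actually
        // contains the fat jar's merged configuration resources, instead of
        // relying on the thread context classloader.
        ClassLoader userCodeLoader = ExplicitClassLoaderSystem.class.getClassLoader();
        Config config = ConfigFactory.load(userCodeLoader);
        return ActorSystem.create("job-actors", config, userCodeLoader);
    }
}
{code}

Independently of classloading, the {{sbt-assembly}} build needs a merge
strategy that concatenates the {{reference.conf}} files of all dependencies
(e.g. {{MergeStrategy.concat}}); a strategy that keeps only the first file
found would drop the {{akka.http}} section entirely.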





[jira] [Resolved] (FLINK-8327) ClassLoader resolution of child-first does not appear to work

2018-01-10 Thread Raymond Tay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raymond Tay resolved FLINK-8327.

Resolution: Information Provided



[jira] [Commented] (FLINK-8327) ClassLoader resolution of child-first does not appear to work

2018-01-10 Thread Raymond Tay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321642#comment-16321642
 ] 

Raymond Tay commented on FLINK-8327:


Closing this as I had to do dynamic classloading with the {{http4s}} client
for Scala, and I think it is going to be a problem for implementers of _rich_
sources, mappers, and sinks, if the developer has to manage the dependencies
such that they are always found in the {{/lib}} directory.



[GitHub] flink issue #4150: [FLINK-6951] Incompatible versions of httpcomponents jars...

2018-01-10 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4150
  
@casidiablo @tzulitai sorry, I had a long day yesterday and misunderstood
what you meant.

And thank you @casidiablo and @xiatao123 for confirming it works.


---


[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321616#comment-16321616
 ] 

ASF GitHub Bot commented on FLINK-6951:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4150
  
@casidiablo @tzulitai sorry, I had a long day yesterday and misunderstood
what you meant.

And thank you @casidiablo and @xiatao123 for confirming it works.


> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.3
>
>
> In the following thread, Bowen reported incompatible versions of
> httpcomponents jars for the Flink Kinesis connector:
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency
> version(s) themselves when building the Flink Kinesis connector.





[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321558#comment-16321558
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160849525
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -250,10 +304,12 @@ private void handleException(Channel channel, 
Throwable cause) throws IOExceptio
 
private void releaseAllResources() throws IOException {
SequenceNumberingViewReader reader;
-   while ((reader = nonEmptyReader.poll()) != null) {
+   while ((reader = availableReaders.poll()) != null) {
--- End diff --

yes, it should release all readers here.


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of the work for credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer
> {{currentCredit}} fed by the receiver. Once the {{PartitionRequest}} and
> {{AddCredit}} messages are received, {{currentCredit}} is incremented by the
> given deltas.
> Each view also maintains an atomic boolean field marking it as registered
> available for transfer, to make sure it is enqueued in the handler only once.
> If {{currentCredit}} increases from zero and there are available buffers in
> the subpartition, the corresponding view will be enqueued for transferring
> data.
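A minimal sketch of the bookkeeping described above (illustrative names, not
Flink's actual classes):

{code:java}
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class CreditBasedReader {
    private final AtomicInteger currentCredit = new AtomicInteger(0);
    private final AtomicBoolean registeredAvailable = new AtomicBoolean(false);

    /** Called for every AddCredit message received from the consumer. */
    int addCredit(int delta) {
        return currentCredit.addAndGet(delta);
    }

    /** Enqueue at most once: the flag guards against double registration. */
    void maybeEnqueue(Queue<CreditBasedReader> availableReaders, boolean hasBufferedData) {
        if (hasBufferedData && currentCredit.get() > 0
                && registeredAvailable.compareAndSet(false, true)) {
            availableReaders.add(this);
        }
    }

    /** Called once the reader has been polled off the queue and drained. */
    void markDequeued() {
        registeredAvailable.set(false);
    }
}
{code}

The {{compareAndSet}} on the flag is what makes "enqueued only once" hold even
when a credit notification and a non-empty-reader notification race.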





[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-10 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160849525
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -250,10 +304,12 @@ private void handleException(Channel channel, 
Throwable cause) throws IOExceptio
 
private void releaseAllResources() throws IOException {
SequenceNumberingViewReader reader;
-   while ((reader = nonEmptyReader.poll()) != null) {
+   while ((reader = availableReaders.poll()) != null) {
--- End diff --

yes, it should release all readers here.


---


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321557#comment-16321557
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160849478
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +94,37 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification from 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   @VisibleForTesting
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
+   if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
+   enqueueAvailableReader(reader);
+   }
+   }
+
+   @VisibleForTesting
+   void enqueueAvailableReader(final SequenceNumberingViewReader reader) 
throws Exception {
--- End diff --

agree




[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-10 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160849478
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +94,37 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification from 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   @VisibleForTesting
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
+   if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
+   enqueueAvailableReader(reader);
+   }
+   }
+
+   @VisibleForTesting
+   void enqueueAvailableReader(final SequenceNumberingViewReader reader) 
throws Exception {
--- End diff --

agree


---


[jira] [Commented] (FLINK-8324) Expose another offsets metrics by using new metric API to specify user defined variables

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321536#comment-16321536
 ] 

ASF GitHub Bot commented on FLINK-8324:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5214
  
Thanks for the PR @tony810430. The changes LGTM, will merge this a bit 
later.


> Expose another offsets metrics by using new metric API to specify user 
> defined variables
> 
>
> Key: FLINK-8324
> URL: https://issues.apache.org/jira/browse/FLINK-8324
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Trivial
> Fix For: 1.5.0
>
>
> The {{current-offsets}} and {{committed-offsets}} metrics currently have the
> topic name and partition id baked into the metric identity.
> This is inconvenient in Prometheus, because a user usually groups metrics
> that have the same meaning under one metric name and uses tags to select the
> individual metric.
> For example, I would prefer to access the {{current-offsets}} metric group
> and use a {{partition-x}} tag to get the offset of partition x, instead of
> reading the metric directly from {{current-offsets-partition-x}}.
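A hedged sketch of the metric layout this change enables, assuming the
key/value {{MetricGroup#addGroup(String, String)}} user-variable API the PR
builds on (the holder class is illustrative):

{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

import java.util.concurrent.atomic.AtomicLong;

final class OffsetMetrics {
    static void register(MetricGroup consumerGroup, String topic, int partition, AtomicLong offset) {
        Gauge<Long> gauge = offset::get;
        consumerGroup
            .addGroup("topic", topic)                         // user variable "topic"
            .addGroup("partition", String.valueOf(partition)) // user variable "partition"
            .gauge("currentOffsets", gauge);
    }
}
{code}

Reporters such as Prometheus can then expose {{topic}} and {{partition}} as
labels on a single {{currentOffsets}} metric, which is exactly the grouping
the description asks for.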





[GitHub] flink issue #5214: [FLINK-8324] [kafka] Expose another offsets metrics by us...

2018-01-10 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5214
  
Thanks for the PR @tony810430. The changes LGTM, will merge this a bit 
later.


---


[jira] [Commented] (FLINK-6109) Add "consumer lag" report metric to FlinkKafkaConsumer

2018-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321533#comment-16321533
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6109:


Thanks for picking the issue up [~aitozi]! Of course, contributions are always
welcome.

I have a few points I would like to discuss:

1. I've thought a bit about exposing "lag" as a metric directly in the Kafka
consumer, but it seems like it could be a bit out-of-scope for the consumer
itself. For the consumer to calculate this lag, it would essentially need to
query Kafka for the current head offset, which can be quite expensive.

2. Because of that, I suggest that we only add a `checkpointedOffset` metric,
scoped by topic and partition id. We already have `currentOffsets` and
`committedOffsets` metrics, so that would be a good addition overall, IMO. The
"lag" should be determined by the user in conjunction with other Kafka tools.

3. You can perhaps build your work on top of
https://github.com/apache/flink/pull/5214, which I'll be merging soon, to
avoid any conflicts.

I would like this new metric to be added for the 1.5.0 release, if possible.
How are you currently doing with the feature?
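In the spirit of point 2, a hedged sketch of how a user might derive the lag
outside the consumer, by combining Flink's exposed offset metric with the
broker's head offsets (assumes a Kafka client offering {{endOffsets}}, i.e.
0.10.1+; the class and parameter names are illustrative):

{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

final class LagProbe {
    /** Head offset minus the offset the Flink consumer has reached. */
    static long lag(KafkaConsumer<?, ?> probe, TopicPartition tp, long consumerOffset) {
        Map<TopicPartition, Long> end = probe.endOffsets(Collections.singleton(tp));
        return end.get(tp) - consumerOffset;
    }
}
{code}

Keeping the probe outside the job avoids the repeated head-offset queries
inside the consumer that point 1 warns about.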

> Add "consumer lag" report metric to FlinkKafkaConsumer
> --
>
> Key: FLINK-6109
> URL: https://issues.apache.org/jira/browse/FLINK-6109
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: aitozi
>
> This is a feature discussed in this ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html.
> As discussed, we can expose two kinds of "consumer lag" metrics for this:
>  - *current consumer lag per partition:* the current difference between the 
> latest offset and the last collected record. This metric is calculated and 
> updated at a configurable interval. This metric basically serves as an 
> indicator of how the consumer is keeping up with the head of partitions. I 
> propose to name this {{currentOffsetLag}}.
>  - *Consumer lag of last checkpoint per partition:* the difference between 
> the latest offset and the offset stored in the checkpoint. This metric is 
> only updated when checkpoints are completed. It serves as an indicator of how 
> much data may need to be replayed in case of a failure. I propose to name 
> this {{lastCheckpointedOffsetLag}}.
> I don't think it is reasonable to define a metric of whether or not a 
> consumer has "caught up" with the HEAD. That would imply a threshold for the 
> offset difference. We should probably leave this "caught up" logic for the 
> user to determine themselves when they query this metric.
> The granularity of the metric is per-FlinkKafkaConsumer, and independent of 
> the consumer group.id used (the offset used to calculate consumer lag is the 
> internal offset state of the FlinkKafkaConsumer, not the consumer group's 
> committed offsets in Kafka).
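A sketch of the two per-partition gauges proposed above (metric names from
the description; the holder class and the way the offsets are fed are
illustrative):

{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

import java.util.concurrent.atomic.AtomicLong;

final class PartitionLagGauges {
    final AtomicLong headOffset = new AtomicLong();         // refreshed at a configurable interval
    final AtomicLong currentOffset = new AtomicLong();      // last collected record
    final AtomicLong checkpointedOffset = new AtomicLong(); // updated on completed checkpoints

    void register(MetricGroup group) {
        Gauge<Long> current = () -> headOffset.get() - currentOffset.get();
        Gauge<Long> checkpointed = () -> headOffset.get() - checkpointedOffset.get();
        group.gauge("currentOffsetLag", current);
        group.gauge("lastCheckpointedOffsetLag", checkpointed);
    }
}
{code}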





[jira] [Created] (FLINK-8410) Kafka consumer's committedOffsets gauge metric is prematurely set

2018-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-8410:
--

 Summary: Kafka consumer's committedOffsets gauge metric is prematurely set
 Key: FLINK-8410
 URL: https://issues.apache.org/jira/browse/FLINK-8410
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Metrics
Affects Versions: 1.3.2, 1.4.0, 1.5.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.3.3, 1.5.0, 1.4.1


The {{committedOffsets}} metric gauge value is set too early. It is set here:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L236

This sets the committed offset before the actual commit happens, whose timing
varies depending on whether the commit mode is periodic auto-commit or commit
on checkpoints. Moreover, in the 0.9+ consumers, the {{KafkaConsumerThread}}
may choose to supersede some commit attempts if a commit takes longer than the
commit interval.

While the offset committed back to Kafka is not a critical value for the
consumer itself, it would be best to have more accurate values in a
Flink-shipped metric.
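A hedged sketch of the fix direction (not the actual Flink patch): move the
gauge's backing value into the commit callback, so it only changes after
Kafka acknowledges the commit. The holder class is illustrative; the callback
is the standard Kafka {{OffsetCommitCallback}}.

{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class CommittedOffsetGauges {
    private final Map<TopicPartition, Long> committed = new ConcurrentHashMap<>();

    void commitAsync(KafkaConsumer<?, ?> consumer, Map<TopicPartition, OffsetAndMetadata> offsets) {
        consumer.commitAsync(offsets, (acknowledged, exception) -> {
            if (exception == null) {
                // The gauge values move only after Kafka confirmed the commit.
                acknowledged.forEach((tp, om) -> committed.put(tp, om.offset()));
            }
        });
    }

    long committedOffsetFor(TopicPartition tp) {
        return committed.getOrDefault(tp, -1L);
    }
}
{code}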





[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2018-01-10 Thread casidiablo
Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160841610
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup registerMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
+   .addGroup("shardId", 
shardState.getStreamShardHandle().getShard().getShardId());
--- End diff --

Great idea.


---


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321497#comment-16321497
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160841610
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup registerMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
+   .addGroup("shardId", 
shardState.getStreamShardHandle().getShard().getShardId());
--- End diff --

Great idea.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.





[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321495#comment-16321495
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160841457
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup buildMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
--- End diff --

Ah I see, that makes sense then.

Could you then name it as `KinesisConsumer`, to be more consistent?




[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321494#comment-16321494
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160841254
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup registerMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
+   .addGroup("shardId", 
shardState.getStreamShardHandle().getShard().getShardId());
--- End diff --

We probably should also add a `stream` group before `shardId`, since the 
Kinesis consumer allows subscribing to multiple streams.
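Putting the review suggestions together (the {{KinesisConsumer}} group name
and a {{stream}} group before {{shardId}}), the registration could look
roughly like this hedged sketch; the holder class and how
{{millisBehindLatest}} gets updated are illustrative:

{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

import java.util.concurrent.atomic.AtomicLong;

final class ShardMetrics {
    static void register(MetricGroup root, String stream, String shardId, AtomicLong millisBehindLatest) {
        Gauge<Long> gauge = millisBehindLatest::get;
        root.addGroup("KinesisConsumer")
            .addGroup("stream", stream)
            .addGroup("shardId", shardId)
            .gauge("millisBehindLatest", gauge);
    }
}
{code}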




[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2018-01-10 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160841457
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup buildMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
--- End diff --

Ah I see, that makes sense then.

Could you then name it as `KinesisConsumer`, to be more consistent?


---


[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2018-01-10 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160841254
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup registerMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
+   .addGroup("shardId", 
shardState.getStreamShardHandle().getShard().getShardId());
--- End diff --

We probably should also add a `stream` group before `shardId`, since the 
Kinesis consumer allows subscribing to multiple streams.


---


[jira] [Comment Edited] (FLINK-7883) Stop fetching source before a cancel with savepoint

2018-01-10 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321463#comment-16321463
 ] 

Eron Wright  edited comment on FLINK-7883 at 1/11/18 12:45 AM:
---

Can we restate the problem that we're trying to solve?   To my understanding, 
problem one is that cancel-with-savepoint is not atomic; cancellation happens 
some time after the checkpoint state is collected, causing undesirable 
at-least-once behavior or rollback behavior.  Maybe this could be solved by 
enhancing the checkpoint barrier with a termination flag, which would cause the 
{{cancel}} function to be invoked while the checkpoint synchronization lock is 
still held.  I don't know whether this would play nice with 
{{CheckpointListener}}.

Problem two is the existence of two similar operations, cancel and stop.  Stop 
seems to be aimed at turning an unbounded source into a bounded source.  I 
think it would be awesome to be able to parameterize the stop call w/ connector 
specifics, e.g. "stop at such-and-such offset".  I would hesitate to co-opt the 
'stop' functionality / {{StoppableFunction}} to solve problem one.


was (Author: eronwright):
Can we restate the problem that we're trying to solve?   To my understanding, 
problem one is that cancel-with-savepoint is not atomic; cancellation happens 
some time after the checkpoint state is collected, causing undesirable 
at-least-once behavior or rollback behavior.  Maybe this could be solved by 
enhancing the checkpoint barrier with a termination flag, which would cause the 
cancel function to be invoked while the checkpoint synchronization lock is 
still held.  I don't know whether this would play nice with CheckpointListener.

Problem two is the existence of two similar operations, cancel and stop.  Stop 
seems to be aimed at turning an unbounded source into a bounded source.  I 
think it would be awesome to be able to parameterize the stop call w/ connector 
specifics, e.g. "stop at such-and-such offset".  I would hesitate to co-opt the 
'stop' functionality / StoppableFunction to solve problem one.
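Illustratively, problem one's "termination flag" could amount to a single
extra field on the barrier; a sketch, not Flink's actual
{{CheckpointBarrier}}:

{code:java}
final class CheckpointBarrierSketch {
    final long checkpointId;
    final long timestamp;
    final boolean terminateAfterCheckpoint; // the proposed flag

    CheckpointBarrierSketch(long checkpointId, long timestamp, boolean terminateAfterCheckpoint) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.terminateAfterCheckpoint = terminateAfterCheckpoint;
    }
}
{code}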

> Stop fetching source before a cancel with savepoint
> ---
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Kafka Connector, State Backends, 
> Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Antoine Philippot
>
> For a cancel-with-savepoint command, the JobManager triggers the cancel call
> once the savepoint is finished, but during the savepoint execution the Kafka
> source continues to poll new messages, which will not be part of the
> savepoint and will be replayed on the next application start.
> A solution could be to stop fetching in the source stream task before
> triggering the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a
> method {{stopFetching}} that existing SourceFunction implementations could
> implement.
> We can add a {{stopFetchingSource}} property in the {{CheckpointOptions}}
> class to pass the desired behaviour from
> {{JobManager.handleMessage(CancelJobWithSavepoint)}} to
> {{SourceStreamTask.triggerCheckpoint}}.
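A minimal sketch of the proposed interface (the name comes from the
description above; this is not committed Flink API):

{code:java}
public interface StoppableFetchingSourceFunction {
    /**
     * Stop fetching new records so that a savepoint taken afterwards
     * reflects everything this run of the job will emit.
     */
    void stopFetching();
}
{code}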





[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint

2018-01-10 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321463#comment-16321463
 ] 

Eron Wright  commented on FLINK-7883:
-

Can we restate the problem that we're trying to solve?   To my understanding, 
problem one is that cancel-with-savepoint is not atomic; cancellation happens 
some time after the checkpoint state is collected, causing undesirable 
at-least-once behavior or rollback behavior.  Maybe this could be solved by 
enhancing the checkpoint barrier with a termination flag, which would cause the 
cancel function to be invoked while the checkpoint synchronization lock is 
still held.  I don't know whether this would play nice with CheckpointListener.

Problem two is the existence of two similar operations, cancel and stop.  Stop 
seems to be aimed at turning an unbounded source into a bounded source.  I 
think it would be awesome to be able to parameterize the stop call w/ connector 
specifics, e.g. "stop at such-and-such offset".  I would hesitate to co-opt the 
'stop' functionality / StoppableFunction to solve problem one.



[jira] [Commented] (FLINK-5627) Allow job specific externalized checkpoint dir

2018-01-10 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321239#comment-16321239
 ] 

Elias Levy commented on FLINK-5627:
---

It would be nice to fix this issue.

> Allow job specific externalized checkpoint dir
> --
>
> Key: FLINK-5627
> URL: https://issues.apache.org/jira/browse/FLINK-5627
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Gyula Fora
>
> Currently the externalized checkpoint directory can only be configured at
> the cluster level. This is not in sync with the way checkpoint directories
> are generally configured, which is at a job-specific level.
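For context, a hedged sketch of what can be set at the job level today versus
the directory setting this issue asks for; the commented-out setter is
hypothetical:

{code:java}
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobLevelCheckpointDir {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        // Retaining checkpoints on cancellation is configurable per job ...
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // ... but the target directory is not; a job-level setter such as the
        // following does not exist yet (hypothetical API, per this issue):
        // env.getCheckpointConfig().setExternalizedCheckpointDir("hdfs:///checkpoints/my-job");
    }
}
{code}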





[jira] [Commented] (FLINK-8082) Bump version compatibility check to 1.4

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321235#comment-16321235
 ] 

ASF GitHub Bot commented on FLINK-8082:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5262
  
I'm working on a `CHECKLIST` file for `tools/releasing` to codify the several
release issues that have come up recently.


> Bump version compatibility check to 1.4
> ---
>
> Key: FLINK-8082
> URL: https://issues.apache.org/jira/browse/FLINK-8082
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.5.0
>
>
> Similar to FLINK-7977, we must bump the version of the compatibility check to 
> compare 1.5 against 1.4, once it is released.





[GitHub] flink issue #5262: [FLINK-8082][build] Bump flink version for japicmp plugin

2018-01-10 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5262
  
I'm working on a `CHECKLIST` file for `tools/releasing` to codify the several
release issues that have come up recently.


---


[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321159#comment-16321159
 ] 

ASF GitHub Bot commented on FLINK-6951:
---

Github user xiatao123 commented on the issue:

https://github.com/apache/flink/pull/4150
  
Got the issue fixed after applying this patch on:
Release label: emr-5.11.0
Hadoop distribution: Amazon 2.7.3
Applications: Flink 1.3.2





[GitHub] flink issue #4150: [FLINK-6951] Incompatible versions of httpcomponents jars...

2018-01-10 Thread xiatao123
Github user xiatao123 commented on the issue:

https://github.com/apache/flink/pull/4150
  
Got the issue fixed after applying this patch on:
Release label: emr-5.11.0
Hadoop distribution: Amazon 2.7.3
Applications: Flink 1.3.2



---


[GitHub] flink issue #4150: [FLINK-6951] Incompatible versions of httpcomponents jars...

2018-01-10 Thread xiatao123
Github user xiatao123 commented on the issue:

https://github.com/apache/flink/pull/4150
  
I ran into a similar issue in EMR. Any suggestions on how to fix it?

`Release label:emr-5.11.0
Hadoop distribution:Amazon 2.7.3
Applications:Flink 1.3.2`

`java.lang.IllegalStateException: Socket not created by this factory
at org.apache.http.util.Asserts.check(Asserts.java:34)
at 
org.apache.http.conn.ssl.SSLSocketFactory.isSecure(SSLSocketFactory.java:435)
at 
org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:186)
at 
org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:326)
at 
org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610)
at 
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445)
at 
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:835)
at 
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at 
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1940)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1910)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:656)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.describeStream(KinesisProxy.java:361)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:323)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:231)
at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:430)
at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:202)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
`


---


[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321104#comment-16321104
 ] 

ASF GitHub Bot commented on FLINK-6951:
---

Github user xiatao123 commented on the issue:

https://github.com/apache/flink/pull/4150
  
I ran into a similar issue in EMR. Any suggestions on how to fix it?

`Release label:emr-5.11.0
Hadoop distribution:Amazon 2.7.3
Applications:Flink 1.3.2`

`java.lang.IllegalStateException: Socket not created by this factory
at org.apache.http.util.Asserts.check(Asserts.java:34)
at 
org.apache.http.conn.ssl.SSLSocketFactory.isSecure(SSLSocketFactory.java:435)
at 
org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:186)
at 
org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:326)
at 
org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610)
at 
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445)
at 
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:835)
at 
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at 
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1940)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1910)
at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:656)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.describeStream(KinesisProxy.java:361)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:323)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:231)
at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:430)
at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:202)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
`




[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...

2018-01-10 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5277#discussion_r160795814
  
--- Diff: docs/concepts/runtime.md ---
@@ -88,40 +94,36 @@ By default, Flink allows subtasks to share slots even 
if they are subtasks of di
 they are from the same job. The result is that one slot may hold an entire 
pipeline of the
 job. Allowing this *slot sharing* has two main benefits:
 
-  - A Flink cluster needs exactly as many task slots as the highest 
parallelism used in the job.
-No need to calculate how many tasks (with varying parallelism) a 
program contains in total.
+  - A Flink cluster needs as many task slots as the highest parallelism 
used in the job.
+There's no need to calculate how many tasks (with varying parallelism) 
a program contains in total.
 
   - It is easier to get better resource utilization. Without slot sharing, 
the non-intensive
-*source/map()* subtasks would block as many resources as the resource 
intensive *window* subtasks.
+*source/map()* subtasks would block as many resources as the 
resource-intensive *window* subtasks.
 With slot sharing, increasing the base parallelism in our example from 
two to six yields full utilization of the
-slotted resources, while making sure that the heavy subtasks are 
fairly distributed among the TaskManagers.
+slotted resources, while making sure that the heavy subtasks are 
evenly distributed among the TaskManagers.
 
 
 
-The APIs also include a *[resource 
group](../dev/datastream_api.html#task-chaining-and-resource-groups)* mechanism 
which can be used to prevent undesirable slot sharing. 
+The APIs also include a *[resource 
group](../dev/datastream_api.html#task-chaining-and-resource-groups)* mechanism 
which you can use to prevent undesirable slot sharing.
 
-As a rule-of-thumb, a good default number of task slots would be the 
number of CPU cores.
-With hyper-threading, each slot then takes 2 or more hardware thread 
contexts.
+As a rule-of-thumb, a reasonable default number of task slots would be the 
number of CPU cores. With hyper-threading, each slot then takes 2 or more 
hardware thread contexts.
 
 {% top %}
 
 ## State Backends
 
-The exact data structures in which the key/values indexes are stored 
depends on the chosen [state backend](../ops/state/state_backends.html). One 
state backend
-stores data in an in-memory hash map, another state backend uses 
[RocksDB](http://rocksdb.org) as the key/value store.
-In addition to defining the data structure that holds the state, the state 
backends also implement the logic to
-take a point-in-time snapshot of the key/value state and store that 
snapshot as part of a checkpoint.
+The exact data structures which store the key/values indexes depends on 
the chosen [state backend](../ops/state/state_backends.html). One state backend 
stores data in an in-memory hash map, another state backend uses 
[RocksDB](http://rocksdb.org) as the key/value store. In addition to defining 
the data structure that holds the state, the state backends also implement the 
logic to take a point-in-time snapshot of the key/value state and store that 
snapshot as part of a checkpoint.
--- End diff --

Preserve the line splits? The conversion to HTML ignores single newlines.


---


[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...

2018-01-10 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5277#discussion_r160793612
  
--- Diff: docs/concepts/runtime.md ---
@@ -88,40 +94,36 @@ By default, Flink allows subtasks to share slots even 
if they are subtasks of di
 they are from the same job. The result is that one slot may hold an entire 
pipeline of the
 job. Allowing this *slot sharing* has two main benefits:
 
-  - A Flink cluster needs exactly as many task slots as the highest 
parallelism used in the job.
-No need to calculate how many tasks (with varying parallelism) a 
program contains in total.
+  - A Flink cluster needs as many task slots as the highest parallelism 
used in the job.
+There's no need to calculate how many tasks (with varying parallelism) 
a program contains in total.
 
   - It is easier to get better resource utilization. Without slot sharing, 
the non-intensive
-*source/map()* subtasks would block as many resources as the resource 
intensive *window* subtasks.
+*source/map()* subtasks would block as many resources as the 
resource-intensive *window* subtasks.
 With slot sharing, increasing the base parallelism in our example from 
two to six yields full utilization of the
-slotted resources, while making sure that the heavy subtasks are 
fairly distributed among the TaskManagers.
+slotted resources, while making sure that the heavy subtasks are 
evenly distributed among the TaskManagers.
 
 
 
-The APIs also include a *[resource 
group](../dev/datastream_api.html#task-chaining-and-resource-groups)* mechanism 
which can be used to prevent undesirable slot sharing. 
+The APIs also include a *[resource 
group](../dev/datastream_api.html#task-chaining-and-resource-groups)* mechanism 
which you can use to prevent undesirable slot sharing.
 
-As a rule-of-thumb, a good default number of task slots would be the 
number of CPU cores.
-With hyper-threading, each slot then takes 2 or more hardware thread 
contexts.
+As a rule-of-thumb, a reasonable default number of task slots would be the 
number of CPU cores. With hyper-threading, each slot then takes 2 or more 
hardware thread contexts.
 
 {% top %}
 
 ## State Backends
 
-The exact data structures in which the key/values indexes are stored 
depends on the chosen [state backend](../ops/state/state_backends.html). One 
state backend
-stores data in an in-memory hash map, another state backend uses 
[RocksDB](http://rocksdb.org) as the key/value store.
-In addition to defining the data structure that holds the state, the state 
backends also implement the logic to
-take a point-in-time snapshot of the key/value state and store that 
snapshot as part of a checkpoint.
+The exact data structures which store the key/values indexes depends on 
the chosen [state backend](../ops/state/state_backends.html). One state backend 
stores data in an in-memory hash map, another state backend uses 
[RocksDB](http://rocksdb.org) as the key/value store. In addition to defining 
the data structure that holds the state, the state backends also implement the 
logic to take a point-in-time snapshot of the key/value state and store that 
snapshot as part of a checkpoint.
 
 
 
 {% top %}
 
 ## Savepoints
 
-Programs written in the Data Stream API can resume execution from a 
**savepoint**. Savepoints allow both updating your programs and your Flink 
cluster without losing any state. 
+Programs written in the Data Stream API can resume execution from a 
**savepoint**. Savepoints allow both updating your programs and your Flink 
cluster without losing any state.
 
-[Savepoints](../ops/state/savepoints.html) are **manually triggered 
checkpoints**, which take a snapshot of the program and write it out to a state 
backend. They rely on the regular checkpointing mechanism for this. During 
execution programs are periodically snapshotted on the worker nodes and produce 
checkpoints. For recovery only the last completed checkpoint is needed and 
older checkpoints can be safely discarded as soon as a new one is completed.
+[Savepoints](../ops/state/savepoints.html) are **manually triggered 
checkpoints**, which take a snapshot of the program and write it out to a state 
backend. They rely on the regular checkpointing mechanism for this. During 
execution, programs are periodically snapshotted on the worker nodes and 
produce checkpoints. You only need the last completed checkpoint for recovery, 
and you can safely discard older checkpoints as soon as a new one is completed.
 
-Savepoints are similar to these periodic checkpoints except that they are 
**triggered by the user** and **don't automatically expire** when newer 
checkpoints are completed. Savepoints can be created from the [command 
line](../ops/cli.html#savepoints) or when canc
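
For reference, the command-line path mentioned above is a one-liner. A minimal sketch, with the job id, savepoint directory, and jar name as placeholders:

{code}
# Trigger a savepoint for a running job (job id as reported by `bin/flink list`)
bin/flink savepoint <jobId> hdfs:///flink/savepoints

# Resume a (possibly updated) program from the resulting savepoint path
bin/flink run -s <savepointPath> my-job.jar
{code}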

[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...

2018-01-10 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5277#discussion_r160794201
  
--- Diff: docs/concepts/runtime.md ---
@@ -46,19 +46,23 @@ The Flink runtime consists of two types of processes:
   - The **JobManagers** (also called *masters*) coordinate the distributed 
execution. They schedule tasks, coordinate
 checkpoints, coordinate recovery on failures, etc.
 
-There is always at least one Job Manager. A high-availability setup 
will have multiple JobManagers, one of
-which one is always the *leader*, and the others are *standby*.
+There is always at least one Job Manager. A high-availability setup 
should have multiple JobManagers, one of
--- End diff --

"JobManager"?


---


[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...

2018-01-10 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5277#discussion_r160795540
  
--- Diff: docs/concepts/runtime.md ---
@@ -88,40 +94,36 @@ By default, Flink allows subtasks to share slots even 
if they are subtasks of di
 they are from the same job. The result is that one slot may hold an entire 
pipeline of the
 job. Allowing this *slot sharing* has two main benefits:
 
-  - A Flink cluster needs exactly as many task slots as the highest 
parallelism used in the job.
-No need to calculate how many tasks (with varying parallelism) a 
program contains in total.
+  - A Flink cluster needs as many task slots as the highest parallelism 
used in the job.
+There's no need to calculate how many tasks (with varying parallelism) 
a program contains in total.
 
   - It is easier to get better resource utilization. Without slot sharing, 
the non-intensive
-*source/map()* subtasks would block as many resources as the resource 
intensive *window* subtasks.
+*source/map()* subtasks would block as many resources as the 
resource-intensive *window* subtasks.
 With slot sharing, increasing the base parallelism in our example from 
two to six yields full utilization of the
-slotted resources, while making sure that the heavy subtasks are 
fairly distributed among the TaskManagers.
+slotted resources, while making sure that the heavy subtasks are 
evenly distributed among the TaskManagers.
 
 
 
-The APIs also include a *[resource 
group](../dev/datastream_api.html#task-chaining-and-resource-groups)* mechanism 
which can be used to prevent undesirable slot sharing. 
+The APIs also include a *[resource 
group](../dev/datastream_api.html#task-chaining-and-resource-groups)* mechanism 
which you can use to prevent undesirable slot sharing.
 
-As a rule-of-thumb, a good default number of task slots would be the 
number of CPU cores.
-With hyper-threading, each slot then takes 2 or more hardware thread 
contexts.
+As a rule-of-thumb, a reasonable default number of task slots would be the 
number of CPU cores. With hyper-threading, each slot then takes 2 or more 
hardware thread contexts.
--- End diff --

Preserve the line split? I'm not sure what the second sentence is saying. 
Typically hyper-threads are reported as separate cores, so wouldn't each slot 
take a single hardware thread context?


---
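
On the rule-of-thumb itself: the slot count is a per-TaskManager setting. A minimal configuration sketch, assuming a 4-core machine:

{code}
# conf/flink-conf.yaml -- one task slot per CPU core on a 4-core TaskManager
taskmanager.numberOfTaskSlots: 4
{code}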


[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...

2018-01-10 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5277#discussion_r160794027
  
--- Diff: docs/concepts/runtime.md ---
@@ -28,12 +28,12 @@ under the License.
 
 ## Tasks and Operator Chains
 
-For distributed execution, Flink *chains* operator subtasks together into 
*tasks*. Each task is executed by one thread.
+For distributed execution, Flink *chains* operator subtasks together into 
*tasks*, with one thread executing each task.
 Chaining operators together into tasks is a useful optimization: it 
reduces the overhead of thread-to-thread
-handover and buffering, and increases overall throughput while decreasing 
latency.
-The chaining behavior can be configured; see the [chaining 
docs](../dev/datastream_api.html#task-chaining-and-resource-groups) for details.
+handover and buffering and increases overall throughput while decreasing 
latency.
+You can configure the chaining behavior, read the [chaining 
docs](../dev/datastream_api.html#task-chaining-and-resource-groups) for details.
 
-The sample dataflow in the figure below is executed with five subtasks, 
and hence with five parallel threads.
+Five subtasks execute the sample data flow in the figure below with five 
parallel threads.
--- End diff --

The community, as well as other projects, refers to "dataflow" extensively 
(also below).


---


[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...

2018-01-10 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5277#discussion_r160793754
  
--- Diff: docs/concepts/runtime.md ---
@@ -28,12 +28,12 @@ under the License.
 
 ## Tasks and Operator Chains
 
-For distributed execution, Flink *chains* operator subtasks together into 
*tasks*. Each task is executed by one thread.
+For distributed execution, Flink *chains* operator subtasks together into 
*tasks*, with one thread executing each task.
 Chaining operators together into tasks is a useful optimization: it 
reduces the overhead of thread-to-thread
-handover and buffering, and increases overall throughput while decreasing 
latency.
-The chaining behavior can be configured; see the [chaining 
docs](../dev/datastream_api.html#task-chaining-and-resource-groups) for details.
+handover and buffering and increases overall throughput while decreasing 
latency.
+You can configure the chaining behavior, read the [chaining 
docs](../dev/datastream_api.html#task-chaining-and-resource-groups) for details.
--- End diff --

Preserve the semi-colon?


---


[jira] [Commented] (FLINK-8403) Flink Gelly examples hanging without returning result

2018-01-10 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321032#comment-16321032
 ] 

Greg Hogan commented on FLINK-8403:
---

Could this be due to the 
[{{akka.framesize}}|https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka]
 limit (default: 10 MiB)? Does the job succeed when you write the output to CSV?
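
For anyone wanting to test the framesize hypothesis, the limit can be raised in the configuration. A sketch; the value is illustrative, and the {{b}} suffix denotes bytes:

{code}
# conf/flink-conf.yaml -- raise the Akka frame size from its 10 MiB default
akka.framesize: 104857600b
{code}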

> Flink Gelly examples hanging without returning result
> -
>
> Key: FLINK-8403
> URL: https://issues.apache.org/jira/browse/FLINK-8403
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.2
> Environment: CentOS Linux release 7.3.1611
>Reporter: flora karniav
>  Labels: examples, gelly, performance
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Hello, I am currently running and measuring Flink Gelly examples (Connected 
> components and Pagerank algorithms) with different SNAP datasets. When 
> running with the Twitter dataset for example 
> (https://snap.stanford.edu/data/egonets-Twitter.html) which has 81,306 
> vertices everything executes and finishes OK and I get the reported job 
> runtime. On the other hand,  executions with datasets having a bigger number 
> of vertices, e.g. https://snap.stanford.edu/data/com-Youtube.html with 
> 1,134,890 vertices, hang with no result and no reported time, while at the same 
> time I get "Job execution switched to status FINISHED."
> I thought that this could be a memory issue, so I increased the RAM assigned 
> to my taskmanagers (and jobmanager) to 125GB, but still no luck. 
> The exact command I am running is:
> ./bin/flink run examples/gelly/flink-gelly-examples_*.jar --algorithm 
> PageRank --directed false  --input_filename hdfs://sith0:9000/user/xx.txt 
> --input CSV --type integer --input_field_delimiter $' ' --output print



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5279: [hotfix] [build] Print cache info

2018-01-10 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/5279

[hotfix] [build] Print cache info

Print the size of the Maven cache copied for each TravisCI job.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 20180110a_print_cache_info

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5279.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5279


commit e16005a55c4b4b59e8ba6859fc273af1e48ba8b7
Author: Greg Hogan 
Date:   2018-01-10T19:07:57Z

[hotfix] [build] Print cache info

Print the size of the Maven cache copied for each TravisCI job.




---


[jira] [Updated] (FLINK-8405) Keyed State in broadcasted data stream.

2018-01-10 Thread Marek Barak (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marek Barak updated FLINK-8405:
---
Description: 
Hi guys,

I am trying to join 2 streams, where the second stream is a codelist used by 
the first stream for enrichment. I followed the guide described here:

https://www.safaribooksonline.com/library/view/stream-processing-with/9781491974285/ch04.html

With the distinction that instead of having a local HashMap, I used MapState. 
This part is actually important since I want my state properly checkpointed in 
case of a failure. I managed to reproduce the issue with the following code: 

{code}
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.junit.{Test, Assert }

class SimpleTest extends StreamingMultipleProgramsTestBase {


  val env = StreamExecutionEnvironment.getExecutionEnvironment


  case object StateMap extends RichCoFlatMapFunction[String, (String, Int), 
Int] {

var codeList: MapState[String,Int] = _

override def open(parameters: Configuration): Unit = {
  codeList = getRuntimeContext.getMapState(
new MapStateDescriptor[String,Int]("test", classOf[String], 
classOf[Int])

  )
}

override def flatMap1(value: String, out: Collector[Int]): Unit = {
  val res = if(codeList.contains(value)) codeList.get(value) else 0

  out.collect(res)
}

override def flatMap2(value: (String, Int), out: Collector[Int]): Unit = {
  codeList.put(value._1, value._2)
  out.close()
}
  }

  @Test
  def job() = {

val inputStream = env.fromCollection(List("Some", "Some2", "Some3"))
val dictStream = env.fromCollection(List("Some" -> 1, "Some2" -> 2, "Some3" 
-> 3))

inputStream
  .connect(dictStream.broadcast)
  .flatMap(StateMap)

env.execute()
Assert.assertEquals(1, 1)
  }
}
{code}

I always get the following issue:

{code}
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 
'keyed stream', i.e., after a 'keyBy()' operation.
at 
org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:161)
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:153)
at 
com.triviadata.sherlog.streaming.job.SimpleTest$StateMap$.open(SimpleTest.scala:23)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:46)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
{code}

I guess the main problem is:

{code}
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 
'keyed stream', i.e., after a 'keyBy()' operation.
{code}

I also tried: 

{code}
inputStream
  .keyBy(a => a


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320816#comment-16320816
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5248
  
Thank you for the thorough review. Here is a summary of follow-ups from my 
side:
  - Investigate further whether we can find a better way for configuration than the 
`ConfigurableStateBackend`.
  - Adjust some method names.
  - Possibly look at a different name for `CheckpointStorage` to avoid 
confusion with `CompletedCheckpointStore`. We might actually decide to rename 
`CompletedCheckpointStore` to `CompletedCheckpointsRegistry`, because that is 
really what it is. ZK registers the completed checkpoints, it does not store 
them (after this PR that is more true even than before).

One of the bigger comments (to replace String by a strongly typed Pointer 
class) is something I think will not easily work, because Strings are really 
what we get from the command line, REST interface, ZK. Any extra wrapping would 
not add any more safety at this point. When we see that we actually can add 
more safety, we should refactor this.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5248: [FLINK-5823] [checkpoints] State backends now also handle...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5248
  
Thank you for the thorough review. Here is a summary of follow-ups from my 
side:
  - Investigate further whether we can find a better way for configuration than the 
`ConfigurableStateBackend`.
  - Adjust some method names.
  - Possibly look at a different name for `CheckpointStorage` to avoid 
confusion with `CompletedCheckpointStore`. We might actually decide to rename 
`CompletedCheckpointStore` to `CompletedCheckpointsRegistry`, because that is 
really what it is. ZK registers the completed checkpoints, it does not store 
them (after this PR that is more true even than before).

One of the bigger comments (to replace String by a strongly typed Pointer 
class) is something I think will not easily work, because Strings are really 
what we get from the command line, REST interface, ZK. Any extra wrapping would 
not add any more safety at this point. When we see that we actually can add 
more safety, we should refactor this.


---


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320811#comment-16320811
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160763163
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup buildMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
--- End diff --

I think so. Just following the convention of other components (e.g. [kafka 
consumer](https://github.com/apache/flink/blob/00ad0eb120026817c79a5fed2c71dd6aa10e9ba6/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L101))


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2018-01-10 Thread casidiablo
Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160763163
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup buildMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
--- End diff --

I think so. Just following the convention of other components (e.g. [kafka 
consumer](https://github.com/apache/flink/blob/00ad0eb120026817c79a5fed2c71dd6aa10e9ba6/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L101))


---
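
For readers following along, the registration under discussion amounts to hanging a gauge off a per-shard metric group. A sketch in Scala against Flink's metrics API; the {{shardId}} parameter and the {{lastMillisBehind}} holder are hypothetical stand-ins for what the fetcher would supply:

{code}
import java.util.concurrent.atomic.AtomicLong

import org.apache.flink.metrics.{Gauge, MetricGroup}

// Sketch: register a per-shard millisBehindLatest gauge under a "Kinesis" group.
// The AtomicLong stands in for a value the fetch loop would update per getRecords call.
def registerShardGauge(parent: MetricGroup, shardId: String): AtomicLong = {
  val lastMillisBehind = new AtomicLong(0L)
  parent
    .addGroup("Kinesis")
    .addGroup(shardId)
    .gauge("millisBehindLatest", new Gauge[Long] {
      override def getValue: Long = lastMillisBehind.get()
    })
  lastMillisBehind
}
{code}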


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320799#comment-16320799
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160761745
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from 
configurations.
+ */
+public class StateBackendLoader {
+
+   // 

+   //  Configuration shortcut names
+   // 

+
+   /** The shortcut configuration name for the MemoryState backend that 
checkpoints to the JobManager */
+   public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+   /** The shortcut configuration name for the FileSystem State backend */
+   public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+   /** The shortcut configuration name for the RocksDB State Backend */
+   public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+   // 

+   //  Loading the state backend from a configuration 
+   // 

+
+   /**
+* Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+* in {@link CheckpointingOptions#STATE_BACKEND}.
+*
+* The state backends can be specified either via their shortcut 
name, or via the class name
+* of a {@link StateBackendFactory}. If a StateBackendFactory class 
name is specified, the factory
+* is instantiated (via its zero-argument constructor) and its
+* {@link StateBackendFactory#createFromConfig(Configuration)} method 
is called.
+*
+* Recognized shortcut names are '{@value 
StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+*
+* @param config The configuration to load the state backend from
+* @param classLoader The class loader that should be used to load the 
state backend
+* @param logger Optionally, a logger to log actions to (may be null)
+*
+* @return The instantiated state backend.
+*
+* @throws DynamicCodeLoadingException
+* Thrown if a state backend factory is configured and the 
factory class was not
+* found or the factory could not be instantiated
+* @throws IllegalConfigurationException
+* May be thrown by the State

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160761745
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from 
configurations.
+ */
+public class StateBackendLoader {
+
+   // 

+   //  Configuration shortcut names
+   // 

+
+   /** The shortcut configuration name for the MemoryState backend that 
checkpoints to the JobManager */
+   public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+   /** The shortcut configuration name for the FileSystem State backend */
+   public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+   /** The shortcut configuration name for the RocksDB State Backend */
+   public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+   // 

+   //  Loading the state backend from a configuration 
+   // 

+
+   /**
+* Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+* in {@link CheckpointingOptions#STATE_BACKEND}.
+*
+* The state backends can be specified either via their shortcut 
name, or via the class name
+* of a {@link StateBackendFactory}. If a StateBackendFactory class 
name is specified, the factory
+* is instantiated (via its zero-argument constructor) and its
+* {@link StateBackendFactory#createFromConfig(Configuration)} method 
is called.
+*
+* Recognized shortcut names are '{@value 
StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+*
+* @param config The configuration to load the state backend from
+* @param classLoader The class loader that should be used to load the 
state backend
+* @param logger Optionally, a logger to log actions to (may be null)
+*
+* @return The instantiated state backend.
+*
+* @throws DynamicCodeLoadingException
+* Thrown if a state backend factory is configured and the 
factory class was not
+* found or the factory could not be instantiated
+* @throws IllegalConfigurationException
+* May be thrown by the StateBackendFactory when creating / 
configuring the state
+* backend in the factory
+* @throws IOException
+* May be thrown by the StateBackendFactory when 
instantiating the state backend
+*/
+   pub
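
To make the loading path above concrete, the backend is selected in the Flink configuration either by shortcut name or by factory class. A minimal sketch using the key and shortcut names defined in the quoted code (the factory class name is a hypothetical example):

{code}
# conf/flink-conf.yaml -- select a state backend by shortcut name
state.backend: rocksdb

# ... or by the fully qualified class name of a StateBackendFactory
# state.backend: org.example.MyStateBackendFactory
{code}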

[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320792#comment-16320792
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160761001
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An implementation of durable checkpoint storage to file systems.
+ */
+public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
+
+   private final FileSystem fileSystem;
+
+   private final Path checkpointsDirectory;
+
+   private final Path sharedStateDirectory;
+
+   private final Path taskOwnedStateDirectory;
--- End diff --

We may be able to remove this in the future, but because we need to pull 
task owned state out of the "exclusive state" directory soon, and the rework to 
make this use shared state handles is probably more delicate, I would actually 
keep this for now. It should be trivial rework to remove it once we don't need it 
any more.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320793#comment-16320793
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160761075
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 ---
@@ -18,92 +18,272 @@
 
 package org.apache.flink.runtime.state.memory;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
- * A {@link AbstractStateBackend} that stores all its data and checkpoints 
in memory and has no
- * capabilities to spill to disk. Checkpoints are serialized and the 
serialized data is
- * transferred
+ * This state backend holds the working state in the memory (JVM heap) of 
the TaskManagers.
+ * The state backend checkpoints state directly to the JobManager's memory 
(hence the backend's name),
+ * but the checkpoints will be persisted to a file system for 
high-availability setups and savepoints.
+ * The MemoryStateBackend is consequently a FileSystem-based backend that 
can work without a
+ * file system dependency in simple setups.
+ *
+ * This state backend should be used only for experimentation, quick 
local setups,
+ * or for streaming applications that have very small state: Because it 
requires checkpoints to
+ * go through the JobManager's memory, larger state will occupy larger 
portions of the JobManager's  
+ * main memory, reducing operational stability.
+ * For any other setup, the {@link 
org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend}
+ * should be used. The {@code FsStateBackend} holds the working state on 
the TaskManagers in the same way, but
+ * checkpoints state directly to files rather than to the JobManager's 
memory, thus supporting
+ * large state sizes. 
+ *
+ * State Size Considerations
+ *
+ * State checkpointing with this state backend is subject to the following 
conditions:
+ * 
+ * Each individual state must not exceed the configured maximum 
state size
+ * (see {@link #getMaxStateSize()}.
+ *
+ * All state from one task (i.e., the sum of all operator states 
and keyed states from all
+ * chained operators of the task) must not exceed what the RPC 
system supports, which is
+ * by default < 10 MB. That limit can be configured higher, but that 
is typically not advised.
+ *
+ * The sum of all states in the application times all retained 
checkpoints must comfortably
+ * fit into the JobManager's JVM heap space.
+ * 
+ *
+ * Persistence Guarantees
+ *
+ * For the use cases where the state sizes can be handled by this backend, 
the backend does guarantee
+ * persistence for savepoints, externalized checkpoints (if configured), 
and checkpoints
+ * (when high-availability is configured).
+ *
+ * Configuration
+ *
+ * As for all state backends, this backend can either be configured within 
the application (by creating
+ * the backend with the respective constructor parameters and setting it 
on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * If the state backend was specified in the application, it may pick 
up additional configuration
+ * parameters from the Flink configuration. For example, if the backend is 
configured in the application
+ * without a default savepoint directory, it will pick up a default 
savep

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160761001
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An implementation of durable checkpoint storage to file systems.
+ */
+public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
+
+   private final FileSystem fileSystem;
+
+   private final Path checkpointsDirectory;
+
+   private final Path sharedStateDirectory;
+
+   private final Path taskOwnedStateDirectory;
--- End diff --

We may be able to remove this in the future, but because we need to pull 
task owned state out of the "exclusive state" directory soon, and the rework to 
make this use shared state handles is probably more delicate, I would actually 
keep this for now. It should be trivial rework to remove it once we don't need it 
any more.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160761075
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 ---
@@ -18,92 +18,272 @@
 
 package org.apache.flink.runtime.state.memory;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
- * A {@link AbstractStateBackend} that stores all its data and checkpoints 
in memory and has no
- * capabilities to spill to disk. Checkpoints are serialized and the 
serialized data is
- * transferred
+ * This state backend holds the working state in the memory (JVM heap) of 
the TaskManagers.
+ * The state backend checkpoints state directly to the JobManager's memory 
(hence the backend's name),
+ * but the checkpoints will be persisted to a file system for 
high-availability setups and savepoints.
+ * The MemoryStateBackend is consequently a FileSystem-based backend that 
can work without a
+ * file system dependency in simple setups.
+ *
+ * This state backend should be used only for experimentation, quick 
local setups,
+ * or for streaming applications that have very small state: Because it 
requires checkpoints to
+ * go through the JobManager's memory, larger state will occupy larger 
portions of the JobManager's  
+ * main memory, reducing operational stability.
+ * For any other setup, the {@link 
org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend}
+ * should be used. The {@code FsStateBackend} holds the working state on 
the TaskManagers in the same way, but
+ * checkpoints state directly to files rather than to the JobManager's 
memory, thus supporting
+ * large state sizes. 
+ *
+ * State Size Considerations
+ *
+ * State checkpointing with this state backend is subject to the following 
conditions:
+ * 
+ * Each individual state must not exceed the configured maximum 
state size
+ * (see {@link #getMaxStateSize()}.
+ *
+ * All state from one task (i.e., the sum of all operator states 
and keyed states from all
+ * chained operators of the task) must not exceed what the RPC 
system supports, which is
+ * by default < 10 MB. That limit can be configured higher, but that 
is typically not advised.
+ *
+ * The sum of all states in the application times all retained 
checkpoints must comfortably
+ * fit into the JobManager's JVM heap space.
+ * 
+ *
+ * Persistence Guarantees
+ *
+ * For the use cases where the state sizes can be handled by this backend, 
the backend does guarantee
+ * persistence for savepoints, externalized checkpoints (if configured), 
and checkpoints
+ * (when high-availability is configured).
+ *
+ * Configuration
+ *
+ * As for all state backends, this backend can either be configured within 
the application (by creating
+ * the backend with the respective constructor parameters and setting it 
on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * If the state backend was specified in the application, it may pick 
up additional configuration
+ * parameters from the Flink configuration. For example, if the backend is 
configured in the application
+ * without a default savepoint directory, it will pick up a default 
savepoint directory specified in the
+ * Flink configuration of the running job/cluster. That behavior is 
implemented via the
+ * {@link #configure(Configuration)} method.
  */
-public class MemoryStateBackend extends AbstractStateBackend {
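
As a usage illustration of the javadoc above, the backend can also be set programmatically on the execution environment rather than in the configuration. A minimal Scala sketch against the pre-1.5 API; the 5 MB max-state-size argument is illustrative:

{code}
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Suitable only for tiny state, as the javadoc stresses: checkpoints
// travel through the JobManager's memory.
env.setStateBackend(new MemoryStateBackend(5 * 1024 * 1024))
{code}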
   

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160760708
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all state backends that store their metadata (and 
data) in files.
+ * Examples that inherit from this are the {@link FsStateBackend}, the
+ * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend 
MemoryStateBackend}, or the
+ * {@code RocksDBStateBackend}.
+ *
+ * This class takes the base checkpoint- and savepoint directory paths, 
but also accepts null
+ * for both of them, in which case creating externalized checkpoints is not 
possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to 
enable implementations
+ * that only optionally support default savepoints and externalized 
checkpoints.
+ *
+ * Checkpoint Layout
+ *
+ * The state backend is configured with a base directory and persists 
the checkpoint data of specific
+ * checkpoints in specific subdirectories. For example, if the base 
directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will 
create a subdirectory with
+ * the job's ID that will contain the actual checkpoints:
+ * ({@code 
hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * Each checkpoint individually will store all its files in a 
subdirectory that includes the
+ * checkpoint number, such as {@code 
hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * Savepoint Layout
+ *
+ * A savepoint that is set to be stored in path {@code 
hdfs://namenode:port/flink-savepoints/} will create
+ * a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it 
stores all savepoint data.
+ * The random digits are added as "entropy" to avoid directory collisions.
+ *
+ * Metadata and Success Files
+ *
+ * A completed checkpoint writes its metadata into a file '{@value 
AbstractFsCheckpointStorage#METADATA_FILE_NAME}'.
+ * After that is complete (i.e., the file is complete), it writes an 
additional file
+ * '{@value AbstractFsCheckpointStorage#SUCCESS_FILE_NAME}'.
+ *
+ * Ideally that would not be necessary, and one would write the 
metadata file to a temporary file and
+ * then issue an atomic (or at least constant-time) rename. But some of the 
file systems (like S3) do
+ * not support that: a rename is a copy process which, when failing, 
leaves corrupt half-written
+ * files/objects. The success file is hence needed as a signal that the
+ * '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}' file is 
complete.
+ */
+@PublicEvolving
+public abstract class AbstractFileStateBackend extends 
AbstractStateBackend {
+
+   private static final long serialVersionUID = 1L;
+
+   // 

+   //  State Backend Properties
+   // 

+
+   /** The path where checkpoints will be stored, or null, if none has 
been configured. */
+   @Nullable
+   private final Path baseCheckpoin
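
To make the directory layout described in the excerpt concrete, a small 
sketch using Flink's Path type (the job ID and checkpoint number are the 
example values from the Javadoc, not real ones):

    import org.apache.flink.core.fs.Path;

    Path base   = new Path("hdfs://namenode:port/flink-checkpoints");
    Path jobDir = new Path(base, "1b080b6e710aabbef8993ab18c6de98b"); // per-job subdirectory
    Path chkDir = new Path(jobDir, "chk-17");                        // one specific checkpoint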

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160759604
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from 
configurations.
+ */
+public class StateBackendLoader {
+
+   // 

+   //  Configuration shortcut names
+   // 

+
+   /** The shortcut configuration name for the MemoryState backend that 
checkpoints to the JobManager */
+   public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+   /** The shortcut configuration name for the FileSystem State backend */
+   public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+   /** The shortcut configuration name for the RocksDB State Backend */
+   public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+   // 

+   //  Loading the state backend from a configuration 
+   // 

+
+   /**
+* Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+* in {@link CheckpointingOptions#STATE_BACKEND}.
+*
+* The state backends can be specified either via their shortcut 
name, or via the class name
+* of a {@link StateBackendFactory}. If a StateBackendFactory class 
name is specified, the factory
+* is instantiated (via its zero-argument constructor) and its
+* {@link StateBackendFactory#createFromConfig(Configuration)} method 
is called.
+*
+* Recognized shortcut names are '{@value 
StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+*
+* @param config The configuration to load the state backend from
+* @param classLoader The class loader that should be used to load the 
state backend
+* @param logger Optionally, a logger to log actions to (may be null)
+*
+* @return The instantiated state backend.
+*
+* @throws DynamicCodeLoadingException
+* Thrown if a state backend factory is configured and the 
factory class was not
+* found or the factory could not be instantiated
+* @throws IllegalConfigurationException
+* May be thrown by the StateBackendFactory when creating / 
configuring the state
+* backend in the factory
+* @throws IOException
+* May be thrown by the StateBackendFactory when 
instantiating the state backend
+*/
+   pub
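
A condensed, illustrative sketch of the shortcut-vs-factory dispatch that the 
Javadoc describes (this is not the actual StateBackendLoader body; error 
handling, logging, and the 'rocksdb' shortcut are omitted):

    // resolve 'state.backend' either as a shortcut name or as a factory class name
    static StateBackend loadSketch(Configuration config, ClassLoader classLoader) throws Exception {
        final String backendName = config.getString(CheckpointingOptions.STATE_BACKEND);
        if (backendName == null) {
            return null; // nothing configured
        }

        switch (backendName.toLowerCase()) {
            case "jobmanager": // MEMORY_STATE_BACKEND_NAME
                return new MemoryStateBackendFactory().createFromConfig(config);
            case "filesystem": // FS_STATE_BACKEND_NAME
                return new FsStateBackendFactory().createFromConfig(config);
            default:
                // anything else is interpreted as a StateBackendFactory class name,
                // instantiated via its zero-argument constructor
                StateBackendFactory<?> factory = Class
                        .forName(backendName, false, classLoader)
                        .asSubclass(StateBackendFactory.class)
                        .newInstance();
                return factory.createFromConfig(config);
        }
    }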

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160758820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
@@ -84,13 +84,44 @@
 public interface StateBackend extends java.io.Serializable {
 
// 

-   //  Persistent Bytes Storage
+   //  Checkpoint storage - the durable persistence of checkpoint data
+   // 

+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
--- End diff --

See above, I don't think we get around String.


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160758717
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
@@ -84,13 +84,44 @@
 public interface StateBackend extends java.io.Serializable {
 
// 

-   //  Persistent Bytes Storage
+   //  Checkpoint storage - the durable persistence of checkpoint data
--- End diff --

Agreed. In the end we have:
  - `CheckpointStorage`: Persisting bytes and Metadata. The Stream 
factories should go there in the next part of the rework.
  - `KeyedStateBackend`: Holding / snapshotting keyed state
  - `OperatorStateBackend`: Holding / snapshotting operator state

These should be three completely independent interfaces, and this PR moves 
towards that by introducing the `CheckpointStorage`, even though it does not 
yet move the Stream Factories there.

Once the rework is done, the `StateBackend` just bundles a combination of 
the three above components (it should have only three methods, maybe a convenience 
overload).


---
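
A rough sketch of that decomposition, purely illustrative (the simplified 
method signatures are assumptions, not the final API):

    import org.apache.flink.api.common.JobID;
    import java.io.IOException;

    // StateBackend as a bundle of three independent concerns
    interface StateBackendSketch extends java.io.Serializable {

        // durable storage of checkpoint data and metadata streams
        CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;

        // holding / snapshotting keyed state (the real signature takes many more arguments)
        KeyedStateBackend<?> createKeyedStateBackend() throws IOException;

        // holding / snapshotting operator state (the real signature takes more arguments)
        OperatorStateBackend createOperatorStateBackend() throws IOException;
    }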


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160757943
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import java.io.IOException;
+
+/**
+ * A storage location for one particular checkpoint. This location is 
typically
+ * created and initialized via {@link 
CheckpointStorage#initializeCheckpoint(long)} or
+ * {@link CheckpointStorage#initializeSavepoint(long, String)}.
+ */
+public interface CheckpointStorageLocation {
+
+   /**
+* Creates the output stream to persist the checkpoint metadata to.
+*
+* @return The output stream to persist the checkpoint metadata to.
+* @throws IOException Thrown, if the stream cannot be opened due to an 
I/O error.
+*/
+   CheckpointStateOutputStream createMetadataOutputStream() throws 
IOException;
+
+   /**
+* Finalizes the checkpoint, marking the location as a finished 
checkpoint.
+* This method returns the external checkpoint pointer that can be used 
to resolve
+* the checkpoint upon recovery.
+*
+* @return The external pointer to the checkpoint at this location.
+* @throws IOException Thrown, if finalizing / marking as finished 
fails due to an I/O error.
+*/
+   String markCheckpointAsFinished() throws IOException;
--- End diff --

I have gone back and forth on this. I felt like keeping it, because we do 
need to get to some form of external pointer (a String) in the end (for ZK and 
for external resume). This couples the finalization and obtaining that pointer, 
which makes sense to me (the pointer may not be producible before 
finalization). Pushing that into the Metadata's OutputStream needs another 
interface that creates a "handle and pointer" on closing. I had that in a 
previous version; it felt much clumsier. This variant seems nicer to me.


---
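
The coupling argued for above is easiest to see in a usage sketch (the type 
and method names are from the diff; the surrounding variables and the 
serialization step are assumptions):

    CheckpointStorageLocation location = checkpointStorage.initializeCheckpoint(checkpointId);

    try (CheckpointStateOutputStream out = location.createMetadataOutputStream()) {
        // ... serialize the checkpoint metadata into 'out' ...
    }

    // finalization and obtaining the external pointer are deliberately one step:
    // the pointer may not be producible before the location is finalized
    String externalPointer = location.markCheckpointAsFinished();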


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160756970
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
+
+   /**
+* Checks whether the storage has a default savepoint location 
configured.
+*/
+   boolean hasDefaultSavepointLocation();
+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
+
+   /**
+* Initializes a storage location for a new checkpoint with the given ID.
+*
+* The returned storage location can be used to write the checkpoint 
data and metadata
+* to, and to obtain the pointers for the location(s) where the actual 
checkpoint data should be
+* stored.
+*
+* @param checkpointId The ID (logical timestamp) of the checkpoint 
that should be persisted.
+* @return A storage location for the data and metadata of the given 
checkpoint.
+*
+* @throws IOException Thrown if the storage location cannot be 
initialized due to an I/O exception.
+*/
+   CheckpointStorageLocation initializeCheckpoint(long checkpointId) 
throws IOException;
--- End diff --

How about `initializeLocationForCheckpoint`?


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160756860
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
+
+   /**
+* Checks whether the storage has a default savepoint location 
configured.
+*/
+   boolean hasDefaultSavepointLocation();
+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
--- End diff --

I was wondering about that as well. My thinking was that in the end, what 
we get is just Strings (from the command line, from ZooKeeper, from the REST 
interface), and there is no point before that method that could parse/convert 
the string into something state-backend specific. So we have to deal with 
Strings anyway. Wrapping them in another class did not feel too useful to me here.


---
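
For illustration, resolving such a raw String pointer at a call site might 
look like this (the pointer is the example path from the earlier Javadoc, and 
'checkpointStorage' is an assumed variable; error handling omitted):

    // a raw pointer as obtained from the command line, ZooKeeper, or the REST interface
    String pointer = "hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17";

    StreamStateHandle metadataHandle = checkpointStorage.resolveCheckpoint(pointer);

    try (DataInputStream in = new DataInputStream(metadataHandle.openInputStream())) {
        // read the magic number, the format version, then the version-specific metadata
    }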


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160756347
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
--- End diff --

Yes, this is part of future work. The high-level description says that:

> To make sure there is no extra persisting of the checkpoint metadata by 
the HA store (it simply references the regular persisted checkpoint metadata) 
we need some changes to the ZooKeeperCompletedCheckpointStore.

The method serves as a sanity check: it allows the checkpoint coordinator to 
validate that if the `CompletedCheckpointStore` requires durable persistence, 
then the `CheckpointStore` must provide that. It is used to catch a weird 
corner case where the ZkCompletedCheckpointStore is used, but the StateStore 
(from the MemoryStateBackend) does not support actually persisting any data 
durably. We could remove that, because currently this can never happen (the 
MemoryStateBackend automatically fills in durable persistence in HA setups).


---
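
A minimal sketch of that sanity check (the CompletedCheckpointStore flag is 
the existing requiresExternalizedCheckpoints() method; the exact wiring in 
the checkpoint coordinator is an assumption):

    // reject the corner case where the HA checkpoint store would reference
    // checkpoint metadata that the storage cannot actually persist durably
    if (completedCheckpointStore.requiresExternalizedCheckpoints()
            && !checkpointStorage.supportsHighlyAvailableStorage()) {
        throw new IllegalStateException(
                "The HA checkpoint store requires durable checkpoint storage, " +
                "but the configured state backend does not provide it");
    }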


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160755282
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
--- End diff --

The storage will create the stream factories later, that is the plan. I 
simply did not want to push that into this pull request as well. Think of 
this pull request as the "JobManager" side of things; the "TaskManager" side of 
things is to come next.


---
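
What "the storage will create the stream factories later" could look like, as 
a hedged sketch of the follow-up work (the method name and shape are guesses, 
not part of this PR):

    // assumed future shape: per-checkpoint stream factories come from the storage
    interface CheckpointStorageWithStreams extends CheckpointStorage {
        CheckpointStreamFactory createStreamFactory(long checkpointId) throws IOException;
    }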


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320743#comment-16320743
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160754932
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
 ---
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class with the methods to write/load/dispose the checkpoint 
and savepoint metadata.
+ *
+ * Stored checkpoint metadata files have the following format:
+ * [MagicNumber (int) | Format Version (int) | Checkpoint Metadata 
(variable)]
+ *
+ * The actual savepoint serialization is version-specific via the 
{@link SavepointSerializer}.
+ */
+public class Checkpoints {
--- End diff --

This class somewhat mimics the way this was done previously, scattered over 
the `SavepointLoader` and `SavepointStore` classes. I changed it to `Checkpoints` 
because the code is relevant not only to savepoints, but to checkpoints in 
general. It was mainly a bit of name-fixing work as part of the adjustment.

Making this pluggable is another orthogonal effort in my opinion, and also 
a probably more involved one.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
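
A minimal sketch of the metadata file framing described in the Javadoc above,
[MagicNumber (int) | Format Version (int) | Checkpoint Metadata (variable)].
The constant values here are placeholders for illustration, not the ones in
Flink's Checkpoints class.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class MetadataFramingSketch {

    // Placeholder values; the real constants live in the Checkpoints class.
    private static final int MAGIC_NUMBER = 0xCAFEBABE;
    private static final int FORMAT_VERSION = 2;

    static void writeHeader(OutputStream rawOut) throws IOException {
        DataOutputStream out = new DataOutputStream(rawOut);
        out.writeInt(MAGIC_NUMBER);   // [MagicNumber (int)]
        out.writeInt(FORMAT_VERSION); // [Format Version (int)]
        // the version-specific SavepointSerializer appends the variable part
    }

    static int readAndValidateHeader(InputStream rawIn) throws IOException {
        DataInputStream in = new DataInputStream(rawIn);
        if (in.readInt() != MAGIC_NUMBER) {
            throw new IOException("unexpected magic number - not a checkpoint metadata file");
        }
        return in.readInt(); // the version selects the matching SavepointSerializer
    }
}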


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160754932
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
 ---
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class with the methods to write/load/dispose the checkpoint 
and savepoint metadata.
+ *
+ * Stored checkpoint metadata files have the following format:
+ * [MagicNumber (int) | Format Version (int) | Checkpoint Metadata 
(variable)]
+ *
+ * The actual savepoint serialization is version-specific via the 
{@link SavepointSerializer}.
+ */
+public class Checkpoints {
--- End diff --

This class somewhat mimics the way this was done previously, scattered over 
the `SavepointLoader` and `SavepointStore` classes. I changed it to `Checkpoints` 
because the code is relevant not only to savepoints, but to checkpoints in 
general. It was mainly a bit of name-fixing work as part of the adjustment.

Making this pluggable is another orthogonal effort in my opinion, and also 
a probably more involved one.


---


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320723#comment-16320723
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160752558
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -1185,6 +1153,10 @@ public int 
getNumberOfRetainedSuccessfulCheckpoints() {
}
}
 
+   public CheckpointStorage getCheckpointStorage() {
--- End diff --

What names would you suggest to use?


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320720#comment-16320720
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160752496
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 ---
@@ -251,77 +225,66 @@ public String toString() {
false,
false);
 
-   private static final CheckpointProperties STANDARD_CHECKPOINT = new 
CheckpointProperties(
-   false,
+   private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = 
new CheckpointProperties(
false,
false,
true,
-   true,
-   true,
-   true,
-   true);
+   true,  // Delete on success
+   true,  // Delete on cancellation
+   true,  // Delete on failure
+   true); // Delete on suspension
 
-   private static final CheckpointProperties 
EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties(
+   private static final CheckpointProperties 
CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties(
false,
-   true,
false,
true,
-   true,
-   false, // Retain on cancellation
-   false,
-   false); // Retain on suspension
+   true,  // Delete on success
+   true,  // Delete on cancellation
+   false, // Retain on failure
+   true); // Delete on suspension
 
-   private static final CheckpointProperties 
EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties(
+   private static final CheckpointProperties 
CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties(
false,
-   true,
false,
true,
-   true,
-   true, // Delete on cancellation
-   false,
-   true); // Delete on suspension
+   true,   // Delete on success
+   false,  // Retain on cancellation
+   false,  // Retain on failure
+   false); // Retain on suspension
+
 
/**
 * Creates the checkpoint properties for a (manually triggered) 
savepoint.
 *
-* Savepoints are forced and persisted externally. They have to be
+* Savepoints are not queued due to time trigger limits. They have 
to be
 * garbage collected manually.
 *
 * @return Checkpoint properties for a (manually triggered) savepoint.
 */
-   public static CheckpointProperties forStandardSavepoint() {
-   return STANDARD_SAVEPOINT;
-   }
-
-   /**
-* Creates the checkpoint properties for a regular checkpoint.
-*
-* Regular checkpoints are not forced and not persisted externally. 
They
-* are garbage collected automatically.
-*
-* @return Checkpoint properties for a regular checkpoint.
-*/
-   public static CheckpointProperties forStandardCheckpoint() {
-   return STANDARD_CHECKPOINT;
+   public static CheckpointProperties forSavepoint() {
+   return SAVEPOINT;
}
 
/**
-* Creates the checkpoint properties for an external checkpoint.
+* Creates the checkpoint properties for a checkpoint.
 *
-* External checkpoints are not forced, but persisted externally. 
They
-* are garbage collected automatically, except when the owning job
+* Checkpoints may be queued in case too many other checkpoints are 
currently happening.
+* They are garbage collected automatically, except when the owning job
 * terminates in state {@link JobStatus#FAILED}. The user is required to
 * configure the clean up behaviour on job cancellation.
 *
-* @param deleteOnCancellation Flag indicating whether to discard on 
cancellation.
-*
 * @return Checkpoint properties for an external checkpoint.
 */
-   public static CheckpointProperties forExternalizedCheckpoint(boolean 
deleteOnCancellation) {
-   if (deleteOnCancellation) {
-   return EXTERNALIZED_CHECKPOINT_DELETED;
-   } else {
-   return EXTERNALIZED_CHECKPOINT_RETAINED;
+   pu

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160752496
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 ---
@@ -251,77 +225,66 @@ public String toString() {
false,
false);
 
-   private static final CheckpointProperties STANDARD_CHECKPOINT = new 
CheckpointProperties(
-   false,
+   private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = 
new CheckpointProperties(
false,
false,
true,
-   true,
-   true,
-   true,
-   true);
+   true,  // Delete on success
+   true,  // Delete on cancellation
+   true,  // Delete on failure
+   true); // Delete on suspension
 
-   private static final CheckpointProperties 
EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties(
+   private static final CheckpointProperties 
CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties(
false,
-   true,
false,
true,
-   true,
-   false, // Retain on cancellation
-   false,
-   false); // Retain on suspension
+   true,  // Delete on success
+   true,  // Delete on cancellation
+   false, // Retain on failure
+   true); // Delete on suspension
 
-   private static final CheckpointProperties 
EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties(
+   private static final CheckpointProperties 
CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties(
false,
-   true,
false,
true,
-   true,
-   true, // Delete on cancellation
-   false,
-   true); // Delete on suspension
+   true,   // Delete on success
+   false,  // Retain on cancellation
+   false,  // Retain on failure
+   false); // Retain on suspension
+
 
/**
 * Creates the checkpoint properties for a (manually triggered) 
savepoint.
 *
-* Savepoints are forced and persisted externally. They have to be
+* Savepoints are not queued due to time trigger limits. They have 
to be
 * garbage collected manually.
 *
 * @return Checkpoint properties for a (manually triggered) savepoint.
 */
-   public static CheckpointProperties forStandardSavepoint() {
-   return STANDARD_SAVEPOINT;
-   }
-
-   /**
-* Creates the checkpoint properties for a regular checkpoint.
-*
-* Regular checkpoints are not forced and not persisted externally. 
They
-* are garbage collected automatically.
-*
-* @return Checkpoint properties for a regular checkpoint.
-*/
-   public static CheckpointProperties forStandardCheckpoint() {
-   return STANDARD_CHECKPOINT;
+   public static CheckpointProperties forSavepoint() {
+   return SAVEPOINT;
}
 
/**
-* Creates the checkpoint properties for an external checkpoint.
+* Creates the checkpoint properties for a checkpoint.
 *
-* External checkpoints are not forced, but persisted externally. 
They
-* are garbage collected automatically, except when the owning job
+* Checkpoints may be queued in case too many other checkpoints are 
currently happening.
+* They are garbage collected automatically, except when the owning job
 * terminates in state {@link JobStatus#FAILED}. The user is required to
 * configure the clean up behaviour on job cancellation.
 *
-* @param deleteOnCancellation Flag indicating whether to discard on 
cancellation.
-*
 * @return Checkpoint properties for an external checkpoint.
 */
-   public static CheckpointProperties forExternalizedCheckpoint(boolean 
deleteOnCancellation) {
-   if (deleteOnCancellation) {
-   return EXTERNALIZED_CHECKPOINT_DELETED;
-   } else {
-   return EXTERNALIZED_CHECKPOINT_RETAINED;
+   public static CheckpointProperties 
forCheckpoint(CheckpointRetentionPolicy policy) {
--- End diff --

There is actually a `forSavepoint(...)` method, that's why I think keeping 
the name makes sense. We could change it to `forCheckpointWithPolicy`.
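
As an aside for readers of the diff: a minimal, self-contained sketch of how
such a `forCheckpoint(CheckpointRetentionPolicy)` dispatch could look. The
enum values and the stubbed `CheckpointProperties` are assumptions for
illustration, not the PR's code.

// Stub standing in for the real class, which carries the
// delete/retain-per-terminal-state flags shown in the diff above.
final class CheckpointProperties { }

enum CheckpointRetentionPolicy {
    NEVER_RETAIN_AFTER_TERMINATION,
    RETAIN_ON_FAILURE,
    RETAIN_ON_CANCELLATION
}

final class RetentionSketch {

    static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = new CheckpointProperties();
    static final CheckpointProperties CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties();
    static final CheckpointProperties CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties();

    static CheckpointProperties forCheckpoint(CheckpointRetentionPolicy policy) {
        switch (policy) {
            case RETAIN_ON_FAILURE:      return CHECKPOINT_RETAINED_ON_FAILURE;
            case RETAIN_ON_CANCELLATION: return CHECKPOINT_RETAINED_ON_CANCELLATION;
            default:                     return CHECKPOINT_NEVER_RETAINED; // delete in all terminal states
        }
    }
}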

[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160752558
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -1185,6 +1153,10 @@ public int 
getNumberOfRetainedSuccessfulCheckpoints() {
}
}
 
+   public CheckpointStorage getCheckpointStorage() {
--- End diff --

What names would you suggest to use?


---


[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160751897
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // 

+   //  general checkpoint and state backend options
+   // 

+
+   public static final ConfigOption STATE_BACKEND = ConfigOptions
+   .key("state.backend")
+   .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain.*/
+   public static final ConfigOption MAX_RETAINED_CHECKPOINTS = 
ConfigOptions
+   .key("state.checkpoints.num-retained")
+   .defaultValue(1);
+
+   // 

+   //  Options specific to the file-system-based state backends
+   // 

+
+   /** The default directory for savepoints. Used by the state backends 
that write
+* savepoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption SAVEPOINT_DIRECTORY = 
ConfigOptions
+   .key("state.savepoints.dir")
+   .noDefaultValue()
+   .withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+   /** The default directory used for checkpoints. Used by the state 
backends that write
+* checkpoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption CHECKPOINTS_DIRECTORY = 
ConfigOptions
+   .key("state.checkpoints.dir")
+   .noDefaultValue();
+
+   /** Option whether the heap-based key/value data structures should use 
an asynchronous
+* snapshot method. Used by MemoryStateBackend and FsStateBackend. */
+   public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = 
ConfigOptions
+   .key("state.backend.heap.async")
--- End diff --

Makes sense, let's scope them more generally.


---
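
A minimal sketch of the re-scoping the comment suggests: moving the option to
a more general key while keeping the old key readable as a deprecated alias.
The key name `state.backend.async` and the default value are assumptions, not
the merged result.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ScopedOptionSketch {

    /** Generally scoped async-snapshot switch; the heap-specific key stays readable. */
    public static final ConfigOption<Boolean> ASYNC_SNAPSHOTS = ConfigOptions
            .key("state.backend.async")                       // assumed new, general key
            .defaultValue(true)                               // assumed default
            .withDeprecatedKeys("state.backend.heap.async");  // old key from the diff above
}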


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320717#comment-16320717
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160751897
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // 

+   //  general checkpoint and state backend options
+   // 

+
+   public static final ConfigOption STATE_BACKEND = ConfigOptions
+   .key("state.backend")
+   .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain.*/
+   public static final ConfigOption MAX_RETAINED_CHECKPOINTS = 
ConfigOptions
+   .key("state.checkpoints.num-retained")
+   .defaultValue(1);
+
+   // 

+   //  Options specific to the file-system-based state backends
+   // 

+
+   /** The default directory for savepoints. Used by the state backends 
that write
+* savepoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption SAVEPOINT_DIRECTORY = 
ConfigOptions
+   .key("state.savepoints.dir")
+   .noDefaultValue()
+   .withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+   /** The default directory used for checkpoints. Used by the state 
backends that write
+* checkpoints to file systems (MemoryStateBackend, FsStateBackend, 
RocksDBStateBackend). */
+   public static final ConfigOption CHECKPOINTS_DIRECTORY = 
ConfigOptions
+   .key("state.checkpoints.dir")
+   .noDefaultValue();
+
+   /** Option whether the heap-based key/value data structures should use 
an asynchronous
+* snapshot method. Used by MemoryStateBackend and FsStateBackend. */
+   public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = 
ConfigOptions
+   .key("state.backend.heap.async")
--- End diff --

Makes sense, let's scope them more generally.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-6974) Add BIN supported in TableAPI

2018-01-10 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-6974.
-
   Resolution: Fixed
Fix Version/s: 1.5.0

This was implemented as part of FLINK-6893.

> Add BIN supported in TableAPI
> -
>
> Key: FLINK-6974
> URL: https://issues.apache.org/jira/browse/FLINK-6974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>  Labels: starter
> Fix For: 1.5.0
>
>
> See FLINK-6893 for detail.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6893) Add BIN supported in SQL & Table API

2018-01-10 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-6893:

Summary: Add BIN supported in SQL & Table API  (was: Add BIN supported in 
SQL)

> Add BIN supported in SQL & Table API
> 
>
> Key: FLINK-6893
> URL: https://issues.apache.org/jira/browse/FLINK-6893
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.5.0
>
>
> BIN(N) Returns a string representation of the binary value of N, where N is a 
> longlong (BIGINT) number. This is equivalent to CONV(N,10,2). Returns NULL if 
> N is NULL.
> * Syntax:
> BIN(num)
> * Arguments
> **num: a long/bigint value
> * Return Types
>   String
> * Example:
>   BIN(12) -> '1100'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_bin]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
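
A minimal sketch of the documented semantics (not Flink's actual
implementation): BIN maps a BIGINT to its binary-string form and propagates
NULL.

public class BinSketch {

    /** BIN(num): binary string of a BIGINT; returns null for null input. */
    static String bin(Long num) {
        return num == null ? null : Long.toBinaryString(num);
    }

    public static void main(String[] args) {
        System.out.println(bin(12L));  // prints 1100, matching BIN(12) -> '1100'
        System.out.println(bin(null)); // prints null
    }
}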


[jira] [Commented] (FLINK-6893) Add BIN supported in SQL & Table API

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320614#comment-16320614
 ] 

ASF GitHub Bot commented on FLINK-6893:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4128


> Add BIN supported in SQL & Table API
> 
>
> Key: FLINK-6893
> URL: https://issues.apache.org/jira/browse/FLINK-6893
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.5.0
>
>
> BIN(N) Returns a string representation of the binary value of N, where N is a 
> longlong (BIGINT) number. This is equivalent to CONV(N,10,2). Returns NULL if 
> N is NULL.
> * Syntax:
> BIN(num)
> * Arguments
> **num: a long/bigint value
> * Return Types
>   String
> * Example:
>   BIN(12) -> '1100'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_bin]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-6893) Add BIN supported in SQL & Table API

2018-01-10 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-6893.
-
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed in 1.5: 2914e596bf2af968197b5241aa40840e2e9408ce & 
00ad0eb120026817c79a5fed2c71dd6aa10e9ba6

> Add BIN supported in SQL & Table API
> 
>
> Key: FLINK-6893
> URL: https://issues.apache.org/jira/browse/FLINK-6893
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.5.0
>
>
> BIN(N) Returns a string representation of the binary value of N, where N is a 
> longlong (BIGINT) number. This is equivalent to CONV(N,10,2). Returns NULL if 
> N is NULL.
> * Syntax:
> BIN(num)
> * Arguments
> **num: a long/bigint value
> * Return Types
>   String
> * Example:
>   BIN(12) -> '1100'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_bin]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4128: [FLINK-6893][table]Add BIN supported in SQL

2018-01-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4128


---


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320601#comment-16320601
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160727075
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
@@ -84,13 +84,44 @@
 public interface StateBackend extends java.io.Serializable {
 
// 

-   //  Persistent Bytes Storage
+   //  Checkpoint storage - the durable persistence of checkpoint data
+   // 

+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
--- End diff --

Similar to the `String` vs `StoragePointer`, this could also be a class 
that, essentially, is a `StreamStateHandle` but gives a name to the concept, 
e.g. `CheckpointMetaDataHandle`. But it is not as bad as the other case, 
because this is never passed down very deep (yet).


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
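
A minimal sketch of the read side that `resolveCheckpoint` enables, with the
two Flink interfaces stubbed locally so the example is self-contained; the
byte-copy loop stands in for the version-dispatched metadata deserialization.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Local stubs for the interfaces quoted in the diff above (sketch only).
interface StreamStateHandle { InputStream openInputStream() throws IOException; }
interface CheckpointStorage { StreamStateHandle resolveCheckpoint(String pointer) throws IOException; }

public class ResolveSketch {

    /** Resolves an external pointer and reads the raw metadata bytes. */
    static byte[] loadMetadataBytes(CheckpointStorage storage, String pointer) throws IOException {
        StreamStateHandle handle = storage.resolveCheckpoint(pointer); // throws for foreign pointer formats
        try (InputStream in = handle.openInputStream()) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            for (int n; (n = in.read(chunk)) != -1; ) {
                buf.write(chunk, 0, n);
            }
            return buf.toByteArray(); // a SavepointSerializer would parse these bytes
        }
    }
}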


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320596#comment-16320596
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160721205
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
+
+   /**
+* Checks whether the storage has a default savepoint location 
configured.
+*/
+   boolean hasDefaultSavepointLocation();
+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
+
+   /**
+* Initializes a storage location for a new checkpoint with the given ID.
+*
+* The returned storage location can be used to write the checkpoint 
data and metadata
+* to and to obtain the pointers for the location(s) where the actual 
checkpoint data should be
+* stored.
+*
+* @param checkpointId The ID (logical timestamp) of the checkpoint 
that should be persisted.
+* @return A storage location for the data and metadata of the given 
checkpoint.
+*
+* @throws IOException Thrown if the storage location cannot be 
initialized due to an I/O exception.
+*/
+   CheckpointStorageLocation initializeCheckpoint(long checkpointId) 
throws IOException;
--- End diff --

I would consider renaming this to something like 
`initializeStorageLocationForCheckpoint(...)`


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320588#comment-16320588
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160719709
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import java.io.IOException;
+
+/**
+ * A storage location for one particular checkpoint. This location is 
typically
+ * created and initialized via {@link 
CheckpointStorage#initializeCheckpoint(long)} or
+ * {@link CheckpointStorage#initializeSavepoint(long, String)}.
+ */
+public interface CheckpointStorageLocation {
+
+   /**
+* Creates the output stream to persist the checkpoint metadata to.
+*
+* @return The output stream to persist the checkpoint metadata to.
+* @throws IOException Thrown, if the stream cannot be opened due to an 
I/O error.
+*/
+   CheckpointStateOutputStream createMetadataOutputStream() throws 
IOException;
+
+   /**
+* Finalizes the checkpoint, marking the location as a finished 
checkpoint.
+* This method returns the external checkpoint pointer that can be used 
to resolve
+* the checkpoint upon recovery.
+*
+* @return The external pointer to the checkpoint at this location.
+* @throws IOException Thrown, if finalizing / marking as finished 
fails due to an I/O error.
+*/
+   String markCheckpointAsFinished() throws IOException;
--- End diff --

As discussed, you might reconsider if this method is really needed.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
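
A minimal sketch of the intended write path through the interface above, with
the interface stubbed locally (and a simplified return type) so the example
compiles; error handling and cleanup on failure are elided.

import java.io.IOException;
import java.io.OutputStream;

// Local stub of the interface quoted above; the real method returns a
// CheckpointStateOutputStream rather than a plain OutputStream.
interface CheckpointStorageLocation {
    OutputStream createMetadataOutputStream() throws IOException;
    String markCheckpointAsFinished() throws IOException;
}

public class MetadataWriteSketch {

    /** Persists the serialized metadata and returns the external pointer. */
    static String persist(CheckpointStorageLocation location, byte[] serializedMetadata) throws IOException {
        try (OutputStream out = location.createMetadataOutputStream()) {
            out.write(serializedMetadata); // the [magic | version | metadata] framing goes here
        }
        return location.markCheckpointAsFinished(); // pointer used to resolve the checkpoint on recovery
    }
}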


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320589#comment-16320589
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160690515
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from 
configurations.
+ */
+public class StateBackendLoader {
+
+   // 

+   //  Configuration shortcut names
+   // 

+
+   /** The shortcut configuration name for the MemoryState backend that 
checkpoints to the JobManager */
+   public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+   /** The shortcut configuration name for the FileSystem State backend */
+   public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+   /** The shortcut configuration name for the RocksDB State Backend */
+   public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+   // 

+   //  Loading the state backend from a configuration 
+   // 

+
+   /**
+* Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+* in {@link CheckpointingOptions#STATE_BACKEND}.
+*
+* The state backends can be specified either via their shortcut 
name, or via the class name
+* of a {@link StateBackendFactory}. If a StateBackendFactory class 
name is specified, the factory
+* is instantiated (via its zero-argument constructor) and its
+* {@link StateBackendFactory#createFromConfig(Configuration)} method 
is called.
+*
+* Recognized shortcut names are '{@value 
StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+*
+* @param config The configuration to load the state backend from
+* @param classLoader The class loader that should be used to load the 
state backend
+* @param logger Optionally, a logger to log actions to (may be null)
+*
+* @return The instantiated state backend.
+*
+* @throws DynamicCodeLoadingException
+* Thrown if a state backend factory is configured and the 
factory class was not
+* found or the factory could not be instantiated
+* @throws IllegalConfigurationException
+* May be thrown by the St
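
A minimal usage sketch of the loader described above. The method name
`loadStateBackendFromConfig` and its (config, classloader, logger) shape are
assumptions based on the quoted Javadoc, not a confirmed signature.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;

public class LoaderSketch {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.setString("state.backend", "filesystem");                 // one of the shortcut names
        config.setString("state.checkpoints.dir", "hdfs:///flink/checkpoints");

        // Assumed call shape; the Javadoc above notes the logger may be null.
        StateBackend backend = StateBackendLoader.loadStateBackendFromConfig(
                config, LoaderSketch.class.getClassLoader(), null);
        System.out.println("loaded backend: " + backend);
    }
}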

[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320593#comment-16320593
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160716512
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -1185,6 +1153,10 @@ public int 
getNumberOfRetainedSuccessfulCheckpoints() {
}
}
 
+   public CheckpointStorage getCheckpointStorage() {
--- End diff --

Here I noticed how similar the names `CheckpointStorage` and 
`CompletedCheckpointStore` are. Do you think there are names that make their 
difference more obvious when reading the code? For example, the second could 
also go well under `CompletedCheckpoints`.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320598#comment-16320598
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160730627
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An implementation of durable checkpoint storage to file systems.
+ */
+public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
+
+   private final FileSystem fileSystem;
+
+   private final Path checkpointsDirectory;
+
+   private final Path sharedStateDirectory;
+
+   private final Path taskOwnedStateDirectory;
--- End diff --

As discussed, we might not need this if we map task-owned state to shared 
state. Idea: as long as the task still requires the state, it will resend the 
handle to this file (a placeholder to be precise) and keep the shared 
registry's reference count above zero.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320595#comment-16320595
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160638882
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 ---
@@ -18,92 +18,272 @@
 
 package org.apache.flink.runtime.state.memory;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
- * A {@link AbstractStateBackend} that stores all its data and checkpoints 
in memory and has no
- * capabilities to spill to disk. Checkpoints are serialized and the 
serialized data is
- * transferred
+ * This state backend holds the working state in the memory (JVM heap) of 
the TaskManagers.
+ * The state backend checkpoints state directly to the JobManager's memory 
(hence the backend's name),
+ * but the checkpoints will be persisted to a file system for 
high-availability setups and savepoints.
+ * The MemoryStateBackend is consequently a FileSystem-based backend that 
can work without a
+ * file system dependency in simple setups.
+ *
+ * This state backend should be used only for experimentation, quick 
local setups,
+ * or for streaming applications that have very small state: Because it 
requires checkpoints to
+ * go through the JobManager's memory, larger state will occupy larger 
portions of the JobManager's  
+ * main memory, reducing operational stability.
+ * For any other setup, the {@link 
org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend}
+ * should be used. The {@code FsStateBackend} holds the working state on 
the TaskManagers in the same way, but
+ * checkpoints state directly to files rather than to the JobManager's 
memory, thus supporting
+ * large state sizes. 
+ *
+ * State Size Considerations
+ *
+ * State checkpointing with this state backend is subject to the following 
conditions:
+ * 
+ * Each individual state must not exceed the configured maximum 
state size
+ * (see {@link #getMaxStateSize()}).
+ *
+ * All state from one task (i.e., the sum of all operator states 
and keyed states from all
+ * chained operators of the task) must not exceed what the RPC 
system supports, which is
+ * by default < 10 MB. That limit can be configured up, but that 
is typically not advised.
+ *
+ * The sum of all states in the application times all retained 
checkpoints must comfortably
+ * fit into the JobManager's JVM heap space.
+ * 
+ *
+ * Persistence Guarantees
+ *
+ * For the use cases where the state sizes can be handled by this backend, 
the backend does guarantee
+ * persistence for savepoints, externalized checkpoints (of configured), 
and checkpoints
+ * (when high-availability is configured).
+ *
+ * Configuration
+ *
+ * As for all state backends, this backend can either be configured within 
the application (by creating
+ * the backend with the respective constructor parameters and setting it 
on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * If the state backend was specified in the application, it may pick 
up additional configuration
+ * parameters from the Flink configuration. For example, if the backend is 
configured in the application
+ * without a default savepoint directory, it will pick up a default 
sa
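
A minimal sketch of the two configuration routes the Javadoc above describes:
setting the backend in the application, or via flink-conf.yaml. The
constructor shape is an assumption for illustration.

import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendConfigSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In-application configuration: fine for experimentation and tiny state,
        // since every checkpoint passes through the JobManager's memory.
        env.setStateBackend(new MemoryStateBackend());

        // Equivalent flink-conf.yaml configuration (no code change needed):
        //   state.backend: jobmanager
        //   state.savepoints.dir: hdfs:///flink/savepoints
    }
}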

[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320599#comment-16320599
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160727468
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
+
+   /**
+* Checks whether the storage has a default savepoint location 
configured.
+*/
+   boolean hasDefaultSavepointLocation();
+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handler from which one can read the checkpoint 
metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
+
+   /**
+* Initializes a storage location for a new checkpoint with the given ID.
+*
+* The returned storage location can be used to write the checkpoint 
data and metadata
+* to and to obtain the pointers for the location(s) where the actual 
checkpoint data should be
+* stored.
+*
+* @param checkpointId The ID (logical timestamp) of the checkpoint 
that should be persisted.
+* @return A storage location for the data and metadata of the given 
checkpoint.
+*
+* @throws IOException Thrown if the storage location cannot be 
initialized due to an I/O exception.
+*/
+   CheckpointStorageLocation initializeCheckpoint(long checkpointId) 
throws IOException;
+
+   /**
+* Initializes a storage location for a new savepoint with the given ID.
+*
+* If an external location pointer is passed, the savepoint storage 
location
+* will be initialized at the location of that pointer. If the external 
location pointer is null,
+* the default savepoint location will be used. If no default savepoint 
location is configured,
+* this will throw an exception. Whether a default savepoint location 
is configured can be
+* checked via {@link #hasDefaultSavepointLocation()}.
+*
+* @param checkpointId The ID (logical timestamp) of the savepoint's 
checkpoint.
+* @param externalLocationPointer Optionally, a pointer to the location 
where the savepoint should
+*be stored. May be null.
+*
+* @return A storage location for the data and metadata of the 
savepoint.
+*
+* @throws IOException Thrown if the storage location cannot be 
initialized due to an I/O exception.
+*/
   

[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320591#comment-16320591
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160698030
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
 ---
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class with the methods to write/load/dispose the checkpoint 
and savepoint metadata.
+ *
+ * Stored checkpoint metadata files have the following format:
+ * [MagicNumber (int) | Format Version (int) | Checkpoint Metadata 
(variable)]
+ *
+ * The actual savepoint serialization is version-specific via the 
{@link SavepointSerializer}.
+ */
+public class Checkpoints {
--- End diff --

This class contains a lot of static methods that somehow look like they 
want to belong to a checkpoint/savepoint store object. For the sake of keeping 
those parts replaceable for unit tests, I wonder if we should eliminate the 
static nature of this code?


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
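
A minimal sketch of the refactoring direction suggested in the comment above:
hiding the static helpers behind a small instance interface so unit tests can
substitute a fake. All names here are hypothetical, not the PR's code.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical seam: tests implement this interface with an in-memory fake.
interface CheckpointMetadataIO {
    void store(OutputStream out, byte[] serializedMetadata) throws IOException;
    byte[] load(InputStream in) throws IOException;
}

/** Production implementation that would delegate to the static Checkpoints helpers. */
final class DefaultCheckpointMetadataIO implements CheckpointMetadataIO {

    @Override
    public void store(OutputStream out, byte[] serializedMetadata) throws IOException {
        out.write(serializedMetadata); // real code would call the static write method in Checkpoints
    }

    @Override
    public byte[] load(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        for (int n; (n = in.read(chunk)) != -1; ) {
            buf.write(chunk, 0, n);
        }
        return buf.toByteArray();      // real code would call the static load method in Checkpoints
    }
}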


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320581#comment-16320581
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160646300
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all state backends that store their metadata (and 
data) in files.
+ * Examples that inherit from this are the {@link FsStateBackend}, the
+ * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend 
MemoryStateBackend}, or the
+ * {@code RocksDBStateBackend}.
+ *
+ * This class takes the base checkpoint and savepoint directory paths, but also accepts null
+ * for both of them, in which case creating externalized checkpoints is not possible, and it is
+ * not possible to create a savepoint with a default path. Null is accepted to enable
+ * implementations that only optionally support default savepoints and externalized checkpoints.
+ *
+ * Checkpoint Layout
+ *
+ * The state backend is configured with a base directory and persists 
the checkpoint data of specific
+ * checkpoints in specific subdirectories. For example, if the base 
directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will 
create a subdirectory with
+ * the job's ID that will contain the actual checkpoints:
+ * ({@code 
hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * Each individual checkpoint stores all its files in a subdirectory that includes the
+ * checkpoint number, such as
+ * {@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * Savepoint Layout
+ *
+ * A savepoint that is set to be stored under {@code hdfs://namenode:port/flink-savepoints/} will
+ * create a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all
+ * savepoint data.
+ * The random digits are added as "entropy" to avoid directory collisions.
+ *
+ * Metadata and Success Files
+ *
+ * A completed checkpoint writes its metadata into a file
+ * '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}'. Once that file is complete, it
+ * writes an additional file '{@value AbstractFsCheckpointStorage#SUCCESS_FILE_NAME}'.
+ *
+ * Ideally that would not be necessary: one would write the metadata to a temporary file and
+ * then issue an atomic (or at least constant-time) rename. But some file systems (like S3) do
+ * not support that: a rename is a copy process which, when failing, leaves corrupt half-written
+ * files/objects. The success file is hence needed as a signal that the
+ * '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}' file is complete.
+ */
+@PublicEvolving
+public abstract class AbstractFileStateBackend extends AbstractStateBackend {
+
+   private static final long serialVersionUID = 1L;
+
+   // ------------------------------------------------------------------------
+   //  State Backen

[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320587#comment-16320587
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160690634
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from 
configurations.
+ */
+public class StateBackendLoader {
+
+   // ------------------------------------------------------------------------
+   //  Configuration shortcut names
+   // ------------------------------------------------------------------------
+
+   /** The shortcut configuration name for the MemoryState backend that 
checkpoints to the JobManager */
+   public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+   /** The shortcut configuration name for the FileSystem State backend */
+   public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+   /** The shortcut configuration name for the RocksDB State Backend */
+   public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+   // ------------------------------------------------------------------------
+   //  Loading the state backend from a configuration
+   // ------------------------------------------------------------------------
+
+   /**
+* Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+* in {@link CheckpointingOptions#STATE_BACKEND}.
+*
+* The state backends can be specified either via their shortcut 
name, or via the class name
+* of a {@link StateBackendFactory}. If a StateBackendFactory class 
name is specified, the factory
+* is instantiated (via its zero-argument constructor) and its
+* {@link StateBackendFactory#createFromConfig(Configuration)} method 
is called.
+*
+* Recognized shortcut names are '{@value 
StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+*
+* @param config The configuration to load the state backend from
+* @param classLoader The class loader that should be used to load the 
state backend
+* @param logger Optionally, a logger to log actions to (may be null)
+*
+* @return The instantiated state backend.
+*
+* @throws DynamicCodeLoadingException
+* Thrown if a state backend factory is configured and the 
factory class was not
+* found or the factory could not be instantiated
+* @throws IllegalConfigurationException
+* May be thrown by the St
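
To make the loading contract described above concrete, here is a minimal usage
sketch. The method name and parameters (configuration, class loader, optional
logger) follow this Javadoc; treat the exact signature and the
'state.checkpoints.dir' key as assumptions based on this PR:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.runtime.state.StateBackendLoader;

    class StateBackendLoadingExample {
        static StateBackend loadViaShortcutName() throws Exception {
            Configuration config = new Configuration();
            // one of the recognized shortcut names: "jobmanager", "filesystem", "rocksdb"
            config.setString("state.backend", "filesystem");
            // base directory for checkpoint data (key name is an assumption)
            config.setString("state.checkpoints.dir", "hdfs://namenode:port/flink-checkpoints");

            // the logger argument may be null, per the Javadoc above
            return StateBackendLoader.loadStateBackendFromConfig(
                    config, StateBackendLoadingExample.class.getClassLoader(), null);
        }
    }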

[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320592#comment-16320592
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160718229
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
+
+   /**
+* Checks whether the storage has a default savepoint location 
configured.
+*/
+   boolean hasDefaultSavepointLocation();
+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handle from which one can read the checkpoint metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
--- End diff --

I think `resolveStoragePointerForCheckpoint(...)` or similar might be a 
better name.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>






[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320583#comment-16320583
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160686745
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from 
configurations.
+ */
+public class StateBackendLoader {
+
+   // ------------------------------------------------------------------------
+   //  Configuration shortcut names
+   // ------------------------------------------------------------------------
+
+   /** The shortcut configuration name for the MemoryState backend that 
checkpoints to the JobManager */
+   public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+   /** The shortcut configuration name for the FileSystem State backend */
+   public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+   /** The shortcut configuration name for the RocksDB State Backend */
+   public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+   // ------------------------------------------------------------------------
+   //  Loading the state backend from a configuration
+   // ------------------------------------------------------------------------
+
+   /**
+* Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+* in {@link CheckpointingOptions#STATE_BACKEND}.
+*
+* The state backends can be specified either via their shortcut 
name, or via the class name
+* of a {@link StateBackendFactory}. If a StateBackendFactory class 
name is specified, the factory
+* is instantiated (via its zero-argument constructor) and its
+* {@link StateBackendFactory#createFromConfig(Configuration)} method 
is called.
+*
+* Recognized shortcut names are '{@value 
StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+*
+* @param config The configuration to load the state backend from
+* @param classLoader The class loader that should be used to load the 
state backend
+* @param logger Optionally, a logger to log actions to (may be null)
+*
+* @return The instantiated state backend.
+*
+* @throws DynamicCodeLoadingException
+* Thrown if a state backend factory is configured and the 
factory class was not
+* found or the factory could not be instantiated
+* @throws IllegalConfigurationException
+* May be thrown by the St
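
The Javadoc above also describes loading via a `StateBackendFactory` class
name, instantiated through its zero-argument constructor. A minimal sketch of
such a factory (the class itself is hypothetical; the interface and its
`createFromConfig(Configuration)` method are the ones named in the Javadoc):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.StateBackendFactory;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;

    /** Hypothetical factory, configured via: state.backend: com.example.MyBackendFactory */
    public class MyBackendFactory implements StateBackendFactory<MemoryStateBackend> {
        @Override
        public MemoryStateBackend createFromConfig(Configuration config) {
            // read custom options from 'config' here, then build the backend
            return new MemoryStateBackend();
        }
    }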

[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320597#comment-16320597
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160725400
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
--- End diff --

Another point about naming that I found confusing: backends have methods to 
create both `CheckpointStorage` and `CheckpointStreamFactory`. From the names, 
it sounds like the storage could also do the job of the stream factories. In 
fact, the storage deals more with the metadata aspect on the checkpoint 
coordinator (which we call `Checkpoint`), while the factories give us the 
streams that are used in checkpointing (so far, so good), but that actually 
write the snapshots on the task managers.
Maybe the problem is rooted in the fact that "checkpoint" is both a metadata 
class for the JM and a general concept, and the current method names mix the 
two up.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>






[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320600#comment-16320600
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160731923
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
--- End diff --

The purpose of this did not become fully clear to me from the comments, and 
the method is also never used. If this is part of some future work, you could 
consider adding the method in a future PR instead.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>






[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320590#comment-16320590
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160687956
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class contains utility methods to load state backends from 
configurations.
+ */
+public class StateBackendLoader {
+
+   // ------------------------------------------------------------------------
+   //  Configuration shortcut names
+   // ------------------------------------------------------------------------
+
+   /** The shortcut configuration name for the MemoryState backend that 
checkpoints to the JobManager */
+   public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+   /** The shortcut configuration name for the FileSystem State backend */
+   public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+   /** The shortcut configuration name for the RocksDB State Backend */
+   public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+   // ------------------------------------------------------------------------
+   //  Loading the state backend from a configuration
+   // ------------------------------------------------------------------------
+
+   /**
+* Loads the state backend from the configuration, from the parameter 
'state.backend', as defined
+* in {@link CheckpointingOptions#STATE_BACKEND}.
+*
+* The state backends can be specified either via their shortcut 
name, or via the class name
+* of a {@link StateBackendFactory}. If a StateBackendFactory class 
name is specified, the factory
+* is instantiated (via its zero-argument constructor) and its
+* {@link StateBackendFactory#createFromConfig(Configuration)} method 
is called.
+*
+* Recognized shortcut names are '{@value 
StateBackendLoader#MEMORY_STATE_BACKEND_NAME}',
+* '{@value StateBackendLoader#FS_STATE_BACKEND_NAME}', and
+* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}'.
+*
+* @param config The configuration to load the state backend from
+* @param classLoader The class loader that should be used to load the 
state backend
+* @param logger Optionally, a logger to log actions to (may be null)
+*
+* @return The instantiated state backend.
+*
+* @throws DynamicCodeLoadingException
+* Thrown if a state backend factory is configured and the 
factory class was not
+* found or the factory could not be instantiated
+* @throws IllegalConfigurationException
+* May be thrown by the St

[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320594#comment-16320594
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160722748
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -475,13 +474,12 @@ public void enableCheckpointing(
checkpointTimeout,
minPauseBetweenCheckpoints,
maxConcurrentCheckpoints,
-   externalizeSettings,
+   retentionPolicy,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
checkpointIDCounter,
checkpointStore,
--- End diff --

Here is one example where the confusing proximity between the class names 
`CompletedCheckpointStore` and `CheckpointStorage` becomes obvious.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>






[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320584#comment-16320584
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160644877
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
@@ -84,13 +84,44 @@
 public interface StateBackend extends java.io.Serializable {
 
// ------------------------------------------------------------------------
-   //  Persistent Bytes Storage
+   //  Checkpoint storage - the durable persistence of checkpoint data
--- End diff --

The interface `StateBackend` is getting more crowded as more functionality is 
added, and all of its methods are visible to everybody from the 
`CheckpointCoordinator` down to the `XYZKeyedStateBackend`. You have already 
divided the interface into 3 segments, and I suggest actually splitting it 
into 3 parent interfaces that are all implemented by `StateBackend`. In many 
places, we can then just pass the interface of the sub-functionality that is 
relevant; e.g. `XYZKeyedStateBackend` probably only needs to see the part 
about creating streams and should never call things that create a new backend. 
This would be a big step for separation of concerns, moving away from "global 
visibility" (see the sketch below). Our IDE should be able to automatically 
figure out all the places where the sub-interfaces are sufficient, but as it 
might be a bigger diff, we could do this in a separate PR.
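
A rough sketch of that decomposition; all interface names here are invented
for illustration and do not appear in the PR:

    /** JM-side concern: resolving and storing checkpoint metadata. */
    interface CheckpointMetadataAccess {
        // e.g. resolveCheckpoint(pointer), ...
    }

    /** TM-side concern: creating the streams that persist checkpoint data. */
    interface CheckpointStreamProvider {
        // e.g. createStreamFactory(...), ...
    }

    /** Concern of creating the actual keyed/operator state backends. */
    interface StateBackendCreation {
        // e.g. createKeyedStateBackend(...), createOperatorStateBackend(...), ...
    }

    /** The full interface; callers depend on the narrowest parent they need. */
    interface StateBackend
            extends CheckpointMetadataAccess, CheckpointStreamProvider, StateBackendCreation {
    }

With such a split, `XYZKeyedStateBackend` could receive only a
`CheckpointStreamProvider`, making the unwanted calls impossible at compile
time.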


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>






[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...

2018-01-10 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5248#discussion_r160718388
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and 
metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link 
CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+   /**
+* Checks whether this backend supports highly available storage of 
data.
+*
+* Some state backends may offer support for that with default 
settings, which makes them
+* suitable for zero-config prototyping, but not for actual production 
setups.
+*/
+   boolean supportsHighlyAvailableStorage();
+
+   /**
+* Checks whether the storage has a default savepoint location 
configured.
+*/
+   boolean hasDefaultSavepointLocation();
+
+   /**
+* Resolves the given pointer to a checkpoint/savepoint into a state 
handle from which the
+* checkpoint metadata can be read. If the state backend cannot 
understand the format of
+* the pointer (for example because it was created by a different state 
backend) this method
+* should throw an {@code IOException}.
+*
+* @param pointer The pointer to resolve.
+* @return The state handle from which one can read the checkpoint metadata.
+*
+* @throws IOException Thrown, if the state backend does not understand 
the pointer, or if
+* the pointer could not be resolved due to an I/O 
error.
+*/
+   StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
--- End diff --

Furthermore, I suggest introducing a separate class like 
`(Checkpoint)StoragePointer` instead of a plain `String` (a sketch follows 
below). Internally the class can really be just a string, but I find it very 
helpful when reading the code if a meaningful class name is attached to a 
concept instead of raw String usage. It also helps to avoid autocomplete 
mistakes for methods that take a pointer plus other strings.
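
A minimal sketch of such a wrapper type; the class name and methods are
hypothetical:

    import java.util.Objects;

    /** Hypothetical value type wrapping the raw checkpoint pointer string. */
    public final class CheckpointStoragePointer {
        private final String pointer;

        public CheckpointStoragePointer(String pointer) {
            this.pointer = Objects.requireNonNull(pointer, "pointer");
        }

        /** Returns the raw string form, e.g. for logging or persistence. */
        public String asString() {
            return pointer;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof CheckpointStoragePointer
                    && pointer.equals(((CheckpointStoragePointer) o).pointer);
        }

        @Override
        public int hashCode() {
            return pointer.hashCode();
        }

        @Override
        public String toString() {
            return pointer;
        }
    }

A method like `resolveCheckpoint(CheckpointStoragePointer pointer)` can then
no longer be confused with other String parameters at the call site.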


---

