[jira] [Updated] (FLINK-8318) Conflict jackson library with ElasticSearch connector

2018-01-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8318:

Fix Version/s: 1.4.1
   1.5.0

> Conflict jackson library with ElasticSearch connector
> -
>
> Key: FLINK-8318
> URL: https://issues.apache.org/jira/browse/FLINK-8318
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector, Startup Shell Scripts
>Affects Versions: 1.4.0
>Reporter: Jihyun Cho
> Fix For: 1.5.0, 1.4.1
>
>
> My Flink job fails after updating to Flink 1.4.0. It uses the 
> ElasticSearch connector.
> I'm using CDH Hadoop with the Flink option "classloader.resolve-order: 
> parent-first".
> The failure log is below.
> {noformat}
> Using the result of 'hadoop classpath' to augment the Hadoop classpath: 
> /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//*
> 2017-12-26 14:13:21,160 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> 
> 2017-12-26 14:13:21,161 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
> TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
> 2017-12-26 14:13:21,161 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  OS current 
> user: www
> 2017-12-26 14:13:21,446 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Current 
> Hadoop/Kerberos user: www
> 2017-12-26 14:13:21,446 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM: Java 
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-12-26 14:13:21,447 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Maximum heap 
> size: 31403 MiBytes
> 2017-12-26 14:13:21,447 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JAVA_HOME: 
> (not set)
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop 
> version: 2.6.5
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM Options:
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xms32768M
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xmx32768M
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -XX:MaxDirectMemorySize=8388607T
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Program 
> Arguments:
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> --configDir
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> /home/www/service/flink-1.4.0/conf
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Classpath:
> ...:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//jackson-core-2.2.3.jar:...
> 
> 2017-12-26 14:14:01,393 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source -> Filter -> Map -> Filter -> Sink: 
> Unnamed (3/10) (fb33a6e0c1a7e859eaef9cf8bcf4565e) switched from RUNNING to 
> FAILED.
> java.lang.NoSuchFieldError
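A quick way to verify which Jackson actually wins under parent-first resolution is a probe like the following minimal sketch, assuming it runs on the same classpath as the job ({{JacksonProbe}} is an illustrative name; the Jackson calls are standard API):

{code}
import com.fasterxml.jackson.core.JsonFactory;

public class JacksonProbe {
    public static void main(String[] args) {
        // Which jar actually provides Jackson at runtime? Under
        // "classloader.resolve-order: parent-first" this typically points at
        // the Hadoop-provided jackson-core-2.2.3.jar rather than the newer
        // Jackson bundled with the ElasticSearch connector.
        System.out.println(JsonFactory.class.getProtectionDomain()
                .getCodeSource().getLocation());
        // And which Jackson version that jar reports:
        System.out.println(new JsonFactory().version());
    }
}
{code}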

[jira] [Commented] (FLINK-8363) Build Hadoop 2.9.0 convenience binaries

2018-01-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313309#comment-16313309
 ] 

Aljoscha Krettek commented on FLINK-8363:
-

Probably everything except the filesystem connector and the YARN tests can run 
Hadoop-free.

> Build Hadoop 2.9.0 convenience binaries
> ---
>
> Key: FLINK-8363
> URL: https://issues.apache.org/jira/browse/FLINK-8363
> Project: Flink
>  Issue Type: New Feature
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
>
> Hadoop 2.9.0 was released on 17 November 2017. A local {{mvn clean verify 
> -Dhadoop.version=2.9.0}} ran successfully.
> With the new Hadoopless build we may be able to improve the build process by 
> reusing the {{flink-dist}} jars (which differ only in build timestamps) and 
> simply making each Hadoop-specific tarball by copying in the corresponding 
> {{flink-shaded-hadoop2-uber}} jar.
> What portion of the TravisCI jobs can run Hadoopless? We could build and 
> verify these once and then run a Hadoop-versioned job for each Hadoop version.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8346) add S3 signature v4 workaround to docs

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313308#comment-16313308
 ] 

ASF GitHub Bot commented on FLINK-8346:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5231
  
Very nice, thanks!

Merging this...


> add S3 signature v4 workaround to docs
> --
>
> Key: FLINK-8346
> URL: https://issues.apache.org/jira/browse/FLINK-8346
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, FileSystem
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> As per 
> https://lists.apache.org/thread.html/dd59f94d76ae809f83dc36958006974d0a13dc0798856d1d64bb7293@%3Cuser.flink.apache.org%3E,
>  we should add a hint to enable signature v4 for older Hadoop versions to 
> work with S3 (for the non-shaded S3 file systems and regions that only accept 
> v4, e.g. {{eu-central-1}}).
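A minimal sketch of what the hint boils down to, assuming the workaround is the AWS SDK's standard v4-signing switch; the property name belongs to the AWS SDK, not to Flink, and must be set before the first S3 client is created:

{code}
public class EnableSigV4 {
    public static void main(String[] args) {
        // AWS SDK global switch: force Signature Version 4 for S3.
        // In a Flink setup this would typically be passed as a JVM option
        // (e.g. via env.java.opts) rather than set programmatically.
        System.setProperty("com.amazonaws.services.s3.enableV4", "true");
    }
}
{code}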



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5231: [FLINK-8346][docs] add v4 signature workaround for manual...

2018-01-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5231
  
Very nice, thanks!

Merging this...


---


[jira] [Commented] (FLINK-8364) ListState should return empty iterator rather than null when it has no value

2018-01-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313298#comment-16313298
 ] 

Aljoscha Krettek commented on FLINK-8364:
-

+1 to what [~fhueske] said!

> ListState should return empty iterator rather than null when it has no value
> 
>
> Key: FLINK-8364
> URL: https://issues.apache.org/jira/browse/FLINK-8364
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> per discussion with [~stefanrichte...@gmail.com] in 
> https://github.com/apache/flink/pull/4963, we decided to have 
> {{ListState#get}} return an empty iterator if it has no value.
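A sketch of the defensive read pattern that this change makes unnecessary; {{getOrEmpty}} is a hypothetical helper, {{ListState}} is Flink's own interface:

{code}
import java.util.Collections;
import org.apache.flink.api.common.state.ListState;

public final class StateUtil {
    // Pre-FLINK-8364: ListState#get may return null when the state is empty,
    // so callers have to normalize before iterating.
    static <T> Iterable<T> getOrEmpty(ListState<T> state) throws Exception {
        Iterable<T> values = state.get();
        return values != null ? values : Collections.<T>emptyList();
    }
}
{code}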



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5205: [FLINK-8037] Fix integer multiplication or shift i...

2018-01-05 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5205#discussion_r159899525
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/AbstractID.java ---
@@ -186,7 +186,7 @@ private static long byteArrayToLong(byte[] ba, int offset) {
long l = 0;
 
for (int i = 0; i < SIZE_OF_LONG; ++i) {
-   l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
+   l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << ((long) i << 3);
--- End diff --

I think this might not be quite right - the number of bits to shift stays 
below 64, so it is correct as an int.
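A minimal check of that claim, assuming {{SIZE_OF_LONG == 8}} as in {{AbstractID}}: the shift distance {{i << 3}} ranges over 0..56, always below 64, so an int shift count on a long value is well-defined:

{code}
public class ShiftWidthDemo {
    public static void main(String[] args) {
        final int SIZE_OF_LONG = 8;
        byte[] ba = {1, 2, 3, 4, 5, 6, 7, 8};
        long l = 0;
        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            // shift distance is 0, 8, ..., 56 -- never >= 64, so no cast needed
            l |= (ba[SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
        }
        System.out.println(Long.toHexString(l)); // 102030405060708
    }
}
{code}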


---


[jira] [Commented] (FLINK-8037) Missing cast in integer arithmetic in TransactionalIdsGenerator#generateIdsToAbort

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313297#comment-16313297
 ] 

ASF GitHub Bot commented on FLINK-8037:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5205#discussion_r159899525
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/AbstractID.java ---
@@ -186,7 +186,7 @@ private static long byteArrayToLong(byte[] ba, int offset) {
long l = 0;
 
for (int i = 0; i < SIZE_OF_LONG; ++i) {
-   l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
+   l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << ((long) i << 3);
--- End diff --

I think this might not be quite right - the number of bits to shift stays 
below 64, so it is correct as an int.


> Missing cast in integer arithmetic in 
> TransactionalIdsGenerator#generateIdsToAbort
> --
>
> Key: FLINK-8037
> URL: https://issues.apache.org/jira/browse/FLINK-8037
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Greg Hogan
>Priority: Minor
>
> {code}
>   public Set<String> generateIdsToAbort() {
> Set<String> idsToAbort = new HashSet<>();
> for (int i = 0; i < safeScaleDownFactor; i++) {
>   idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
> {code}
> The operands are integers, but generateIdsToUse() expects a long parameter.
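A sketch of the underlying hazard, with made-up values for the pool size and subtask count: the product is evaluated entirely in int arithmetic and overflows before it is widened to long for the call:

{code}
public class IntOverflowDemo {
    public static void main(String[] args) {
        int i = 3, poolSize = 1_000_000, totalNumberOfSubtasks = 1_000;
        // Evaluated in int: 3_000_000_000 exceeds Integer.MAX_VALUE and wraps
        long wrong = i * poolSize * totalNumberOfSubtasks;
        // Widening one operand first keeps the whole product in long
        long right = (long) i * poolSize * totalNumberOfSubtasks;
        System.out.println(wrong); // -1294967296
        System.out.println(right); //  3000000000
    }
}
{code}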



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6214) WindowAssigners do not allow negative offsets

2018-01-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313293#comment-16313293
 ] 

Aljoscha Krettek commented on FLINK-6214:
-

Nope, no reason except no-one has gotten around to it yet.

> WindowAssigners do not allow negative offsets
> -
>
> Key: FLINK-6214
> URL: https://issues.apache.org/jira/browse/FLINK-6214
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Timo Walther
>
> Both the website and the JavaDoc promote 
> ".window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) For 
> example, in China you would have to specify an offset of Time.hours(-8)". But 
> both the sliding and tumbling event-time assigners reject negative offsets.
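Until negative offsets are accepted, an equivalent positive offset works because offsets act modulo the window size. A sketch, assuming the documented assigner API:

{code}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class OffsetWorkaround {
    public static void main(String[] args) {
        // Rejected today, despite being promoted by the docs:
        // TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8));

        // Equivalent for a 1-day window: -8h mod 24h == +16h
        TumblingEventTimeWindows assigner =
                TumblingEventTimeWindows.of(Time.days(1), Time.hours(16));
        System.out.println(assigner);
    }
}
{code}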



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8361) Remove create_release_files.sh

2018-01-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313291#comment-16313291
 ] 

Aljoscha Krettek commented on FLINK-8361:
-

Ah yes, definitely remove. I left it in when I added the new scripts in case 
something went wrong, but everything seems to work smoothly.

> Remove create_release_files.sh
> --
>
> Key: FLINK-8361
> URL: https://issues.apache.org/jira/browse/FLINK-8361
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Priority: Trivial
>
> The monolithic {{create_release_files.sh}} does not support building Flink 
> without Hadoop and looks to have been superseded by the scripts in 
> {{tools/releasing}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5204: [hotfix] Fix typo in ReplicatingInputFormat/DualInputOper...

2018-01-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5204
  
The code looks good, thanks! Merging this...

For the next contributions, it would be great if you could give the Pull 
Request a proper description, as the text of the pull request template 
describes.


---


[jira] [Commented] (FLINK-8260) Document API of Kafka 0.11 Producer

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313257#comment-16313257
 ] 

ASF GitHub Bot commented on FLINK-8260:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5179
  
These look like good changes. +1 to merge


> Document API of Kafka 0.11 Producer
> ---
>
> Key: FLINK-8260
> URL: https://issues.apache.org/jira/browse/FLINK-8260
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> The API of the Flink Kafka Producer changed for Kafka 0.11; for example, there 
> is no {{writeToKafkaWithTimestamps}} method anymore.
> This needs to be added to the [Kafka connector 
> documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producer],
>  i.e., a new tab with a code snippet needs to be added for Kafka 0.11.
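A sketch of what the new tab might show, assuming the {{FlinkKafkaProducer011}} sink-style constructor and the 1.4 class locations; broker address and topic are placeholders:

{code}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class Kafka011SinkExample {
    static void addKafkaSink(DataStream<String> stream) {
        // Kafka 0.11: no writeToKafkaWithTimestamps; the producer is
        // attached as a regular sink instead.
        stream.addSink(new FlinkKafkaProducer011<>(
                "localhost:9092",          // broker list (placeholder)
                "my-topic",                // target topic (placeholder)
                new SimpleStringSchema()));
    }
}
{code}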



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5191: Release 1.4

2018-01-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5191
  
This looks like an accidentally opened pull request. Can you please close 
it?


---


[GitHub] flink issue #5179: [FLINK-8260/8287] [kafka] Bunch of improvements to Kafka ...

2018-01-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5179
  
These look like good changes. +1 to merge


---


[jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313206#comment-16313206
 ] 

ASF GitHub Bot commented on FLINK-8203:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159878296
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
 ---
@@ -532,35 +527,27 @@ class TableEnvironmentTest extends TableTestBase {
 
 // atomic
 util.verifySchema(
-  util.addTable[Int](('myint as 'new).proctime),
-  Seq("new" -> PROCTIME))
+  util.addTable[Long]('new.rowtime),
--- End diff --

No alias is used in the method that should check "alias with replacing time 
attributes by name".


> Make schema definition of DataStream/DataSet to Table conversion more flexible
> --
>
> Key: FLINK-8203
> URL: https://issues.apache.org/jira/browse/FLINK-8203
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}}, 
> the schema of the table can be defined (by default it is extracted from the 
> {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename 
> fields, or define time attributes. Right now, there are several limitations on 
> how the fields can be defined, which also depend on the type of the 
> {{DataStream}} / {{DataSet}}. Types with explicit field ordering (e.g., 
> tuples, case classes, Row) require schema definition based on the position of 
> fields. POJO types, which have no fixed order of fields, require referring to 
> fields by name. Moreover, there are several restrictions on how time 
> attributes can be defined, e.g., an event-time attribute must replace an 
> existing field or be appended, and proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are 
> referenced by name (and possibly renamed using an alias ({{as}})). In this 
> mode, fields can be reordered and projected out. Moreover, we can define 
> proctime and eventtime attributes at arbitrary positions using arbitrary 
> names (except those that already exist in the result schema). This mode can be 
> used for any input type, including POJOs. This mode is used if all field 
> references exist in the input type.
> 2. Reference input fields by position: Field references might not refer to 
> existing fields in the input type. In this mode, fields are simply renamed. 
> Event-time attributes can replace the field at their position in the input 
> data (if it is of the correct type) or be appended at the end. Proctime 
> attributes must be appended at the end. This mode can only be used if the 
> input type has a defined field order (tuple, case class, Row).
> We need to add more tests that check all combinations of input types and 
> schema definition modes.
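A sketch of the two proposed modes in the Java Table API, assuming the string-based field syntax of the 1.4 API and the behavior described above; field and attribute names are illustrative:

{code}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class SchemaModesExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<Tuple2<Long, String>> stream =
                env.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));

        // Mode 2, reference by position: tuple fields are renamed in order;
        // a proctime attribute may only be appended at the end.
        Table byPos = tEnv.fromDataStream(stream, "ts, name, proc.proctime");

        // Mode 1, reference by name: every reference ("f0", "f1") exists in
        // the input; the existing Long field f0 is replaced by the rowtime.
        Table byName = tEnv.fromDataStream(stream, "f0.rowtime, f1 as name");

        byPos.printSchema();
        byName.printSchema();
    }
}
{code}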



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313212#comment-16313212
 ] 

ASF GitHub Bot commented on FLINK-8203:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159878618
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
 ---
@@ -138,4 +145,25 @@ class StreamTableEnvironmentValidationTest extends 
TableTestBase {
 // we mix reference by position and by name
 util.addTable[(Long, Int, String, Int, Long)]('x, '_1)
   }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidAliasWithProctimeAttribute(): Unit = {
+val util = streamTestUtil()
+// alias in proctime not allowed
+util.addTable[(Int, Long, String)]('_1, ('newnew as 'new).proctime, '_3)
--- End diff --

also add a test where proctime alias references an existing field and a 
test for `'x.proctime` where `'x` is a valid field (if there isn't such a test 
yet).
  


> Make schema definition of DataStream/DataSet to Table conversion more flexible
> --
>
> Key: FLINK-8203
> URL: https://issues.apache.org/jira/browse/FLINK-8203
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}}, 
> the schema of the table can be defined (by default it is extracted from the 
> {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename 
> fields, or define time attributes. Right now, there are several limitations on 
> how the fields can be defined, which also depend on the type of the 
> {{DataStream}} / {{DataSet}}. Types with explicit field ordering (e.g., 
> tuples, case classes, Row) require schema definition based on the position of 
> fields. POJO types, which have no fixed order of fields, require referring to 
> fields by name. Moreover, there are several restrictions on how time 
> attributes can be defined, e.g., an event-time attribute must replace an 
> existing field or be appended, and proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are 
> referenced by name (and possibly renamed using an alias ({{as}})). In this 
> mode, fields can be reordered and projected out. Moreover, we can define 
> proctime and eventtime attributes at arbitrary positions using arbitrary 
> names (except those that already exist in the result schema). This mode can be 
> used for any input type, including POJOs. This mode is used if all field 
> references exist in the input type.
> 2. Reference input fields by position: Field references might not refer to 
> existing fields in the input type. In this mode, fields are simply renamed. 
> Event-time attributes can replace the field at their position in the input 
> data (if it is of the correct type) or be appended at the end. Proctime 
> attributes must be appended at the end. This mode can only be used if the 
> input type has a defined field order (tuple, case class, Row).
> We need to add more tests that check all combinations of input types and 
> schema definition modes.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313211#comment-16313211
 ] 

ASF GitHub Bot commented on FLINK-8203:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159887168
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -450,37 +450,54 @@ abstract class StreamTableEnvironment(
 exprs: Array[Expression])
   : (Option[(Int, String)], Option[(Int, String)]) = {
 
-val fieldTypes: Array[TypeInformation[_]] = streamType match {
-  case c: CompositeType[_] => (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray
-  case a: AtomicType[_] => Array(a)
+val (isRefByPos, fieldTypes) = streamType match {
+  case c: CompositeType[_] =>
+// determine schema definition mode (by position or by name)
+(isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray)
+  case t: TypeInformation[_] =>
+(false, Array(t))
 }
 
 var fieldNames: List[String] = Nil
 var rowtime: Option[(Int, String)] = None
 var proctime: Option[(Int, String)] = None
 
+def checkRowtimeType(t: TypeInformation[_]): Unit = {
+  if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+throw new TableException(
+  s"The rowtime attribute can only replace a field with a valid 
time type, " +
+  s"such as Timestamp or Long. But was: $t")
+  }
+}
+
 def extractRowtime(idx: Int, name: String, origName: Option[String]): 
Unit = {
   if (rowtime.isDefined) {
 throw new TableException(
   "The rowtime attribute can only be defined once in a table 
schema.")
   } else {
-val mappedIdx = streamType match {
-  case pti: PojoTypeInfo[_] =>
-pti.getFieldIndex(origName.getOrElse(name))
-  case _ => idx;
+// if the fields are referenced by position,
+// it is possible to replace an existing field or append the time 
attribute at the end
+if (isRefByPos) {
+  // check type of field that is replaced
+  if (idx < 0) {
+throw new TableException(
+  s"The rowtime attribute can only replace a valid field. " +
+s"${origName.getOrElse(name)} is not a field of type 
$streamType.")
+  }
+  else if (idx < fieldTypes.length) {
+checkRowtimeType(fieldTypes(idx))
+  }
 }
-// check type of field that is replaced
-if (mappedIdx < 0) {
-  throw new TableException(
-s"The rowtime attribute can only replace a valid field. " +
-  s"${origName.getOrElse(name)} is not a field of type 
$streamType.")
-}
-else if (mappedIdx < fieldTypes.length &&
-  !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
-TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
-  throw new TableException(
-s"The rowtime attribute can only replace a field with a valid 
time type, " +
-  s"such as Timestamp or Long. But was: 
${fieldTypes(mappedIdx)}")
+// check for valid alias if referenced by name
+else if (origName.isDefined) {
--- End diff --

we should also check the field type for ref-by-name if no alias is used and 
the name references a field that exists in the input, e.g., for `Tuple3` `('f1, 
'f0.rowtime, 'f2)`.


> Make schema definition of DataStream/DataSet to Table conversion more flexible
> --
>
> Key: FLINK-8203
> URL: https://issues.apache.org/jira/browse/FLINK-8203
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}}, 
> the schema of the table can be defined (by default it is extracted from the 
> {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename 
> fields, or define time attributes. Right now, there are several limitations on 
> how the fields can be defined, which also depend on the type of the 
> {{DataStream}} / {{DataSet}}. Types with explicit field ordering (e.g., 
> tuples, case classes, Row) require schema definition based on the position of 
> fields. POJO types, which have no fixed order of fields, require to r

[jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313210#comment-16313210
 ] 

ASF GitHub Bot commented on FLINK-8203:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159885776
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -450,37 +450,54 @@ abstract class StreamTableEnvironment(
 exprs: Array[Expression])
   : (Option[(Int, String)], Option[(Int, String)]) = {
 
-val fieldTypes: Array[TypeInformation[_]] = streamType match {
-  case c: CompositeType[_] => (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray
-  case a: AtomicType[_] => Array(a)
+val (isRefByPos, fieldTypes) = streamType match {
+  case c: CompositeType[_] =>
+// determine schema definition mode (by position or by name)
+(isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray)
+  case t: TypeInformation[_] =>
+(false, Array(t))
 }
 
 var fieldNames: List[String] = Nil
 var rowtime: Option[(Int, String)] = None
 var proctime: Option[(Int, String)] = None
 
+def checkRowtimeType(t: TypeInformation[_]): Unit = {
+  if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+throw new TableException(
+  s"The rowtime attribute can only replace a field with a valid 
time type, " +
+  s"such as Timestamp or Long. But was: $t")
+  }
+}
+
 def extractRowtime(idx: Int, name: String, origName: Option[String]): 
Unit = {
   if (rowtime.isDefined) {
 throw new TableException(
   "The rowtime attribute can only be defined once in a table 
schema.")
   } else {
-val mappedIdx = streamType match {
-  case pti: PojoTypeInfo[_] =>
-pti.getFieldIndex(origName.getOrElse(name))
-  case _ => idx;
+// if the fields are referenced by position,
+// it is possible to replace an existing field or append the time 
attribute at the end
+if (isRefByPos) {
+  // check type of field that is replaced
+  if (idx < 0) {
--- End diff --

I think this will never be `true` because the method is called with `idx` 
being an index into `exprs`.


> Make schema definition of DataStream/DataSet to Table conversion more flexible
> --
>
> Key: FLINK-8203
> URL: https://issues.apache.org/jira/browse/FLINK-8203
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}}, 
> the schema of the table can be defined (by default it is extracted from the 
> {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename 
> fields, or define time attributes. Right now, there are several limitations on 
> how the fields can be defined, which also depend on the type of the 
> {{DataStream}} / {{DataSet}}. Types with explicit field ordering (e.g., 
> tuples, case classes, Row) require schema definition based on the position of 
> fields. POJO types, which have no fixed order of fields, require referring to 
> fields by name. Moreover, there are several restrictions on how time 
> attributes can be defined, e.g., an event-time attribute must replace an 
> existing field or be appended, and proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are 
> referenced by name (and possibly renamed using an alias ({{as}})). In this 
> mode, fields can be reordered and projected out. Moreover, we can define 
> proctime and eventtime attributes at arbitrary positions using arbitrary 
> names (except those that already exist in the result schema). This mode can be 
> used for any input type, including POJOs. This mode is used if all field 
> references exist in the input type.
> 2. Reference input fields by position: Field references might not refer to 
> existing fields in the input type. In this mode, fields are simply renamed. 
> Event-time attributes can replace the field at their position in the input 
> data (if it is of the correct type) or be appended at the end. Proctime 
> attributes must be appended at the end. This mode can only be used if the 
> input type has a defined field order (tuple, case class, Row).
> We need to

[jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313213#comment-16313213
 ] 

ASF GitHub Bot commented on FLINK-8203:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159889511
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -450,37 +450,54 @@ abstract class StreamTableEnvironment(
 exprs: Array[Expression])
   : (Option[(Int, String)], Option[(Int, String)]) = {
 
-val fieldTypes: Array[TypeInformation[_]] = streamType match {
-  case c: CompositeType[_] => (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray
-  case a: AtomicType[_] => Array(a)
+val (isRefByPos, fieldTypes) = streamType match {
+  case c: CompositeType[_] =>
+// determine schema definition mode (by position or by name)
+(isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray)
+  case t: TypeInformation[_] =>
+(false, Array(t))
 }
 
 var fieldNames: List[String] = Nil
 var rowtime: Option[(Int, String)] = None
 var proctime: Option[(Int, String)] = None
 
+def checkRowtimeType(t: TypeInformation[_]): Unit = {
+  if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+throw new TableException(
+  s"The rowtime attribute can only replace a field with a valid 
time type, " +
+  s"such as Timestamp or Long. But was: $t")
+  }
+}
+
 def extractRowtime(idx: Int, name: String, origName: Option[String]): 
Unit = {
   if (rowtime.isDefined) {
 throw new TableException(
   "The rowtime attribute can only be defined once in a table 
schema.")
   } else {
-val mappedIdx = streamType match {
-  case pti: PojoTypeInfo[_] =>
-pti.getFieldIndex(origName.getOrElse(name))
-  case _ => idx;
+// if the fields are referenced by position,
+// it is possible to replace an existing field or append the time 
attribute at the end
+if (isRefByPos) {
+  // check type of field that is replaced
+  if (idx < 0) {
--- End diff --

aliases are not permitted for regular fields in ref-by-pos mode. I think we 
should prohibit them for rowtime attributes as well, by checking for 
`origName.isDefined`.


> Make schema definition of DataStream/DataSet to Table conversion more flexible
> --
>
> Key: FLINK-8203
> URL: https://issues.apache.org/jira/browse/FLINK-8203
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}}, 
> the schema of the table can be defined (by default it is extracted from the 
> {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename 
> fields, or define time attributes. Right now, there are several limitations on 
> how the fields can be defined, which also depend on the type of the 
> {{DataStream}} / {{DataSet}}. Types with explicit field ordering (e.g., 
> tuples, case classes, Row) require schema definition based on the position of 
> fields. POJO types, which have no fixed order of fields, require referring to 
> fields by name. Moreover, there are several restrictions on how time 
> attributes can be defined, e.g., an event-time attribute must replace an 
> existing field or be appended, and proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are 
> referenced by name (and possibly renamed using an alias ({{as}})). In this 
> mode, fields can be reordered and projected out. Moreover, we can define 
> proctime and eventtime attributes at arbitrary positions using arbitrary 
> names (except those that already exist in the result schema). This mode can be 
> used for any input type, including POJOs. This mode is used if all field 
> references exist in the input type.
> 2. Reference input fields by position: Field references might not refer to 
> existing fields in the input type. In this mode, fields are simply renamed. 
> Event-time attributes can replace the field at their position in the input 
> data (if it is of the correct type) or be appended at the end. Proctime 
> attributes must be appended at the end. This mode can only be used if the 
> input type has a d

[jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313208#comment-16313208
 ] 

ASF GitHub Bot commented on FLINK-8203:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159877092
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -855,42 +852,26 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   "An input of GenericTypeInfo cannot be converted to Table. 
" +
 "Please specify the type of the input with a RowTypeInfo.")
 
-  case t: TupleTypeInfo[A] =>
-exprs.zipWithIndex flatMap {
-  case (UnresolvedFieldReference(name: String), idx) =>
-if (isReferenceByPosition) {
-  Some((idx, name))
-} else {
-  referenceByName(name, t)
-}
-  case (Alias(UnresolvedFieldReference(origName), name: String, 
_), _) =>
-val idx = t.getFieldIndex(origName)
-if (idx < 0) {
-  throw new TableException(s"$origName is not a field of type 
$t. " +
-s"Expected: ${t.getFieldNames.mkString(", ")}")
-}
-Some((idx, name))
-  case (_: TimeAttribute, _) =>
-None
-  case _ => throw new TableException(
-"Field reference expression or alias on field expression 
expected.")
-}
+  case t: TupleTypeInfoBase[A] if t.isInstanceOf[TupleTypeInfo[A]] ||
+t.isInstanceOf[CaseClassTypeInfo[A]] || 
t.isInstanceOf[RowTypeInfo] =>
+
+// determine schema definition mode (by position or by name)
+val isRefByPos = isReferenceByPosition(t, exprs)
 
-  case c: CaseClassTypeInfo[A] =>
 exprs.zipWithIndex flatMap {
   case (UnresolvedFieldReference(name: String), idx) =>
-if (isReferenceByPosition) {
+if (isRefByPos) {
--- End diff --

moving the `isRefByPos` check outside of the `match` might be easier to 
read?


> Make schema definition of DataStream/DataSet to Table conversion more flexible
> --
>
> Key: FLINK-8203
> URL: https://issues.apache.org/jira/browse/FLINK-8203
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}}, 
> the schema of the table can be defined (by default it is extracted from the 
> {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename 
> fields, or define time attributes. Right now, there are several limitations on 
> how the fields can be defined, which also depend on the type of the 
> {{DataStream}} / {{DataSet}}. Types with explicit field ordering (e.g., 
> tuples, case classes, Row) require schema definition based on the position of 
> fields. POJO types, which have no fixed order of fields, require referring to 
> fields by name. Moreover, there are several restrictions on how time 
> attributes can be defined, e.g., an event-time attribute must replace an 
> existing field or be appended, and proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are 
> referenced by name (and possibly renamed using an alias ({{as}})). In this 
> mode, fields can be reordered and projected out. Moreover, we can define 
> proctime and eventtime attributes at arbitrary positions using arbitrary 
> names (except those that already exist in the result schema). This mode can be 
> used for any input type, including POJOs. This mode is used if all field 
> references exist in the input type.
> 2. Reference input fields by position: Field references might not refer to 
> existing fields in the input type. In this mode, fields are simply renamed. 
> Event-time attributes can replace the field at their position in the input 
> data (if it is of the correct type) or be appended at the end. Proctime 
> attributes must be appended at the end. This mode can only be used if the 
> input type has a defined field order (tuple, case class, Row).
> We need to add more tests that check all combinations of input types and 
> schema definition modes.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313209#comment-16313209
 ] 

ASF GitHub Bot commented on FLINK-8203:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159876208
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -446,52 +443,60 @@ abstract class StreamTableEnvironment(
 * Checks for at most one rowtime and proctime attribute.
 * Returns the time attributes.
 *
-* @param isReferenceByPosition schema mode see 
[[isReferenceByPosition()]]
-*
 * @return rowtime attribute and proctime attribute
 */
   private def validateAndExtractTimeAttributes(
-isReferenceByPosition: Boolean,
 streamType: TypeInformation[_],
 exprs: Array[Expression])
   : (Option[(Int, String)], Option[(Int, String)]) = {
 
-val fieldTypes: Array[TypeInformation[_]] = streamType match {
-  case c: CompositeType[_] => (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray
-  case t: TypeInformation[_] => Array(t)
+val (isRefByPos, fieldTypes) = streamType match {
+  case c: CompositeType[_] =>
+// determine schema definition mode (by position or by name)
+(isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray)
+  case t: TypeInformation[_] =>
+(false, Array(t))
 }
 
 var fieldNames: List[String] = Nil
 var rowtime: Option[(Int, String)] = None
 var proctime: Option[(Int, String)] = None
 
+def checkRowtimeType(t: TypeInformation[_]): Unit = {
+  if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+throw new TableException(
+  s"The rowtime attribute can only replace a field with a valid 
time type, " +
+  s"such as Timestamp or Long. But was: $t")
+  }
+}
+
 def extractRowtime(idx: Int, name: String, origName: Option[String]): 
Unit = {
   if (rowtime.isDefined) {
 throw new TableException(
   "The rowtime attribute can only be defined once in a table 
schema.")
   } else {
 // if the fields are referenced by position,
 // it is possible to replace an existing field or append the time 
attribute at the end
-if (isReferenceByPosition) {
-
-  val mappedIdx = streamType match {
-case pti: PojoTypeInfo[_] =>
-  pti.getFieldIndex(origName.getOrElse(name))
-case _ => idx;
-  }
-
+if (isRefByPos) {
   // check type of field that is replaced
-  if (mappedIdx < 0) {
+  if (idx < 0) {
 throw new TableException(
   s"The rowtime attribute can only replace a valid field. " +
 s"${origName.getOrElse(name)} is not a field of type 
$streamType.")
   }
-  else if (mappedIdx < fieldTypes.length &&
-!(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
-  TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
-throw new TableException(
-  s"The rowtime attribute can only replace a field with a 
valid time type, " +
-s"such as Timestamp or Long. But was: 
${fieldTypes(mappedIdx)}")
+  else if (idx < fieldTypes.length) {
+checkRowtimeType(fieldTypes(idx))
+  }
+}
+// check for valid alias if referenced by name
+else if (origName.isDefined) {
+  // check for valid alias
+  streamType match {
+case ct: CompositeType[_] if ct.hasField(origName.get) =>
+  val t = ct.getTypeAt(ct.getFieldIndex(origName.get))
+  checkRowtimeType(t)
+case _ =>
+  throw new TableException("An alias must always reference an 
existing field.")
--- End diff --

Add `origName` (and the existing fields?) to the error message.


> Make schema definition of DataStream/DataSet to Table conversion more flexible
> --
>
> Key: FLINK-8203
> URL: https://issues.apache.org/jira/browse/FLINK-8203
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}}, 
> the schema of the table can be defined 

[jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313207#comment-16313207
 ] 

ASF GitHub Bot commented on FLINK-8203:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159880719
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -492,11 +509,16 @@ abstract class StreamTableEnvironment(
   throw new TableException(
 "The proctime attribute can only be defined once in a table 
schema.")
   } else {
-// check that proctime is only appended
-if (idx < fieldTypes.length) {
-  throw new TableException(
-"The proctime attribute can only be appended to the table 
schema and not replace " +
-  "an existing field. Please move it to the end of the 
schema.")
+// if the fields are referenced by position,
+// it is only possible to append the time attribute at the end
+if (isRefByPos) {
+
+  // check that proctime is only appended
+  if (idx < fieldTypes.length) {
+throw new TableException(
+  "The proctime attribute can only be appended to the table 
schema and not replace " +
+"an existing field. Please move it to the end of the 
schema.")
+  }
 }
 proctime = Some(idx, name)
--- End diff --

should we also check that the name of the proctime attribute is not in the 
fields of the input type?


> Make schema definition of DataStream/DataSet to Table conversion more flexible
> --
>
> Key: FLINK-8203
> URL: https://issues.apache.org/jira/browse/FLINK-8203
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}}, 
> the schema of the table can be defined (by default it is extracted from the 
> {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename 
> fields, or define time attributes. Right now, there are several limitations on 
> how the fields can be defined, which also depend on the type of the 
> {{DataStream}} / {{DataSet}}. Types with explicit field ordering (e.g., 
> tuples, case classes, Row) require schema definition based on the position of 
> fields. POJO types, which have no fixed order of fields, require referring to 
> fields by name. Moreover, there are several restrictions on how time 
> attributes can be defined, e.g., an event-time attribute must replace an 
> existing field or be appended, and proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are 
> referenced by name (and possibly renamed using an alias ({{as}})). In this 
> mode, fields can be reordered and projected out. Moreover, we can define 
> proctime and eventtime attributes at arbitrary positions using arbitrary 
> names (except those that already exist in the result schema). This mode can be 
> used for any input type, including POJOs. This mode is used if all field 
> references exist in the input type.
> 2. Reference input fields by position: Field references might not refer to 
> existing fields in the input type. In this mode, fields are simply renamed. 
> Event-time attributes can replace the field at their position in the input 
> data (if it is of the correct type) or be appended at the end. Proctime 
> attributes must be appended at the end. This mode can only be used if the 
> input type has a defined field order (tuple, case class, Row).
> We need to add more tests that check all combinations of input types and 
> schema definition modes.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi...

2018-01-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159887168
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -450,37 +450,54 @@ abstract class StreamTableEnvironment(
 exprs: Array[Expression])
   : (Option[(Int, String)], Option[(Int, String)]) = {
 
-val fieldTypes: Array[TypeInformation[_]] = streamType match {
-  case c: CompositeType[_] => (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray
-  case a: AtomicType[_] => Array(a)
+val (isRefByPos, fieldTypes) = streamType match {
+  case c: CompositeType[_] =>
+// determine schema definition mode (by position or by name)
+(isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray)
+  case t: TypeInformation[_] =>
+(false, Array(t))
 }
 
 var fieldNames: List[String] = Nil
 var rowtime: Option[(Int, String)] = None
 var proctime: Option[(Int, String)] = None
 
+def checkRowtimeType(t: TypeInformation[_]): Unit = {
+  if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+throw new TableException(
+  s"The rowtime attribute can only replace a field with a valid 
time type, " +
+  s"such as Timestamp or Long. But was: $t")
+  }
+}
+
 def extractRowtime(idx: Int, name: String, origName: Option[String]): 
Unit = {
   if (rowtime.isDefined) {
 throw new TableException(
   "The rowtime attribute can only be defined once in a table 
schema.")
   } else {
-val mappedIdx = streamType match {
-  case pti: PojoTypeInfo[_] =>
-pti.getFieldIndex(origName.getOrElse(name))
-  case _ => idx;
+// if the fields are referenced by position,
+// it is possible to replace an existing field or append the time 
attribute at the end
+if (isRefByPos) {
+  // check type of field that is replaced
+  if (idx < 0) {
+throw new TableException(
+  s"The rowtime attribute can only replace a valid field. " +
+s"${origName.getOrElse(name)} is not a field of type 
$streamType.")
+  }
+  else if (idx < fieldTypes.length) {
+checkRowtimeType(fieldTypes(idx))
+  }
 }
-// check type of field that is replaced
-if (mappedIdx < 0) {
-  throw new TableException(
-s"The rowtime attribute can only replace a valid field. " +
-  s"${origName.getOrElse(name)} is not a field of type 
$streamType.")
-}
-else if (mappedIdx < fieldTypes.length &&
-  !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
-TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
-  throw new TableException(
-s"The rowtime attribute can only replace a field with a valid 
time type, " +
-  s"such as Timestamp or Long. But was: 
${fieldTypes(mappedIdx)}")
+// check for valid alias if referenced by name
+else if (origName.isDefined) {
--- End diff --

we should also check the field type for ref-by-name if no alias is used and 
the name references a field that exists in the input, e.g., for `Tuple3` `('f1, 
'f0.rowtime, 'f2)`.


---


[GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi...

2018-01-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159878618
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
 ---
@@ -138,4 +145,25 @@ class StreamTableEnvironmentValidationTest extends 
TableTestBase {
 // we mix reference by position and by name
 util.addTable[(Long, Int, String, Int, Long)]('x, '_1)
   }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidAliasWithProctimeAttribute(): Unit = {
+val util = streamTestUtil()
+// alias in proctime not allowed
+util.addTable[(Int, Long, String)]('_1, ('newnew as 'new).proctime, '_3)
--- End diff --

also add a test where proctime alias references an existing field and a 
test for `'x.proctime` where `'x` is a valid field (if there isn't such a test 
yet).
  


---


[GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi...

2018-01-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159885776
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -450,37 +450,54 @@ abstract class StreamTableEnvironment(
 exprs: Array[Expression])
   : (Option[(Int, String)], Option[(Int, String)]) = {
 
-val fieldTypes: Array[TypeInformation[_]] = streamType match {
-  case c: CompositeType[_] => (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray
-  case a: AtomicType[_] => Array(a)
+val (isRefByPos, fieldTypes) = streamType match {
+  case c: CompositeType[_] =>
+// determine schema definition mode (by position or by name)
+(isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray)
+  case t: TypeInformation[_] =>
+(false, Array(t))
 }
 
 var fieldNames: List[String] = Nil
 var rowtime: Option[(Int, String)] = None
 var proctime: Option[(Int, String)] = None
 
+def checkRowtimeType(t: TypeInformation[_]): Unit = {
+  if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+throw new TableException(
+  s"The rowtime attribute can only replace a field with a valid 
time type, " +
+  s"such as Timestamp or Long. But was: $t")
+  }
+}
+
 def extractRowtime(idx: Int, name: String, origName: Option[String]): 
Unit = {
   if (rowtime.isDefined) {
 throw new TableException(
   "The rowtime attribute can only be defined once in a table 
schema.")
   } else {
-val mappedIdx = streamType match {
-  case pti: PojoTypeInfo[_] =>
-pti.getFieldIndex(origName.getOrElse(name))
-  case _ => idx;
+// if the fields are referenced by position,
+// it is possible to replace an existing field or append the time 
attribute at the end
+if (isRefByPos) {
+  // check type of field that is replaced
+  if (idx < 0) {
--- End diff --

I think this will never be `true` because the method is called with `idx` 
being an index into `exprs`.


---


[GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi...

2018-01-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159876208
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -446,52 +443,60 @@ abstract class StreamTableEnvironment(
 * Checks for at most one rowtime and proctime attribute.
 * Returns the time attributes.
 *
-* @param isReferenceByPosition schema mode see 
[[isReferenceByPosition()]]
-*
 * @return rowtime attribute and proctime attribute
 */
   private def validateAndExtractTimeAttributes(
-isReferenceByPosition: Boolean,
 streamType: TypeInformation[_],
 exprs: Array[Expression])
   : (Option[(Int, String)], Option[(Int, String)]) = {
 
-val fieldTypes: Array[TypeInformation[_]] = streamType match {
-  case c: CompositeType[_] => (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray
-  case t: TypeInformation[_] => Array(t)
+val (isRefByPos, fieldTypes) = streamType match {
+  case c: CompositeType[_] =>
+// determine schema definition mode (by position or by name)
+(isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray)
+  case t: TypeInformation[_] =>
+(false, Array(t))
 }
 
 var fieldNames: List[String] = Nil
 var rowtime: Option[(Int, String)] = None
 var proctime: Option[(Int, String)] = None
 
+def checkRowtimeType(t: TypeInformation[_]): Unit = {
+  if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+throw new TableException(
+  s"The rowtime attribute can only replace a field with a valid 
time type, " +
+  s"such as Timestamp or Long. But was: $t")
+  }
+}
+
 def extractRowtime(idx: Int, name: String, origName: Option[String]): 
Unit = {
   if (rowtime.isDefined) {
 throw new TableException(
   "The rowtime attribute can only be defined once in a table 
schema.")
   } else {
 // if the fields are referenced by position,
 // it is possible to replace an existing field or append the time 
attribute at the end
-if (isReferenceByPosition) {
-
-  val mappedIdx = streamType match {
-case pti: PojoTypeInfo[_] =>
-  pti.getFieldIndex(origName.getOrElse(name))
-case _ => idx;
-  }
-
+if (isRefByPos) {
   // check type of field that is replaced
-  if (mappedIdx < 0) {
+  if (idx < 0) {
 throw new TableException(
   s"The rowtime attribute can only replace a valid field. " +
 s"${origName.getOrElse(name)} is not a field of type 
$streamType.")
   }
-  else if (mappedIdx < fieldTypes.length &&
-!(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
-  TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
-throw new TableException(
-  s"The rowtime attribute can only replace a field with a 
valid time type, " +
-s"such as Timestamp or Long. But was: 
${fieldTypes(mappedIdx)}")
+  else if (idx < fieldTypes.length) {
+checkRowtimeType(fieldTypes(idx))
+  }
+}
+// check for valid alias if referenced by name
+else if (origName.isDefined) {
+  // check for valid alias
+  streamType match {
+case ct: CompositeType[_] if ct.hasField(origName.get) =>
+  val t = ct.getTypeAt(ct.getFieldIndex(origName.get))
+  checkRowtimeType(t)
+case _ =>
+  throw new TableException("An alias must always reference an 
existing field.")
--- End diff --

Add `origName` (and the existing fields?) to the error message.
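
A sketch of what the richer message could look like (helper and names are 
illustrative, not the actual Scala code):

```java
import java.util.Arrays;
import java.util.List;

final class AliasErrorMessage {
    // illustrative: report the offending alias together with the available fields
    static RuntimeException invalidAlias(String origName, List<String> fieldNames) {
        return new IllegalArgumentException(
            "An alias must always reference an existing field. '" + origName +
            "' is not among the available fields: " + String.join(", ", fieldNames));
    }

    public static void main(String[] args) {
        throw invalidAlias("rowtimeField", Arrays.asList("id", "amount", "ts"));
    }
}
```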


---


[GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi...

2018-01-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159889511
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -450,37 +450,54 @@ abstract class StreamTableEnvironment(
 exprs: Array[Expression])
   : (Option[(Int, String)], Option[(Int, String)]) = {
 
-val fieldTypes: Array[TypeInformation[_]] = streamType match {
-  case c: CompositeType[_] => (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray
-  case a: AtomicType[_] => Array(a)
+val (isRefByPos, fieldTypes) = streamType match {
+  case c: CompositeType[_] =>
+// determine schema definition mode (by position or by name)
+(isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray)
+  case t: TypeInformation[_] =>
+(false, Array(t))
 }
 
 var fieldNames: List[String] = Nil
 var rowtime: Option[(Int, String)] = None
 var proctime: Option[(Int, String)] = None
 
+def checkRowtimeType(t: TypeInformation[_]): Unit = {
+  if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+throw new TableException(
+  s"The rowtime attribute can only replace a field with a valid 
time type, " +
+  s"such as Timestamp or Long. But was: $t")
+  }
+}
+
 def extractRowtime(idx: Int, name: String, origName: Option[String]): 
Unit = {
   if (rowtime.isDefined) {
 throw new TableException(
   "The rowtime attribute can only be defined once in a table 
schema.")
   } else {
-val mappedIdx = streamType match {
-  case pti: PojoTypeInfo[_] =>
-pti.getFieldIndex(origName.getOrElse(name))
-  case _ => idx;
+// if the fields are referenced by position,
+// it is possible to replace an existing field or append the time 
attribute at the end
+if (isRefByPos) {
+  // check type of field that is replaced
+  if (idx < 0) {
--- End diff --

Aliases are not permitted for regular fields in ref-by-pos mode. I think we 
should prohibit them for rowtime attributes as well, by checking for 
`origName.isDefined`.
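
A minimal sketch of that guard in Java (hypothetical names; the actual code is 
Scala inside `validateAndExtractTimeAttributes`):

```java
import java.util.Optional;

final class RefByPosAliasGuard {
    // illustrative: reject an alias on a rowtime attribute in ref-by-pos mode
    static void checkNoAlias(boolean isRefByPos, Optional<String> origName) {
        if (isRefByPos && origName.isPresent()) {
            throw new IllegalArgumentException(
                "Aliases are not allowed when fields are referenced by position.");
        }
    }

    public static void main(String[] args) {
        checkNoAlias(true, Optional.empty());          // passes
        checkNoAlias(true, Optional.of("origField"));  // throws
    }
}
```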


---


[GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi...

2018-01-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159878296
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
 ---
@@ -532,35 +527,27 @@ class TableEnvironmentTest extends TableTestBase {
 
 // atomic
 util.verifySchema(
-  util.addTable[Int](('myint as 'new).proctime),
-  Seq("new" -> PROCTIME))
+  util.addTable[Long]('new.rowtime),
--- End diff --

No alias is used in this method, even though it should check "alias with 
replacing time attributes by name".
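
For reference, a minimal Java sketch of defining time attributes when the 
schema is given as a string (assuming the Flink 1.4 Table API; field and 
variable names are illustrative):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class TimeAttributeSchemaSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // timestamps/watermarks would have to be assigned for rowtime to be meaningful
        DataStream<Tuple2<Long, String>> stream = env.fromElements(Tuple2.of(1L, "a"));

        // reference by position: the Long field f0 is replaced by the rowtime
        // attribute "ts", and the proctime attribute "proc" is appended at the end
        Table t = tEnv.fromDataStream(stream, "ts.rowtime, user, proc.proctime");
    }
}
```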


---


[GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi...

2018-01-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159880719
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -492,11 +509,16 @@ abstract class StreamTableEnvironment(
   throw new TableException(
 "The proctime attribute can only be defined once in a table 
schema.")
   } else {
-// check that proctime is only appended
-if (idx < fieldTypes.length) {
-  throw new TableException(
-"The proctime attribute can only be appended to the table 
schema and not replace " +
-  "an existing field. Please move it to the end of the 
schema.")
+// if the fields are referenced by position,
+// it is only possible to append the time attribute at the end
+if (isRefByPos) {
+
+  // check that proctime is only appended
+  if (idx < fieldTypes.length) {
+throw new TableException(
+  "The proctime attribute can only be appended to the table 
schema and not replace " +
+"an existing field. Please move it to the end of the 
schema.")
+  }
 }
 proctime = Some(idx, name)
--- End diff --

Should we also check that the name of the proctime attribute does not clash 
with a field of the input type?


---


[GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi...

2018-01-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5132#discussion_r159877092
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -855,42 +852,26 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   "An input of GenericTypeInfo cannot be converted to Table. 
" +
 "Please specify the type of the input with a RowTypeInfo.")
 
-  case t: TupleTypeInfo[A] =>
-exprs.zipWithIndex flatMap {
-  case (UnresolvedFieldReference(name: String), idx) =>
-if (isReferenceByPosition) {
-  Some((idx, name))
-} else {
-  referenceByName(name, t)
-}
-  case (Alias(UnresolvedFieldReference(origName), name: String, 
_), _) =>
-val idx = t.getFieldIndex(origName)
-if (idx < 0) {
-  throw new TableException(s"$origName is not a field of type 
$t. " +
-s"Expected: ${t.getFieldNames.mkString(", ")}")
-}
-Some((idx, name))
-  case (_: TimeAttribute, _) =>
-None
-  case _ => throw new TableException(
-"Field reference expression or alias on field expression 
expected.")
-}
+  case t: TupleTypeInfoBase[A] if t.isInstanceOf[TupleTypeInfo[A]] ||
+t.isInstanceOf[CaseClassTypeInfo[A]] || 
t.isInstanceOf[RowTypeInfo] =>
+
+// determine schema definition mode (by position or by name)
+val isRefByPos = isReferenceByPosition(t, exprs)
 
-  case c: CaseClassTypeInfo[A] =>
 exprs.zipWithIndex flatMap {
   case (UnresolvedFieldReference(name: String), idx) =>
-if (isReferenceByPosition) {
+if (isRefByPos) {
--- End diff --

Moving the `isRefByPos` check outside of the `match` might make this easier to 
read.


---


[jira] [Commented] (FLINK-8340) Do not pass Configuration and configuration directory to CustomCommandLine methods

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313168#comment-16313168
 ] 

ASF GitHub Bot commented on FLINK-8340:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5226#discussion_r159874411
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -183,93 +204,43 @@ public FlinkYarnSessionCli(String shortPrefix, String 
longPrefix, boolean accept
allOptions.addOption(applicationId);
allOptions.addOption(zookeeperNamespace);
allOptions.addOption(flip6);
-   }
 
-   /**
-* Tries to load a Flink Yarn properties file and returns the Yarn 
application id if successful.
-* @param cmdLine The command-line parameters
-* @param flinkConfiguration The flink configuration
-* @return Yarn application id or null if none could be retrieved
-*/
-   private String loadYarnPropertiesFile(CommandLine cmdLine, 
Configuration flinkConfiguration) {
+   // try loading a potential yarn properties file
+   final File yarnPropertiesLocation = 
getYarnPropertiesLocation(configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
 
-   String jobManagerOption = 
cmdLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
-   if (jobManagerOption != null) {
-   // don't resume from properties file if a JobManager 
has been specified
-   return null;
-   }
+   yarnPropertiesFile = new Properties();
 
-   for (Option option : cmdLine.getOptions()) {
-   if (allOptions.hasOption(option.getOpt())) {
-   if (!option.getOpt().equals(detached.getOpt())) 
{
-   // don't resume from properties file if 
yarn options have been specified
-   return null;
-   }
-   }
-   }
-
-   // load the YARN properties
-   File propertiesFile = 
getYarnPropertiesLocation(flinkConfiguration);
-   if (!propertiesFile.exists()) {
-   return null;
-   }
-
-   logAndSysout("Found YARN properties file " + 
propertiesFile.getAbsolutePath());
+   if (yarnPropertiesLocation.exists()) {
+   LOG.info("Found Yarn properties file under " + 
yarnPropertiesLocation.getAbsolutePath() + '.');
--- End diff --

nit: slf4j placeholders (`{}`) can be used here
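
I.e., something along these lines (a sketch; `LOG` and the variable name are 
taken from the diff above):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;

class PlaceholderLogging {
    private static final Logger LOG = LoggerFactory.getLogger(PlaceholderLogging.class);

    static void logPropertiesFile(File yarnPropertiesLocation) {
        // the placeholder defers string construction until the log level is enabled
        LOG.info("Found Yarn properties file under {}.", yarnPropertiesLocation.getAbsolutePath());
    }
}
```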


> Do not pass Configuration and configuration directory to CustomCommandLine 
> methods
> --
>
> Key: FLINK-8340
> URL: https://issues.apache.org/jira/browse/FLINK-8340
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, all methods in {{CustomCommandLine}} need a {{Configuration}} and 
> sometimes the configuration directory. Since these values should not change 
> over the lifetime of the {{CustomCommandLine}} we should pass them as a 
> constructor argument instead of a method argument.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4594: [FLINK-7517][network] let NettyBufferPool extend P...

2018-01-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4594


---


[jira] [Commented] (FLINK-7517) let NettyBufferPool extend PooledByteBufAllocator

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313192#comment-16313192
 ] 

ASF GitHub Bot commented on FLINK-7517:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4594


> let NettyBufferPool extend PooledByteBufAllocator
> -
>
> Key: FLINK-7517
> URL: https://issues.apache.org/jira/browse/FLINK-7517
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> {{NettyBufferPool}} wraps {{PooledByteBufAllocator}}, but because of this, any 
> allocated buffer's {{alloc()}} method returns the wrapped 
> {{PooledByteBufAllocator}}, which allows heap buffers again. By extending the 
> {{PooledByteBufAllocator}}, we close this loophole and also fix the 
> invariant that a copy of a buffer should have the same allocator.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313191#comment-16313191
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4581


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) If {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write.
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained to be used in parallel somewhere else, it may no longer 
> be available or may contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7427) integrate PartitionRequestProtocol into NettyProtocol

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313190#comment-16313190
 ] 

ASF GitHub Bot commented on FLINK-7427:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4528


> integrate PartitionRequestProtocol into NettyProtocol
> -
>
> Key: FLINK-7427
> URL: https://issues.apache.org/jira/browse/FLINK-7427
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
>
> Back when the current network stack was created, an interface 
> {{NettyProtocol}} was defined with the only implementing class being 
> {{PartitionRequestProtocol}}, except for unit tests which were, however, also 
> partly based on {{PartitionRequestProtocol}}.
> To make things a bit simpler, we should integrate 
> {{PartitionRequestProtocol}} (which only defines the channel handlers used 
> by the client and server) into {{NettyProtocol}} and clean up a bit.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4528: [FLINK-7427][network] integrate PartitionRequestPr...

2018-01-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4528


---


[GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...

2018-01-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4581


---


[GitHub] flink pull request #5226: [FLINK-8340] [flip6] Remove passing of Configurati...

2018-01-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5226#discussion_r159874411
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -183,93 +204,43 @@ public FlinkYarnSessionCli(String shortPrefix, String 
longPrefix, boolean accept
allOptions.addOption(applicationId);
allOptions.addOption(zookeeperNamespace);
allOptions.addOption(flip6);
-   }
 
-   /**
-* Tries to load a Flink Yarn properties file and returns the Yarn 
application id if successful.
-* @param cmdLine The command-line parameters
-* @param flinkConfiguration The flink configuration
-* @return Yarn application id or null if none could be retrieved
-*/
-   private String loadYarnPropertiesFile(CommandLine cmdLine, 
Configuration flinkConfiguration) {
+   // try loading a potential yarn properties file
+   final File yarnPropertiesLocation = 
getYarnPropertiesLocation(configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
 
-   String jobManagerOption = 
cmdLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
-   if (jobManagerOption != null) {
-   // don't resume from properties file if a JobManager 
has been specified
-   return null;
-   }
+   yarnPropertiesFile = new Properties();
 
-   for (Option option : cmdLine.getOptions()) {
-   if (allOptions.hasOption(option.getOpt())) {
-   if (!option.getOpt().equals(detached.getOpt())) 
{
-   // don't resume from properties file if 
yarn options have been specified
-   return null;
-   }
-   }
-   }
-
-   // load the YARN properties
-   File propertiesFile = 
getYarnPropertiesLocation(flinkConfiguration);
-   if (!propertiesFile.exists()) {
-   return null;
-   }
-
-   logAndSysout("Found YARN properties file " + 
propertiesFile.getAbsolutePath());
+   if (yarnPropertiesLocation.exists()) {
+   LOG.info("Found Yarn properties file under " + 
yarnPropertiesLocation.getAbsolutePath() + '.');
--- End diff --

nit: slf4j placeholders (`{}`) can be used here


---


[GitHub] flink pull request #5226: [FLINK-8340] [flip6] Remove passing of Configurati...

2018-01-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5226#discussion_r159883795
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
 ---
@@ -60,220 +59,178 @@ public static void init() {
}
 
@Test
-   public void testNonExistingJarFile() {
-   try {
-   CliFrontend frontend = new CliFrontend(
-   new Configuration(),
-   Collections.singletonList(new DefaultCLI()),
-   CliFrontendTestUtils.getConfigDir());
+   public void testNonExistingJarFile() throws Exception {
+   Configuration configuration = new Configuration();
+   CliFrontend frontend = new CliFrontend(
+   configuration,
+   Collections.singletonList(new 
DefaultCLI(configuration)));
 
-   ProgramOptions options = mock(ProgramOptions.class);
-   
when(options.getJarFilePath()).thenReturn("/some/none/existing/path");
+   ProgramOptions options = mock(ProgramOptions.class);
+   
when(options.getJarFilePath()).thenReturn("/some/none/existing/path");
 
-   try {
-   frontend.buildProgram(options);
-   fail("should throw an exception");
-   }
-   catch (FileNotFoundException e) {
-   // that's what we want
-   }
+   try {
+   frontend.buildProgram(options);
+   fail("should throw an exception");
}
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   catch (FileNotFoundException e) {
+   // that's what we want
}
}
 
@Test
-   public void testFileNotJarFile() {
-   try {
-   CliFrontend frontend = new CliFrontend(
-   new Configuration(),
-   Collections.singletonList(new DefaultCLI()),
-   CliFrontendTestUtils.getConfigDir());
+   public void testFileNotJarFile() throws Exception {
+   Configuration configuration = new Configuration();
+   CliFrontend frontend = new CliFrontend(
+   configuration,
+   Collections.singletonList(new 
DefaultCLI(configuration)));
 
-   ProgramOptions options = mock(ProgramOptions.class);
-   
when(options.getJarFilePath()).thenReturn(getNonJarFilePath());
+   ProgramOptions options = mock(ProgramOptions.class);
+   when(options.getJarFilePath()).thenReturn(getNonJarFilePath());
 
-   try {
-   frontend.buildProgram(options);
-   fail("should throw an exception");
-   }
-   catch (ProgramInvocationException e) {
-   // that's what we want
-   }
+   try {
+   frontend.buildProgram(options);
+   fail("should throw an exception");
}
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   catch (ProgramInvocationException e) {
+   // that's what we want
}
}
 
@Test
-   public void testVariantWithExplicitJarAndArgumentsOption() {
-   try {
-   String[] arguments = {
-   "--classpath", "file:///tmp/foo",
-   "--classpath", "file:///tmp/bar",
-   "-j", getTestJarPath(),
-   "-a", "--debug", "true", "arg1", "arg2" 
};
-   URL[] classpath = new URL[] { new 
URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
-   String[] reducedArguments = new String[] {"--debug", 
"true", "arg1", "arg2"};
-
-   RunOptions options = 
CliFrontendParser.parseRunCommand(arguments);
-   assertEquals(getTestJarPath(), 
options.getJarFilePath());
-   assertArrayEquals(classpath, 
options.getClasspaths().toArray());
-   assertArrayEquals(reducedArguments, 
options.getProgramArgs());
-
-   CliFrontend frontend = new CliFrontend(
-   new Configuration(),
-   Collections.singletonList(new DefaultCLI()),

[GitHub] flink pull request #5225: [FLINK-8339] [flip6] Let CustomCommandLine return ...

2018-01-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r159885568
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.net.InetSocketAddress;
+
+import static 
org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig;
+
+/**
+ * Base class for {@link CustomCommandLine} implementations which specify 
a JobManager address and
+ * a ZooKeeper namespace.
+ *
+ * @param  type of the ClusterClient which is returned
+ */
+public abstract class AbstractCustomCommandLine 
implements CustomCommandLine {
+
+   protected static final Option ZOOKEEPER_NAMESPACE_OPTION = new 
Option("z", "zookeeperNamespace", true,
+   "Namespace to create the Zookeeper sub-paths for high 
availability mode");
+
+
+   protected static final Option ADDRESS_OPTION = new Option("m", 
"jobmanager", true,
--- End diff --

Why are the options here constants? In `FlinkYarnSessionCli` the `Option`s 
are instance variables. Also `Option`s are mutable.
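
A sketch of the instance-field alternative (class name illustrative; `Option` 
and `Options` are Commons CLI, as in the diff):

```java
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

public class CliOptionsSketch {
    // instance field: every CLI instance gets its own mutable Option object
    private final Option addressOption =
        new Option("m", "jobmanager", true, "Address of the JobManager to connect to.");

    public Options buildOptions() {
        Options options = new Options();
        // mutating a shared static constant here (e.g. setRequired(true))
        // would leak into every other user of that constant
        addressOption.setRequired(false);
        options.addOption(addressOption);
        return options;
    }
}
```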


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313170#comment-16313170
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159884153
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
 ---
@@ -1,104 +0,0 @@
-/*
--- End diff --

Yes, they are deleted in later commits anyway.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313183#comment-16313183
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r159885568
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.net.InetSocketAddress;
+
+import static 
org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig;
+
+/**
+ * Base class for {@link CustomCommandLine} implementations which specify 
a JobManager address and
+ * a ZooKeeper namespace.
+ *
+ * @param  type of the ClusterClient which is returned
+ */
+public abstract class AbstractCustomCommandLine 
implements CustomCommandLine {
+
+   protected static final Option ZOOKEEPER_NAMESPACE_OPTION = new 
Option("z", "zookeeperNamespace", true,
+   "Namespace to create the Zookeeper sub-paths for high 
availability mode");
+
+
+   protected static final Option ADDRESS_OPTION = new Option("m", 
"jobmanager", true,
--- End diff --

Why are the options here constants? In `FlinkYarnSessionCli` the `Option`s 
are instance variables. Also `Option`s are mutable.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5178: [hotfix] Fix typo in TestableKinesisDataFetcher

2018-01-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5178
  
Merging this...


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159884153
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
 ---
@@ -1,104 +0,0 @@
-/*
--- End diff --

Yes, they are deleted in later commits anyway.


---


[jira] [Commented] (FLINK-8340) Do not pass Configuration and configuration directory to CustomCommandLine methods

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313167#comment-16313167
 ] 

ASF GitHub Bot commented on FLINK-8340:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5226#discussion_r159883795
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
 ---
@@ -60,220 +59,178 @@ public static void init() {
}
 
@Test
-   public void testNonExistingJarFile() {
-   try {
-   CliFrontend frontend = new CliFrontend(
-   new Configuration(),
-   Collections.singletonList(new DefaultCLI()),
-   CliFrontendTestUtils.getConfigDir());
+   public void testNonExistingJarFile() throws Exception {
+   Configuration configuration = new Configuration();
+   CliFrontend frontend = new CliFrontend(
+   configuration,
+   Collections.singletonList(new 
DefaultCLI(configuration)));
 
-   ProgramOptions options = mock(ProgramOptions.class);
-   
when(options.getJarFilePath()).thenReturn("/some/none/existing/path");
+   ProgramOptions options = mock(ProgramOptions.class);
+   
when(options.getJarFilePath()).thenReturn("/some/none/existing/path");
 
-   try {
-   frontend.buildProgram(options);
-   fail("should throw an exception");
-   }
-   catch (FileNotFoundException e) {
-   // that's what we want
-   }
+   try {
+   frontend.buildProgram(options);
+   fail("should throw an exception");
}
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   catch (FileNotFoundException e) {
+   // that's what we want
}
}
 
@Test
-   public void testFileNotJarFile() {
-   try {
-   CliFrontend frontend = new CliFrontend(
-   new Configuration(),
-   Collections.singletonList(new DefaultCLI()),
-   CliFrontendTestUtils.getConfigDir());
+   public void testFileNotJarFile() throws Exception {
+   Configuration configuration = new Configuration();
+   CliFrontend frontend = new CliFrontend(
+   configuration,
+   Collections.singletonList(new 
DefaultCLI(configuration)));
 
-   ProgramOptions options = mock(ProgramOptions.class);
-   
when(options.getJarFilePath()).thenReturn(getNonJarFilePath());
+   ProgramOptions options = mock(ProgramOptions.class);
+   when(options.getJarFilePath()).thenReturn(getNonJarFilePath());
 
-   try {
-   frontend.buildProgram(options);
-   fail("should throw an exception");
-   }
-   catch (ProgramInvocationException e) {
-   // that's what we want
-   }
+   try {
+   frontend.buildProgram(options);
+   fail("should throw an exception");
}
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   catch (ProgramInvocationException e) {
+   // that's what we want
}
}
 
@Test
-   public void testVariantWithExplicitJarAndArgumentsOption() {
-   try {
-   String[] arguments = {
-   "--classpath", "file:///tmp/foo",
-   "--classpath", "file:///tmp/bar",
-   "-j", getTestJarPath(),
-   "-a", "--debug", "true", "arg1", "arg2" 
};
-   URL[] classpath = new URL[] { new 
URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
-   String[] reducedArguments = new String[] {"--debug", 
"true", "arg1", "arg2"};
-
-   RunOptions options = 
CliFrontendParser.parseRunCommand(arguments);
-   assertEquals(getTestJarPath(), 
options.getJarFilePath());
-   assertArrayEquals(classpath, 
options.getClasspaths().toArray());
-   assertA

[GitHub] flink pull request #5226: [FLINK-8340] [flip6] Remove passing of Configurati...

2018-01-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5226#discussion_r159883585
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
 ---
@@ -60,220 +59,178 @@ public static void init() {
}
 
@Test
-   public void testNonExistingJarFile() {
-   try {
-   CliFrontend frontend = new CliFrontend(
-   new Configuration(),
-   Collections.singletonList(new DefaultCLI()),
-   CliFrontendTestUtils.getConfigDir());
+   public void testNonExistingJarFile() throws Exception {
+   Configuration configuration = new Configuration();
+   CliFrontend frontend = new CliFrontend(
+   configuration,
+   Collections.singletonList(new 
DefaultCLI(configuration)));
 
-   ProgramOptions options = mock(ProgramOptions.class);
-   
when(options.getJarFilePath()).thenReturn("/some/none/existing/path");
+   ProgramOptions options = mock(ProgramOptions.class);
+   
when(options.getJarFilePath()).thenReturn("/some/none/existing/path");
 
-   try {
-   frontend.buildProgram(options);
-   fail("should throw an exception");
-   }
-   catch (FileNotFoundException e) {
-   // that's what we want
-   }
+   try {
+   frontend.buildProgram(options);
+   fail("should throw an exception");
}
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   catch (FileNotFoundException e) {
+   // that's what we want
}
}
 
@Test
-   public void testFileNotJarFile() {
-   try {
-   CliFrontend frontend = new CliFrontend(
-   new Configuration(),
-   Collections.singletonList(new DefaultCLI()),
-   CliFrontendTestUtils.getConfigDir());
+   public void testFileNotJarFile() throws Exception {
+   Configuration configuration = new Configuration();
+   CliFrontend frontend = new CliFrontend(
+   configuration,
+   Collections.singletonList(new 
DefaultCLI(configuration)));
 
-   ProgramOptions options = mock(ProgramOptions.class);
-   
when(options.getJarFilePath()).thenReturn(getNonJarFilePath());
+   ProgramOptions options = mock(ProgramOptions.class);
+   when(options.getJarFilePath()).thenReturn(getNonJarFilePath());
 
-   try {
-   frontend.buildProgram(options);
-   fail("should throw an exception");
-   }
-   catch (ProgramInvocationException e) {
-   // that's what we want
-   }
+   try {
+   frontend.buildProgram(options);
+   fail("should throw an exception");
}
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   catch (ProgramInvocationException e) {
+   // that's what we want
}
}
 
@Test
-   public void testVariantWithExplicitJarAndArgumentsOption() {
-   try {
-   String[] arguments = {
-   "--classpath", "file:///tmp/foo",
-   "--classpath", "file:///tmp/bar",
-   "-j", getTestJarPath(),
-   "-a", "--debug", "true", "arg1", "arg2" 
};
-   URL[] classpath = new URL[] { new 
URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
-   String[] reducedArguments = new String[] {"--debug", 
"true", "arg1", "arg2"};
-
-   RunOptions options = 
CliFrontendParser.parseRunCommand(arguments);
-   assertEquals(getTestJarPath(), 
options.getJarFilePath());
-   assertArrayEquals(classpath, 
options.getClasspaths().toArray());
-   assertArrayEquals(reducedArguments, 
options.getProgramArgs());
-
-   CliFrontend frontend = new CliFrontend(
-   new Configuration(),
-   Collections.singletonList(new DefaultCLI()),

[jira] [Commented] (FLINK-8340) Do not pass Configuration and configuration directory to CustomCommandLine methods

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313166#comment-16313166
 ] 

ASF GitHub Bot commented on FLINK-8340:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5226#discussion_r159883585
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
 ---
@@ -60,220 +59,178 @@ public static void init() {
}
 
@Test
-   public void testNonExistingJarFile() {
-   try {
-   CliFrontend frontend = new CliFrontend(
-   new Configuration(),
-   Collections.singletonList(new DefaultCLI()),
-   CliFrontendTestUtils.getConfigDir());
+   public void testNonExistingJarFile() throws Exception {
+   Configuration configuration = new Configuration();
+   CliFrontend frontend = new CliFrontend(
+   configuration,
+   Collections.singletonList(new 
DefaultCLI(configuration)));
 
-   ProgramOptions options = mock(ProgramOptions.class);
-   
when(options.getJarFilePath()).thenReturn("/some/none/existing/path");
+   ProgramOptions options = mock(ProgramOptions.class);
+   
when(options.getJarFilePath()).thenReturn("/some/none/existing/path");
 
-   try {
-   frontend.buildProgram(options);
-   fail("should throw an exception");
-   }
-   catch (FileNotFoundException e) {
-   // that's what we want
-   }
+   try {
+   frontend.buildProgram(options);
+   fail("should throw an exception");
}
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   catch (FileNotFoundException e) {
+   // that's what we want
}
}
 
@Test
-   public void testFileNotJarFile() {
-   try {
-   CliFrontend frontend = new CliFrontend(
-   new Configuration(),
-   Collections.singletonList(new DefaultCLI()),
-   CliFrontendTestUtils.getConfigDir());
+   public void testFileNotJarFile() throws Exception {
+   Configuration configuration = new Configuration();
+   CliFrontend frontend = new CliFrontend(
+   configuration,
+   Collections.singletonList(new 
DefaultCLI(configuration)));
 
-   ProgramOptions options = mock(ProgramOptions.class);
-   
when(options.getJarFilePath()).thenReturn(getNonJarFilePath());
+   ProgramOptions options = mock(ProgramOptions.class);
+   when(options.getJarFilePath()).thenReturn(getNonJarFilePath());
 
-   try {
-   frontend.buildProgram(options);
-   fail("should throw an exception");
-   }
-   catch (ProgramInvocationException e) {
-   // that's what we want
-   }
+   try {
+   frontend.buildProgram(options);
+   fail("should throw an exception");
}
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   catch (ProgramInvocationException e) {
+   // that's what we want
}
}
 
@Test
-   public void testVariantWithExplicitJarAndArgumentsOption() {
-   try {
-   String[] arguments = {
-   "--classpath", "file:///tmp/foo",
-   "--classpath", "file:///tmp/bar",
-   "-j", getTestJarPath(),
-   "-a", "--debug", "true", "arg1", "arg2" 
};
-   URL[] classpath = new URL[] { new 
URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
-   String[] reducedArguments = new String[] {"--debug", 
"true", "arg1", "arg2"};
-
-   RunOptions options = 
CliFrontendParser.parseRunCommand(arguments);
-   assertEquals(getTestJarPath(), 
options.getJarFilePath());
-   assertArrayEquals(classpath, 
options.getClasspaths().toArray());
-   assertA

[jira] [Updated] (FLINK-8376) Remove potentially unnecessary synchronization in SpillableSubpartition

2018-01-05 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-8376:
--
Description: After https://issues.apache.org/jira/browse/FLINK-8375 
{{SpillableSubpartition}} uses two different locks ({{this}} and {{buffers}}). 
Probably they could be unified into one lock.  (was: After 
https://issues.apache.org/jira/browse/FLINK-8375 SpillableSubpartition uses two 
different locks (this and buffers). Probably they could be unified into one 
lock.)

> Remove potentially unnecessary synchronization in SpillableSubpartition
> ---
>
> Key: FLINK-8376
> URL: https://issues.apache.org/jira/browse/FLINK-8376
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>
> After https://issues.apache.org/jira/browse/FLINK-8375 
> {{SpillableSubpartition}} uses two different locks ({{this}} and 
> {{buffers}}). Probably they could be unified into one lock.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8375) Remove unnecessary synchronization in ResultPartition

2018-01-05 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-8375:
--
Description: {{PipelinedSubpartition}} doesn't need this extra 
synchronization in {{ResultPartition}} so the locking from {{ResultPartition}} 
can be safely pushed down to {{SpillableSubpartition}}.  (was: 
{{PipelinedSubpartition}} doesn't need this extra synchronization in 
{{ResultPartition}} so the locking from {ResultPartition} can be safely pushed 
down to {SpillableSubpartition}.)

> Remove unnecessary synchronization in ResultPartition
> -
>
> Key: FLINK-8375
> URL: https://issues.apache.org/jira/browse/FLINK-8375
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> {{PipelinedSubpartition}} doesn't need this extra synchronization in 
> {{ResultPartition}} so the locking from {{ResultPartition}} can be safely 
> pushed down to {{SpillableSubpartition}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8376) Remove potentially unnecessary synchronization in SpillableSubpartition

2018-01-05 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-8376:
-

 Summary: Remove potentially unnecessary synchronization in 
SpillableSubpartition
 Key: FLINK-8376
 URL: https://issues.apache.org/jira/browse/FLINK-8376
 Project: Flink
  Issue Type: Improvement
  Components: Network
Affects Versions: 1.4.0
Reporter: Piotr Nowojski


After https://issues.apache.org/jira/browse/FLINK-8375 SpillableSubpartition 
uses two different locks (this and buffers). Probably they could be unified 
into one lock.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8375) Remove unnecessary synchronization in ResultPartition

2018-01-05 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-8375:
--
Description: {{PipelinedSubpartition}} doesn't need this extra 
synchronization in {{ResultPartition}} so the locking from {ResultPartition} 
can be safely pushed down to {SpillableSubpartition}.  (was: 
{PipelinedSubpartition} doesn't need this extra synchronization in 
{ResultPartition} so the locking from {ResultPartition} can be safely pushed 
down to {SpillableSubpartition}.)

> Remove unnecessary synchronization in ResultPartition
> -
>
> Key: FLINK-8375
> URL: https://issues.apache.org/jira/browse/FLINK-8375
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.5.0
>
>
> {{PipelinedSubpartition}} doesn't need this extra synchronization in 
> {{ResultPartition}} so the locking from {ResultPartition} can be safely 
> pushed down to {SpillableSubpartition}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313164#comment-16313164
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159883287
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -831,11 +834,12 @@ private StreamStateHandle materializeStateData(Path 
filePath) throws Exception {
return result;
 
} finally {
-   if (inputStream != null && 
closeableRegistry.unregisterCloseable(inputStream)) {
+
+   if 
(closeableRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}
 
-   if (outputStream != null && 
closeableRegistry.unregisterCloseable(outputStream)) {
+   if 
(closeableRegistry.unregisterCloseable(outputStream)) {
--- End diff --

Yes, that is what it is used for. In case of cancellation, we want all 
blocking operations to terminate as soon as possible via this mechanism.
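
For reference, a minimal sketch of the unregister-then-close pattern from the 
diff, assuming Flink's `CloseableRegistry` API:

```java
import org.apache.flink.core.fs.CloseableRegistry;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class CancellationAwareRead {
    public static void read(CloseableRegistry registry) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3});
        // on cancellation the registry closes 'in', unblocking a pending read
        registry.registerCloseable(in);
        try {
            while (in.read() != -1) {
                // ... process bytes ...
            }
        } finally {
            // close only if cancellation has not already closed (and unregistered) it
            if (registry.unregisterCloseable(in)) {
                in.close();
            }
        }
    }
}
```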


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159883287
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -831,11 +834,12 @@ private StreamStateHandle materializeStateData(Path 
filePath) throws Exception {
return result;
 
} finally {
-   if (inputStream != null && 
closeableRegistry.unregisterCloseable(inputStream)) {
+
+   if 
(closeableRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}
 
-   if (outputStream != null && 
closeableRegistry.unregisterCloseable(outputStream)) {
+   if 
(closeableRegistry.unregisterCloseable(outputStream)) {
--- End diff --

Yes, that is what it is used for. In case of cancellation, we want all 
blocking operations to terminate as soon as possible via this mechanism.


---


[jira] [Created] (FLINK-8375) Remove unnecessary synchronization in ResultPartition

2018-01-05 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-8375:
-

 Summary: Remove unnecessary synchronization in ResultPartition
 Key: FLINK-8375
 URL: https://issues.apache.org/jira/browse/FLINK-8375
 Project: Flink
  Issue Type: Improvement
  Components: Network
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.5.0


{PipelinedSubpartition} doesn't need this extra synchronization in 
{ResultPartition} so the locking from {ResultPartition} can be safely pushed 
down to {SpillableSubpartition}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5246: Various minor improvements to FileSystem and relat...

2018-01-05 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5246

Various minor improvements to FileSystem and related classes

## What is the purpose of the change

Many small improvements, like
  - harmonization of `FileSystem.mkdirs()` behavior (with test suite)
  - avoiding repeated re-parsing of URIs
  - avoiding repeated regex compilation (see the sketch below)
  - removal of unneeded and unused methods
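
For instance, the regex item amounts to hoisting the `Pattern` into a constant 
(a generic sketch; the actual pattern in `Path` may differ):

```java
import java.util.regex.Pattern;

public final class PathValidator {
    // compiled once; calling Pattern.compile() per invocation would redo the parsing work
    private static final Pattern WINDOWS_ROOT_DIR = Pattern.compile("/\\p{Alpha}+:/");

    public static boolean startsWithWindowsDrive(String path) {
        return WINDOWS_ROOT_DIR.matcher(path).find();
    }
}
```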

## Verifying this change

The change is partially trivial rework and adds some additional unit tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink various_fixes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5246.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5246


commit db52d6b22058d106e0bce57a646d68a64adbe5f4
Author: Stephan Ewen 
Date:   2017-10-26T18:54:55Z

[hotfix] [checkpoints] Remove never used method 'close()' on 
CheckpointStreamFactory

The fact that the method was never called (and never implemented) strongly 
suggests that it should be removed; otherwise someone might eventually end up 
implementing it for a new state backend and wonder why it is never called.

commit 40524a6cd9ae4db92bd17cf25af6178487d2921d
Author: Stephan Ewen 
Date:   2017-10-27T17:23:51Z

[hotfix] [core] Improve local fs exists() performance

This avoids going through an exception in the case of non-existing files.
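
The idea, sketched (the concrete code path in `LocalFileSystem` may differ):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;

public final class ExistsCheckSketch {
    // slow: existence learned by provoking an exception (stack-trace fill-in is costly)
    static boolean existsViaException(File f) {
        try (FileInputStream in = new FileInputStream(f)) {
            return true;
        } catch (FileNotFoundException e) {
            return false;
        } catch (IOException e) {
            return false;
        }
    }

    // fast: a plain metadata lookup, no exception objects involved
    static boolean existsDirectly(File f) {
        return f.exists();
    }
}
```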

commit c147967dc1aed07495fb7a7fb834d11e38eeae1d
Author: Stephan Ewen 
Date:   2017-10-27T17:25:22Z

[hotfix] [hdfs] Avoid re-parsing URIs for all Hadoop File System calls.

Previously, this converted Flink paths (internally URIs) to strings and
then let the Hadoop Paths parse, validate, and normalize the strings to
URIs again.

Now we simply pass the URIs directly.

commit 3771db1d6eef8079796229deb53a6a42f7def33e
Author: Stephan Ewen 
Date:   2017-12-06T13:51:22Z

[hotfix] [checkpoints] Improve performance of ByteStreamStateHandle

The input stream from ByteStreamStateHandle did not override the 
'read(byte[], int, int)' method, meaning that bulk byte reads resulted in 
many individual byte accesses.

Additionally, this change avoids accessing the data array through an outer 
class, and instead adds a reference directly to the input stream class, 
avoiding one hop per access. That also allows a more restricted access level 
on the fields, which may additionally help the JIT in some cases.
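
A minimal sketch of the pattern (not the actual ByteStreamStateHandle code): an 
array-backed stream that overrides the bulk `read` and holds a direct reference 
to the array:

```java
import java.io.InputStream;

class ByteArrayBackedInputStream extends InputStream {
    private final byte[] data; // direct reference, no outer-class hop
    private int pos;

    ByteArrayBackedInputStream(byte[] data) {
        this.data = data;
    }

    @Override
    public int read() {
        return pos < data.length ? (data[pos++] & 0xFF) : -1;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        // bulk copy instead of len single-byte reads
        if (pos >= data.length) {
            return -1;
        }
        int n = Math.min(len, data.length - pos);
        System.arraycopy(data, pos, b, off, n);
        pos += n;
        return n;
    }
}
```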

commit 355fa00b956b6717ab7ef9350cd59154a85b4091
Author: Stephan Ewen 
Date:   2017-12-06T14:10:22Z

[hotfix] [core] Avoid redundant File path conversion in 
LocalFileSystem.getFileStatus(Path)

commit 4f35cf3774e0879c98d61e251283ae955cdc66c0
Author: Stephan Ewen 
Date:   2017-12-07T15:11:24Z

[FLINK-8373] [core, hdfs] Ensure consistent semantics of 
FileSystem.mkdirs() across file system implementations.

commit 9cab78ac5c39334aacbb09879d3e1ebf99414185
Author: Stephan Ewen 
Date:   2017-12-13T14:07:52Z

[hotfix] [core] Pre-compile regex pattern in Path class

commit e8cf409358e81690d4bc66e7497aaad7576a148b
Author: Stephan Ewen 
Date:   2018-01-05T13:11:58Z

[hotfix] [tests] Remove unnecessary stack trace printing in StreamTaskTest

commit 08dfba993f9eaf64fa44c497a0925e9e494ecaa0
Author: Stephan Ewen 
Date:   2017-12-13T16:06:37Z

[hotfix] [core] Add a factory method to create Path from local file

This makes it easier for users and contributors to figure out how
to create local file paths in a way that works across operating systems.
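
Usage sketch, assuming the new factory is `Path.fromLocalFile(File)`:

```java
import org.apache.flink.core.fs.Path;

import java.io.File;

public class LocalPathExample {
    public static void main(String[] args) {
        File localFile = new File(System.getProperty("java.io.tmpdir"), "flink-data");
        // handles drive letters and backslashes, unlike new Path(localFile.toString()) on Windows
        Path flinkPath = Path.fromLocalFile(localFile);
        System.out.println(flinkPath);
    }
}
```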




---


[jira] [Commented] (FLINK-8359) Update copyright date in NOTICE

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313158#comment-16313158
 ] 

ASF GitHub Bot commented on FLINK-8359:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5238
  
Good fix, merging this...


> Update copyright date in NOTICE
> ---
>
> Key: FLINK-8359
> URL: https://issues.apache.org/jira/browse/FLINK-8359
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> NOTICE file has copyright year as 2014-2017. This needs to be updated as 
> 2014-2018.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5238: [FLINK-8359][docs] Update copyright date in NOTICE

2018-01-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5238
  
Good fix, merging this...


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159866220
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
@@ -1483,11 +1482,6 @@ long getLastReportedBytesBufferedInAlignment() {
return lastReportedBytesBufferedInAlignment;
}
 
-   @Override
--- End diff --

I think you could replace `ValidatingCheckpointHandler` with 
`CheckpointSequenceValidator` and remove this code duplication.


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313144#comment-16313144
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159860788
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1041,7 +1045,13 @@ public void restore(Collection 
restoreState) throws Exception
 
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
+
+   if (!enableIncrementalCheckpointing) {
--- End diff --

?

1. Is this change relevant to the commit? If not, please extract it into a 
separate one.
2. Why do we not need to handle the notification when incremental 
checkpointing is disabled? Doesn't that deserve some explanation in the form 
of a comment?
  


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313147#comment-16313147
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159861671
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 ---
@@ -132,21 +135,15 @@ public String getKey(String value) throws Exception {
final OneShotLatch delayCheckpointLatch = new OneShotLatch();
final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
 
-   StreamMockEnvironment mockEnv = new StreamMockEnvironment(
-   testHarness.jobConfig,
-   testHarness.taskConfig,
-   testHarness.memorySize,
-   new MockInputSplitProvider(),
-   testHarness.bufferSize) {
+   CheckpointResponder checkpointResponderMock = new 
CheckpointResponder() {
--- End diff --

Is this change related to the other changes in this PR?


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313142#comment-16313142
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159840075
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -831,11 +834,12 @@ private StreamStateHandle materializeStateData(Path filePath) throws Exception {
return result;
 
} finally {
-   if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) {
+
+   if (closeableRegistry.unregisterCloseable(inputStream)) {
--- End diff --

Is this an unrelated refactor/cleanup? If so, please move it to a separate commit. 
(Ditto for the other `xyz != null` check removals.)
  


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159865703
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -669,31 +676,14 @@ else if (current == ExecutionState.CANCELING) {
Environment env = new RuntimeEnvironment(
jobId, vertexId, executionId, executionConfig, taskInfo,
jobConfiguration, taskConfiguration, userCodeClassLoader,
-   memoryManager, ioManager, broadcastVariableManager,
+   memoryManager, ioManager, broadcastVariableManager, taskStateManager,
--- End diff --

please reformat this call


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313154#comment-16313154
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159880527
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ---
@@ -147,7 +147,7 @@ public void close() throws IOException {
 
@Override
public void dispose() {
-   IOUtils.closeQuietly(this);
+   IOUtils.closeQuietly(closeStreamOnCancelRegistry);
--- End diff --

Another irrelevant change?


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313141#comment-16313141
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159836477
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -145,8 +145,8 @@
 
private volatile Throwable failureCause;  // once assigned, never changes
 
-   /** The handle to the state that the task gets on restore */
-   private volatile TaskStateSnapshot taskState;
+   /** Information to restore the task onr recovery, such as checkpoint id and task state snapshot */
+   private volatile TaskRestore taskRestore;
--- End diff --

I would add a `SerializableOptional` class, make this field a 
`SerializableOptional`, and pass it down to `Task` to avoid nulls (a rough 
sketch of such a wrapper follows below).

If you do not like this, please at least add `@Nullable` annotations to 
this field and its subsequent usages.

Or maybe embed the `isEmpty` logic into `TaskRestore`.
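
For illustration, here is a minimal, self-contained sketch of what such a 
`SerializableOptional` could look like. This is only one reading of the 
suggestion, not code from the PR; all names are illustrative.

{code}
import java.io.Serializable;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

/** Minimal sketch of a Serializable counterpart to java.util.Optional. */
public final class SerializableOptional<T extends Serializable> implements Serializable {

	private static final long serialVersionUID = 1L;

	private static final SerializableOptional<?> EMPTY = new SerializableOptional<>(null);

	/** null encodes the empty case, so callers never have to pass null around. */
	private final T value;

	private SerializableOptional(T value) {
		this.value = value;
	}

	@SuppressWarnings("unchecked")
	public static <T extends Serializable> SerializableOptional<T> empty() {
		return (SerializableOptional<T>) EMPTY;
	}

	public static <T extends Serializable> SerializableOptional<T> ofNullable(T value) {
		return value == null ? empty() : new SerializableOptional<>(value);
	}

	public boolean isPresent() {
		return value != null;
	}

	public T get() {
		if (value == null) {
			throw new NoSuchElementException("value is absent");
		}
		return value;
	}

	public void ifPresent(Consumer<? super T> consumer) {
		if (value != null) {
			consumer.accept(value);
		}
	}
}
{code}

The field would then be declared as `SerializableOptional<TaskRestore>` and 
always be non-null, which makes the empty case explicit at every usage site.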
  


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159865414
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java
 ---
@@ -20,15 +20,18 @@
 
 import java.io.Serializable;
 
-public class TaskRestore implements Serializable {
+/**
+ * This class encapsulates the data from the job manager to restore a task.
+ */
+public class JobManagerTaskRestore implements Serializable {
--- End diff --

Please squash the rename with previous commit. 


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313145#comment-16313145
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159840618
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -831,11 +834,12 @@ private StreamStateHandle materializeStateData(Path filePath) throws Exception {
return result;
 
} finally {
-   if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) {
+
+   if (closeableRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}
 
-   if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) {
+   if (closeableRegistry.unregisterCloseable(outputStream)) {
--- End diff --

Side note: I do not understand why there is a `try`/`finally` block with 
`close()` AND an additional `closeableRegistry`. Is `closeableRegistry` used for 
artificially interrupting reads/writes? (My reading of the pattern is sketched below.)

Anyway, could you add a comment explaining this to the 
`closeableRegistry` field declaration? 
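
For reference, a self-contained sketch of the semantics as I read them 
(assumed behavior, not the actual Flink `CloseableRegistry` implementation): 
the registry exists so that a concurrent cancellation can close registered 
streams and interrupt blocking I/O, while the `finally` block only closes a 
stream it can still unregister, i.e. one that cancellation has not already closed.

{code}
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/** Illustrative stand-in for the registry semantics described above. */
class MiniCloseableRegistry implements Closeable {

	private final Set<Closeable> registered = new HashSet<>();
	private boolean closed;

	/** Registers a resource; a later close() of the registry will close it. */
	synchronized void registerCloseable(Closeable closeable) throws IOException {
		if (closed) {
			closeable.close();
			throw new IOException("registry is already closed");
		}
		registered.add(closeable);
	}

	/** Returns true iff the caller still owns the resource and must close it itself. */
	synchronized boolean unregisterCloseable(Closeable closeable) {
		return registered.remove(closeable);
	}

	/** Called on cancellation: closes everything still registered, interrupting blocking I/O. */
	@Override
	public synchronized void close() throws IOException {
		closed = true;
		for (Closeable closeable : registered) {
			closeable.close();
		}
		registered.clear();
	}
}
{code}

Under that reading, `unregisterCloseable` returning true in the `finally` block 
means "cancellation has not closed this stream yet, so we must", which is why 
both mechanisms coexist.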


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313153#comment-16313153
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159877503
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateManager.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * This is the interface through which stream task expose a {@link StreamOperatorStateContext} to their operators.
+ * Operators, in turn, can use the context to initialize everything connected to their state, such as backends or
+ * a timer service manager.
+ */
+public interface StreamTaskStateManager {
--- End diff --

As we discussed, you could rename this class to 
`StreamTaskStateInitializer` and limit its life scope.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313148#comment-16313148
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159866220
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
@@ -1483,11 +1482,6 @@ long getLastReportedBytesBufferedInAlignment() {
return lastReportedBytesBufferedInAlignment;
}
 
-   @Override
--- End diff --

I think you could replace `ValidatingCheckpointHandler` with 
`CheckpointSequenceValidator` and remove this code duplication.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159880527
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ---
@@ -147,7 +147,7 @@ public void close() throws IOException {
 
@Override
public void dispose() {
-   IOUtils.closeQuietly(this);
+   IOUtils.closeQuietly(closeStreamOnCancelRegistry);
--- End diff --

Another irrelevant change?


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313157#comment-16313157
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159880011
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
 ---
@@ -43,9 +44,9 @@ CheckpointStateOutputStream createCheckpointStateOutputStream(
 * Closes the stream factory, releasing all internal resources, but does not delete any
 * persistent checkpoint data.
 *
-* @throws Exception Exceptions can be forwarded and will be logged by the system
+* @throws IOException Exceptions can be forwarded and will be logged by the system
 */
-   void close() throws Exception;
+   void close() throws IOException;
--- End diff --

What's this change for? Is it used anywhere? It seems to me like `close()` 
is just dead code and this method could be dropped, or did I miss something?


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313143#comment-16313143
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159865414
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java
 ---
@@ -20,15 +20,18 @@
 
 import java.io.Serializable;
 
-public class TaskRestore implements Serializable {
+/**
+ * This class encapsulates the data from the job manager to restore a task.
+ */
+public class JobManagerTaskRestore implements Serializable {
--- End diff --

Please squash the rename with previous commit. 


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159861671
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 ---
@@ -132,21 +135,15 @@ public String getKey(String value) throws Exception {
final OneShotLatch delayCheckpointLatch = new OneShotLatch();
final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
 
-   StreamMockEnvironment mockEnv = new StreamMockEnvironment(
-   testHarness.jobConfig,
-   testHarness.taskConfig,
-   testHarness.memorySize,
-   new MockInputSplitProvider(),
-   testHarness.bufferSize) {
+   CheckpointResponder checkpointResponderMock = new CheckpointResponder() {
--- End diff --

Is this change relevant to others?


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313140#comment-16313140
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159834127
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -145,8 +145,8 @@
 
private volatile Throwable failureCause;  // once assigned, never changes
 
-   /** The handle to the state that the task gets on restore */
-   private volatile TaskStateSnapshot taskState;
+   /** Information to restore the task onr recovery, such as checkpoint id and task state snapshot */
--- End diff --

`onr` typo


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159877503
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateManager.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * This is the interface through which stream task expose a {@link StreamOperatorStateContext} to their operators.
+ * Operators, in turn, can use the context to initialize everything connected to their state, such as backends or
+ * a timer service manager.
+ */
+public interface StreamTaskStateManager {
--- End diff --

As we discussed, you could rename this class to 
`StreamTaskStateInitializer` and limit its life scope.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159860788
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1041,7 +1045,13 @@ public void restore(Collection restoreState) throws Exception
 
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
+
+   if (!enableIncrementalCheckpointing) {
--- End diff --

?

1. Is this change relevant to the commit? If not, please extract it to a 
separate one.
2. Why do we not need to handle the notification when incremental 
checkpointing is disabled? Doesn't it deserve some explanation in the form of a comment?
  


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313155#comment-16313155
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159881015
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
 ---
@@ -1,104 +0,0 @@
-/*
--- End diff --

That was a dead code, right? Please extract removal of this (and 
`MultiStreamStateHandleTest`) to separate commit.
  


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159840075
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -831,11 +834,12 @@ private StreamStateHandle materializeStateData(Path filePath) throws Exception {
return result;
 
} finally {
-   if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) {
+
+   if (closeableRegistry.unregisterCloseable(inputStream)) {
--- End diff --

Is this an unrelated refactor/cleanup? If so, please move it to a separate commit. 
(Ditto for the other `xyz != null` check removals.)
  


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313146#comment-16313146
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159863616
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
 ---
@@ -375,11 +376,13 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
 
+   final CloseableRegistry closeableRegistry = new CloseableRegistry();
StreamTask mockTask = mock(StreamTask.class);
when(mockTask.getCheckpointLock()).thenReturn(new Object());
when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration()));
when(mockTask.getEnvironment()).thenReturn(env);
when(mockTask.getExecutionConfig()).thenReturn(execConfig);
+   when(mockTask.getCancelables()).thenReturn(closeableRegistry);
--- End diff --

Mockito as always!

Why not replace this ugly block with `NoOpStreamTask` or something else 
implementing `StreamTask`? It will save hassle for the next developer who 
tries to call some other method on this mock that wasn't stubbed properly. 
(A small self-contained illustration of the idea follows below.)
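
To illustrate the idea with a self-contained example (plain Java, not the 
actual `StreamTask` API; `TaskContext` and `NoOpTaskContext` are made-up names): 
a dedicated no-op test double gives every method a defined, non-null default, 
whereas unstubbed Mockito calls silently return null.

{code}
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the interface a test subject depends on. */
interface TaskContext {
	Object getCheckpointLock();
	List<Closeable> getCancelables();
}

/** No-op test double: every method returns a sane default, nothing to stub. */
class NoOpTaskContext implements TaskContext {

	private final Object checkpointLock = new Object();
	private final List<Closeable> cancelables = new ArrayList<>();

	@Override
	public Object getCheckpointLock() {
		return checkpointLock;
	}

	@Override
	public List<Closeable> getCancelables() {
		return cancelables;
	}
}
{code}

A test that later starts calling, say, `getCancelables()` keeps working without 
anyone having to remember the matching `when(...).thenReturn(...)` stub.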


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159880011
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
 ---
@@ -43,9 +44,9 @@ CheckpointStateOutputStream createCheckpointStateOutputStream(
 * Closes the stream factory, releasing all internal resources, but does not delete any
 * persistent checkpoint data.
 *
-* @throws Exception Exceptions can be forwarded and will be logged by the system
+* @throws IOException Exceptions can be forwarded and will be logged by the system
 */
-   void close() throws Exception;
+   void close() throws IOException;
--- End diff --

What's this change for? Is it used anywhere? It seems to me like `close()` 
is just dead code and this method could be dropped, or did I miss something?


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159865180
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java ---
@@ -18,11 +18,38 @@
 
 package org.apache.flink.util;
 
-import org.apache.flink.annotation.Internal;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
 
 /**
- * Tagging interface for migration related classes.
+ * This interface represents an iterator that is also closeable.
+ *
+ * @param <T> type of the iterated objects.
  */
-@Internal
-public interface Migration {
+public interface CloseableIterator<T> extends Iterator<T>, Closeable {
--- End diff --

I cannot find usages of this in this commit. Is it used here somehow? If 
not, please remove it, extract it to a separate commit, or move it to the appropriate commit.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159877880
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
 ---
@@ -18,11 +18,14 @@
 package org.apache.flink.runtime.state;
 
 
+import org.apache.flink.annotation.PublicEvolving;
+
 /**
  * This interface must be implemented by functions/operations that want to 
receive
  * a commit notification once a checkpoint has been completely 
acknowledged by all
  * participants.
  */
+@PublicEvolving
--- End diff --

Separate commit?


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159865241
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/CloseableIterable.java ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * This interface represents an iterable that is also closeable.
+ *
+ * @param <T> type of the iterated objects.
+ */
+public interface CloseableIterable<T> extends Iterable<T>, Closeable {
--- End diff --

Couldn't you use `Stream` for that? 
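
For context on that alternative: `java.util.stream.Stream` already extends 
`AutoCloseable` (via `BaseStream`), so a closeable sequence of elements can be 
modeled without introducing a new interface. A minimal sketch (the file name 
is illustrative):

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class StreamExample {
	public static void main(String[] args) throws IOException {
		// Files.lines returns a Stream that must be closed to release the file handle;
		// try-with-resources works because Stream is AutoCloseable.
		try (Stream<String> lines = Files.lines(Paths.get("state-handles.txt"))) {
			lines.forEach(System.out::println);
		}
	}
}
{code}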


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159866648
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -669,31 +676,14 @@ else if (current == ExecutionState.CANCELING) {
Environment env = new RuntimeEnvironment(
jobId, vertexId, executionId, executionConfig, taskInfo,
jobConfiguration, taskConfiguration, userCodeClassLoader,
-   memoryManager, ioManager, broadcastVariableManager,
+   memoryManager, ioManager, broadcastVariableManager, taskStateManager,
accumulatorRegistry, kvStateRegistry, inputSplitProvider,
distributedCacheEntries, writers, inputGates,
checkpointResponder, taskManagerConfig, metrics, this);
 
// let the task code create its readers and writers
invokable.setEnvironment(env);
 
-   // the very last thing before the actual execution starts running is to inject
-   // the state into the task. the state is non-empty if this is an execution
-   // of a task that failed but had backuped state from a checkpoint
-
-   if (null != taskRestore && taskRestore.getTaskStateSnapshot() != null) {
-   if (invokable instanceof StatefulTask) {
-   StatefulTask op = (StatefulTask) invokable;
-   op.setInitialState(taskRestore.getTaskStateSnapshot());
--- End diff --

grrr, this cryptic `op` name forced me to look into the source code to 
check whether this is an instance of `StatefulTask` or not :/ could you rename 
it to something that at least is not an abbreviation?


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159863616
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
 ---
@@ -375,11 +376,13 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
 
+   final CloseableRegistry closeableRegistry = new CloseableRegistry();
StreamTask mockTask = mock(StreamTask.class);
when(mockTask.getCheckpointLock()).thenReturn(new Object());
when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration()));
when(mockTask.getEnvironment()).thenReturn(env);
when(mockTask.getExecutionConfig()).thenReturn(execConfig);
+   when(mockTask.getCancelables()).thenReturn(closeableRegistry);
--- End diff --

Mockito as always!

Why not replace this ugly block with `NoOpStreamTask` or something else 
implementing `StreamTask`? It will save hassle for the next developer who 
tries to call some other method on this mock that wasn't stubbed properly.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159881015
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
 ---
@@ -1,104 +0,0 @@
-/*
--- End diff --

That was a dead code, right? Please extract removal of this (and 
`MultiStreamStateHandleTest`) to separate commit.
  


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313156#comment-16313156
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159865703
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -669,31 +676,14 @@ else if (current == ExecutionState.CANCELING) {
Environment env = new RuntimeEnvironment(
jobId, vertexId, executionId, executionConfig, taskInfo,
jobConfiguration, taskConfiguration, userCodeClassLoader,
-   memoryManager, ioManager, broadcastVariableManager,
+   memoryManager, ioManager, broadcastVariableManager, taskStateManager,
--- End diff --

please reformat this call


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313149#comment-16313149
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159866648
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -669,31 +676,14 @@ else if (current == ExecutionState.CANCELING) {
Environment env = new RuntimeEnvironment(
jobId, vertexId, executionId, executionConfig, taskInfo,
jobConfiguration, taskConfiguration, userCodeClassLoader,
-   memoryManager, ioManager, broadcastVariableManager,
+   memoryManager, ioManager, broadcastVariableManager, taskStateManager,
accumulatorRegistry, kvStateRegistry, inputSplitProvider,
distributedCacheEntries, writers, inputGates,
checkpointResponder, taskManagerConfig, metrics, this);
 
// let the task code create its readers and writers
invokable.setEnvironment(env);
 
-   // the very last thing before the actual execution starts running is to inject
-   // the state into the task. the state is non-empty if this is an execution
-   // of a task that failed but had backuped state from a checkpoint
-
-   if (null != taskRestore && taskRestore.getTaskStateSnapshot() != null) {
-   if (invokable instanceof StatefulTask) {
-   StatefulTask op = (StatefulTask) invokable;
-   op.setInitialState(taskRestore.getTaskStateSnapshot());
--- End diff --

grrr, this cryptic `op` name forced me to look into the source code to 
check whether this is an instance of `StatefulTask` or not :/ could you rename 
it to something that at least is not an abbreviation?


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313151#comment-16313151
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159865180
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java ---
@@ -18,11 +18,38 @@
 
 package org.apache.flink.util;
 
-import org.apache.flink.annotation.Internal;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
 
 /**
- * Tagging interface for migration related classes.
+ * This interface represents an iterator that is also closeable.
+ *
+ * @param <T> type of the iterated objects.
  */
-@Internal
-public interface Migration {
+public interface CloseableIterator<T> extends Iterator<T>, Closeable {
--- End diff --

I cannot find usages of this in this commit. Is it used here somehow? If 
not, please remove it, extract it to a separate commit, or move it to the appropriate commit.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313152#comment-16313152
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159877880
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
 ---
@@ -18,11 +18,14 @@
 package org.apache.flink.runtime.state;
 
 
+import org.apache.flink.annotation.PublicEvolving;
+
 /**
  * This interface must be implemented by functions/operations that want to 
receive
  * a commit notification once a checkpoint has been completely 
acknowledged by all
  * participants.
  */
+@PublicEvolving
--- End diff --

Separate commit?


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313150#comment-16313150
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159865241
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/CloseableIterable.java ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * This interface represents an iterable that is also closeable.
+ *
+ * @param <T> type of the iterated objects.
+ */
+public interface CloseableIterable<T> extends Iterable<T>, Closeable {
--- End diff --

Couldn't you use `Stream` for that? 


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159836477
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -145,8 +145,8 @@
 
private volatile Throwable failureCause;  // once assigned, never changes
 
-   /** The handle to the state that the task gets on restore */
-   private volatile TaskStateSnapshot taskState;
+   /** Information to restore the task onr recovery, such as checkpoint id and task state snapshot */
+   private volatile TaskRestore taskRestore;
--- End diff --

I would add a `SerializableOptional` class, make this field a 
`SerializableOptional`, and pass it down to `Task` to avoid nulls.

If you do not like this, please at least add `@Nullable` annotations to 
this field and its subsequent usages.

Or maybe embed the `isEmpty` logic into `TaskRestore`.
  


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159840618
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -831,11 +834,12 @@ private StreamStateHandle materializeStateData(Path filePath) throws Exception {
return result;
 
} finally {
-   if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) {
+
+   if (closeableRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}
 
-   if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) {
+   if (closeableRegistry.unregisterCloseable(outputStream)) {
--- End diff --

Side note: I do not understand why there is a `try`/`finally` block with 
`close()` AND an additional `closeableRegistry`. Is `closeableRegistry` used for 
artificially interrupting reads/writes?

Anyway, could you add a comment explaining this to the 
`closeableRegistry` field declaration? 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-01-05 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r159834127
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -145,8 +145,8 @@
 
private volatile Throwable failureCause;  // once assigned, never changes
 
-   /** The handle to the state that the task gets on restore */
-   private volatile TaskStateSnapshot taskState;
+   /** Information to restore the task onr recovery, such as checkpoint id and task state snapshot */
--- End diff --

`onr` typo


---


[GitHub] flink issue #5242: [hotfix] Fix typos

2018-01-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5242
  
Wow, this is pretty good! Thanks a lot!

Addressing Fabian's comment and merging this. I want this merged very soon to 
avoid future merge conflicts...


---


[jira] [Commented] (FLINK-8362) Shade Elasticsearch dependencies away

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313125#comment-16313125
 ] 

ASF GitHub Bot commented on FLINK-8362:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5243
  
I vaguely recall that there was a reason why we did not shade the 
Elasticsearch connectors initially. Maybe a license issue. @zentol and 
@tzulitai can you chime in here?

If we want to go ahead with this, a few comments:
  - Not sure we should relocate the Flink elasticsearch code. Why is that 
needed?
  - Each dependency that is shaded should have a 
comment about its license. Some licenses are not compatible with shading.
  - Some licenses may need an additional license file or a mention in an 
extra bundled NOTICE file, see `flink-s3-fs-presto`.
  - In general, we cannot suppress all forwarded files from `META-INF` for 
dependencies, especially license-related files.


> Shade Elasticsearch dependencies away
> -
>
> Key: FLINK-8362
> URL: https://issues.apache.org/jira/browse/FLINK-8362
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> It would be nice to make the Elasticsearch connectors self-contained just 
> like the s3 file system implementations and the cassandra connector.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5243: [FLINK-8362][elasticsearch] shade all dependencies

2018-01-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5243
  
I vaguely recall that there was a reason why we did not shade the 
Elasticsearch connectors initially. Maybe a license issue. @zentol and 
@tzulitai can you chime in here?

If we want to go ahead with this, a few comments:
  - Not sure we should relocate the Flink elasticsearch code. Why is that 
needed?
  - Each dependency that is shaded should have a 
comment about its license. Some licenses are not compatible with shading.
  - Some licenses may need an additional license file or a mention in an 
extra bundled NOTICE file, see `flink-s3-fs-presto`.
  - In general, we cannot suppress all forwarded files from `META-INF` for 
dependencies, especially license-related files.


---


[jira] [Commented] (FLINK-8322) support getting number of existing timers in TimerService

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313116#comment-16313116
 ] 

ASF GitHub Bot commented on FLINK-8322:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5236
  
I am with Aljoscha here - the cost of making this information available in 
the general case seems pretty high.

What is the use case that needs this feature - can it be implemented in a 
different way in the user application when needed? (One possible user-level 
workaround is sketched below.)
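
For what it's worth, the at-most-one-timer-per-key pattern from the issue 
description can be approximated today by remembering the pending timer in user 
state instead of asking the `TimerService` for a count. A rough sketch, assuming 
the function runs on a keyed stream (class and state names are made up):

{code}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/** Keeps at most one pending timer per key by remembering its timestamp in state. */
public class SingleTimerFunction extends ProcessFunction<String, String> {

	private transient ValueState<Long> pendingTimer;

	@Override
	public void open(Configuration parameters) {
		pendingTimer = getRuntimeContext().getState(
				new ValueStateDescriptor<>("pending-timer", Long.class));
	}

	@Override
	public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
		if (pendingTimer.value() == null) {
			// register the single wake-up timer for this key, e.g. one hour from now
			long wakeUpTime = ctx.timerService().currentProcessingTime() + 60 * 60 * 1000L;
			ctx.timerService().registerProcessingTimeTimer(wakeUpTime);
			pendingTimer.update(wakeUpTime);
		}
		out.collect(value);
	}

	@Override
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
		// the timer fired, so there is no pending timer for this key any more
		pendingTimer.clear();
		// ... reap old data here ...
	}
}
{code}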


> support getting number of existing timers in TimerService
> -
>
> Key: FLINK-8322
> URL: https://issues.apache.org/jira/browse/FLINK-8322
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> There are pretty common use cases where users want to use timers as scheduled 
> threads - e.g. add a timer to wake up x hours later and do something (reap 
> old data usually) only if there are no existing timers; basically we want 
> at most one timer to exist for the key at any time.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5237: [hotfix][doc] fix typo in filesystems.md

2018-01-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5237
  
Thanks, looks good, merging this...


---


[GitHub] flink issue #5236: [FLINK-8322] support getting number of existing timers in...

2018-01-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5236
  
I am with Aljoscha here - the cost of making this information available in 
the general case seems pretty high.

What is the use case that needs this feature - can it be implemented in a 
different way in the user application when needed?


---


[jira] [Commented] (FLINK-6206) As an Engineer, I want task state transition log to be warn/error for FAILURE scenarios

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313109#comment-16313109
 ] 

ASF GitHub Bot commented on FLINK-6206:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5235
  
Can you elaborate why you want to change this?

This seems like perfectly regular information, not a case for a warning. 
Warnings indicate that something is potentially not okay.


> As an Engineer, I want task state transition log to be warn/error for FAILURE 
> scenarios
> ---
>
> Key: FLINK-6206
> URL: https://issues.apache.org/jira/browse/FLINK-6206
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Dan Bress
>Priority: Critical
>
> If a task fails due to an exception, I would like that to be logged at warn 
> or error level. Currently it is logged at info.
> {code}
> private boolean transitionState(ExecutionState currentState, ExecutionState newState, Throwable cause) {
>     if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
>         if (cause == null) {
>             LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState);
>         } else {
>             LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState, cause);
>         }
>         return true;
>     } else {
>         return false;
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5235: [FLINK-6206] [runtime] Changed log level to WARN for task...

2018-01-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5235
  
Can you elaborate why you want to change this?

This seems like perfectly regular information, not a case for a warning. 
Warnings indicate that something is potentially not okay.


---

