[jira] [Comment Edited] (FLINK-11344) Display All Execution Attempt Information on Flink Web Dashboard

2019-01-16 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744769#comment-16744769
 ] 

BoWang edited comment on FLINK-11344 at 1/17/19 7:55 AM:
-

[~hailong wang] Thanks for the comment.

The execution attempt list indeed becomes a bit longer if failures occur, but I 
think constant restarting is rare, so a slightly longer list should be tolerable.

Displaying failed attempts gives us a glimpse of the runtime information 
without querying the logs, including the running time, the TaskManager each 
attempt ran on, etc. Based on this simple information, we can take action; e.g., 
by *orderBy host* on the dashboard, we can find a bad host on which many 
attempts failed and react accordingly, e.g., add the machine to a blacklist.

Thus, I think displaying all the execution attempts does more good than harm.


was (Author: eaglewatcher):
[~hailong wang] Thanks for the comment.

The execution attempt list will indeed become a bit longer if failures occur, 
but I think constant restarting is rare, so a slightly longer list should be tolerable.

Displaying failed attempts gives us a glimpse of the runtime information 
without querying the logs, including the running time, the TaskManager each 
attempt ran on, etc. Based on this simple information, we can take action; e.g., 
by *orderBy host* on the dashboard, we can find a bad host on which many 
attempts failed and react accordingly, e.g., add the machine to a blacklist.

Thus, I think displaying all the execution attempts does more good than harm.

> Display All Execution Attempt Information on Flink Web Dashboard
> 
>
> Key: FLINK-11344
> URL: https://issues.apache.org/jira/browse/FLINK-11344
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, only one Execution Attempt of each sub-task is shown in the web 
> dashboard; thus, only the successful Attempt is shown when a failover occurs. 
> This makes it inconvenient to rapidly locate the failure reasons of failed Attempts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11344) Display All Execution Attempt Information on Flink Web Dashboard

2019-01-16 Thread BoWang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744769#comment-16744769
 ] 

BoWang commented on FLINK-11344:


[~hailong wang] Thanks for the comment.

The execution attempt list will indeed become a bit longer if failures occur, 
but I think constant restarting is rare, so a slightly longer list should be tolerable.

Displaying failed attempts gives us a glimpse of the runtime information 
without querying the logs, including the running time, the TaskManager each 
attempt ran on, etc. Based on this simple information, we can take action; e.g., 
by *orderBy host* on the dashboard, we can find a bad host on which many 
attempts failed and react accordingly, e.g., add the machine to a blacklist.

Thus, I think displaying all the execution attempts does more good than harm.

> Display All Execution Attempt Information on Flink Web Dashboard
> 
>
> Key: FLINK-11344
> URL: https://issues.apache.org/jira/browse/FLINK-11344
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, only one Execution Attempt of each sub-task is shown in the web 
> dashboard; thus, only the successful Attempt is shown when a failover occurs. 
> This makes it inconvenient to rapidly locate the failure reasons of failed Attempts.





[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r248523663
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
 ##
 @@ -170,6 +170,19 @@ class StreamTableEnvironmentTest extends TableTestBase {
 jTEnv.fromAppendStream(ds, "rt.rowtime, b, c, d, e, pt.proctime")
   }
 
+  @Test
+  def testAddTableFromUpsert(): Unit = {
 
 Review comment:
   This test is used to check `registerTable()` with `field.key`. Considering 
your comments below, I will revert the changes in this file.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component

2019-01-16 Thread Tarush Grover (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744746#comment-16744746
 ] 

Tarush Grover commented on FLINK-10429:
---

What's the current status of this ticket, i.e., which tasks are still open? I 
can pick some of them up if they are still open :)

> Redesign Flink Scheduling, introducing dedicated Scheduler component
> 
>
> Key: FLINK-10429
> URL: https://issues.apache.org/jira/browse/FLINK-10429
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> This epic tracks the redesign of scheduling in Flink. Scheduling is currently 
> a concern that is scattered across different components, mainly the 
> ExecutionGraph/Execution and the SlotPool. Scheduling also happens only at 
> the granularity of individual tasks, which makes holistic scheduling 
> strategies hard to implement. In this epic we aim to introduce a dedicated 
> Scheduler component that can support use cases like auto-scaling, 
> local-recovery, and resource-optimized batch.
> The design for this feature is developed here: 
> https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing





[jira] [Updated] (FLINK-11376) flink cli -yn -ys does not take effect if (yn * ys) < parallelism

2019-01-16 Thread shengjk1 (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shengjk1 updated FLINK-11376:
-
Attachment: Main222.java

> flink cli -yn -ys does not take effect if (yn * ys) < parallelism from env.setParallelism(parallelism)
> -
>
> Key: FLINK-11376
> URL: https://issues.apache.org/jira/browse/FLINK-11376
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.7.1
> Environment: java: jdk1.8.0_151
> flink: flink-1.7.1
> CDH:CDH-5.13.1-1.cdh5.13.1.p0.2
>Reporter: shengjk1
>Priority: Major
> Attachments: Main222.java, image-2019-01-17-14-25-34-206.png
>
>
> As the title says: if (yn * ys) < the parallelism set via 
> env.setParallelism(parallelism), the -yn and -ys options take no effect.
> My application is a Flink streaming job reading from Kafka. The topic has 3 
> partitions, and the code calls setParallelism(3). When I submit the job via the CLI
> flink-1.7.1/bin/flink run -m yarn-cluster -yn 1 -ys 1 -ynm test 
> -c com.ishansong.bigdata.Main222 ./flinkDemo-1.0-SNAPSHOT.jar
> the YARN web UI shows that the application was allocated 4 CPU cores and 4 containers.
> !image-2019-01-17-14-25-34-206.png!
> If the code does not call env.setParallelism(parallelism), or if 
> (yn*ys) > parallelism, then -yn and -ys do take effect. If the code does call 
> env.setParallelism(parallelism), the final application resources are multiples 
> of yn and ys; e.g., with parallelism=10, yn=1, ys=5, the final application 
> resources are: CPU cores=11, containers=3.
>  
> Code is attached to make the bug easy to reproduce.
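The resource arithmetic in the report can be sketched as a back-of-the-envelope model. This is only an assumption inferred from the reported numbers (the class and method names below are made up for illustration), not Flink's actual allocation code: it assumes that when `env.setParallelism(p)` wins over `-yn`, the number of TaskManager containers is `ceil(p / ys)`, plus one container and one core for the JobManager.

```java
public class YarnResourceEstimate {

    // Hypothetical model of the behaviour reported on Flink 1.7: TaskManager
    // count is driven by ceil(parallelism / slotsPerTaskManager), plus one
    // extra container for the JobManager.
    static int containers(int parallelism, int slotsPerTaskManager) {
        int taskManagers = (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
        return taskManagers + 1; // + JobManager container
    }

    // One CPU core per slot actually used, plus one for the JobManager.
    static int cpuCores(int parallelism) {
        return parallelism + 1;
    }

    public static void main(String[] args) {
        // parallelism=3, -ys 1: matches the 4 containers / 4 cores in the YARN UI
        System.out.println(containers(3, 1) + " containers, " + cpuCores(3) + " cores");
        // parallelism=10, -ys 5: matches the reported 3 containers / 11 cores
        System.out.println(containers(10, 5) + " containers, " + cpuCores(10) + " cores");
    }
}
```

Both reported resource figures fall out of this model, which supports the reporter's claim that the parallelism, not `-yn`, drives the allocation.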





[jira] [Updated] (FLINK-11376) flink cli -yn -ys does not take effect if (yn * ys) < parallelism

2019-01-16 Thread shengjk1 (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shengjk1 updated FLINK-11376:
-
Attachment: (was: Main222.java)

> flink cli -yn -ys does not take effect if (yn * ys) < parallelism from env.setParallelism(parallelism)
> -
>
> Key: FLINK-11376
> URL: https://issues.apache.org/jira/browse/FLINK-11376
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.7.1
> Environment: java: jdk1.8.0_151
> flink: flink-1.7.1
> CDH:CDH-5.13.1-1.cdh5.13.1.p0.2
>Reporter: shengjk1
>Priority: Major
> Attachments: image-2019-01-17-14-25-34-206.png
>
>
> As the title says: if (yn * ys) < the parallelism set via 
> env.setParallelism(parallelism), the -yn and -ys options take no effect.
> My application is a Flink streaming job reading from Kafka. The topic has 3 
> partitions, and the code calls setParallelism(3). When I submit the job via the CLI
> flink-1.7.1/bin/flink run -m yarn-cluster -yn 1 -ys 1 -ynm test 
> -c com.ishansong.bigdata.Main222 ./flinkDemo-1.0-SNAPSHOT.jar
> the YARN web UI shows that the application was allocated 4 CPU cores and 4 containers.
> !image-2019-01-17-14-25-34-206.png!
> If the code does not call env.setParallelism(parallelism), or if 
> (yn*ys) > parallelism, then -yn and -ys do take effect. If the code does call 
> env.setParallelism(parallelism), the final application resources are multiples 
> of yn and ys; e.g., with parallelism=10, yn=1, ys=5, the final application 
> resources are: CPU cores=11, containers=3.
>  
> Code is attached to make the bug easy to reproduce.





[jira] [Updated] (FLINK-11377) AbstractYarnClusterDescriptor's validClusterSpecification does not reflect the final application resources from YARN if the CLI options -yn -ys take no effect

2019-01-16 Thread shengjk1 (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shengjk1 updated FLINK-11377:
-
Description: 
When the CLI options -yn -ys take no effect, AbstractYarnClusterDescriptor's 
validClusterSpecification does not reflect the final application resources 
granted by YARN (for when -yn -ys take no effect, refer to 
https://issues.apache.org/jira/browse/FLINK-11376)

 

The CLI command:

flink-1.7.1/bin/flink run -m yarn-cluster -yn 1 -ys 1 -ynm test 
-c com.ishansong.bigdata.Main222 ./flinkDemo-1.0-SNAPSHOT.jar

AbstractYarnClusterDescriptor's log:

 org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster 
specification: ClusterSpecification\{masterMemoryMB=1024, 
taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}

But the YARN web UI shows:

allocated containers=4 and allocated cpu cores=4

!image-2019-01-17-14-57-24-060.png!

  was:
When the CLI options -yn -ys take no effect, AbstractYarnClusterDescriptor's 
validClusterSpecification does not reflect the final application resources 
granted by YARN (for when -yn -ys take no effect, refer to 
https://issues.apache.org/jira/browse/FLINK-11376)

 

The CLI command:

flink-1.7.1/bin/flink run -m yarn-cluster -yn 1 -ys 1 -ynm test 
-c com.ishansong.bigdata.Main222 ./flinkDemo-1.0-SNAPSHOT.jar

AbstractYarnClusterDescriptor's log:

 org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster 
specification: ClusterSpecification\{masterMemoryMB=1024, 
taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}

But the YARN web UI shows:

!image-2019-01-17-14-57-24-060.png!


> AbstractYarnClusterDescriptor's validClusterSpecification does not reflect the 
> final application resources from YARN if the CLI options -yn -ys take no effect
> 
>
> Key: FLINK-11377
> URL: https://issues.apache.org/jira/browse/FLINK-11377
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.7.1
> Environment: java: jdk1.8.0_151
> flink: flink-1.7.1
> CDH:CDH-5.13.1-1.cdh5.13.1.p0.2
>Reporter: shengjk1
>Priority: Major
> Attachments: image-2019-01-17-14-57-24-060.png
>
>
> When the CLI options -yn -ys take no effect, AbstractYarnClusterDescriptor's 
> validClusterSpecification does not reflect the final application resources 
> granted by YARN (for when -yn -ys take no effect, refer to 
> https://issues.apache.org/jira/browse/FLINK-11376)
>  
> The CLI command:
> flink-1.7.1/bin/flink run -m yarn-cluster -yn 1 -ys 1 -ynm test 
> -c com.ishansong.bigdata.Main222 ./flinkDemo-1.0-SNAPSHOT.jar
> AbstractYarnClusterDescriptor's log:
>  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster 
> specification: ClusterSpecification\{masterMemoryMB=1024, 
> taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
> But the YARN web UI shows:
> allocated containers=4 and allocated cpu cores=4
> !image-2019-01-17-14-57-24-060.png!





[GitHub] hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-455063600
 
 
   @pnowojski Thanks a lot for your comments. I have updated the PR according 
to your suggestions and rebased it onto master. Looking forward to your new 
comments. Thank you!
   
   Best, Hequn




[jira] [Created] (FLINK-11376) flink cli -yn -ys does not take effect if (yn * ys) < parallelism

2019-01-16 Thread shengjk1 (JIRA)
shengjk1 created FLINK-11376:


 Summary: flink cli -yn -ys does not take effect if (yn * ys) < 
parallelism from env.setParallelism(parallelism)
 Key: FLINK-11376
 URL: https://issues.apache.org/jira/browse/FLINK-11376
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.7.1
 Environment: java: jdk1.8.0_151

flink: flink-1.7.1

CDH:CDH-5.13.1-1.cdh5.13.1.p0.2
Reporter: shengjk1
 Attachments: Main222.java, image-2019-01-17-14-25-34-206.png

As the title says: if (yn * ys) < the parallelism set via 
env.setParallelism(parallelism), the -yn and -ys options take no effect. If the 
code does not call env.setParallelism(parallelism), or if (yn*ys) > parallelism, 
then -yn and -ys do take effect. If the code does call 
env.setParallelism(parallelism), the final application resources are multiples 
of yn and ys; e.g., with parallelism=10, yn=1, ys=5, the final application 
resources are: CPU cores=11, containers=3.

 

Code is attached to make the bug easy to reproduce.





[jira] [Created] (FLINK-11377) AbstractYarnClusterDescriptor's validClusterSpecification does not reflect the final application resources from YARN if the CLI options -yn -ys take no effect

2019-01-16 Thread shengjk1 (JIRA)
shengjk1 created FLINK-11377:


 Summary: AbstractYarnClusterDescriptor's validClusterSpecification 
does not reflect the final application resources from YARN if the CLI options 
-yn -ys take no effect
 Key: FLINK-11377
 URL: https://issues.apache.org/jira/browse/FLINK-11377
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.7.1
 Environment: java: jdk1.8.0_151

flink: flink-1.7.1

CDH:CDH-5.13.1-1.cdh5.13.1.p0.2
Reporter: shengjk1
 Attachments: image-2019-01-17-14-57-24-060.png

When the CLI options -yn -ys take no effect, AbstractYarnClusterDescriptor's 
validClusterSpecification does not reflect the final application resources 
granted by YARN (for when -yn -ys take no effect, refer to 
https://issues.apache.org/jira/browse/FLINK-11376)

 

The CLI command:

flink-1.7.1/bin/flink run -m yarn-cluster -yn 1 -ys 1 -ynm test 
-c com.ishansong.bigdata.Main222 ./flinkDemo-1.0-SNAPSHOT.jar

AbstractYarnClusterDescriptor's log:

 org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster 
specification: ClusterSpecification\{masterMemoryMB=1024, 
taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}

But the YARN web UI shows:

!image-2019-01-17-14-57-24-060.png!





[GitHub] TisonKun commented on issue #7516: [FLINK-11360] [test] Check and remove LocalFlinkMiniClusterITCase

2019-01-16 Thread GitBox
TisonKun commented on issue #7516: [FLINK-11360] [test] Check and remove 
LocalFlinkMiniClusterITCase
URL: https://github.com/apache/flink/pull/7516#issuecomment-455059869
 
 
   Travis fails on:
   >23:21:27.113 [INFO] Tests run: 13, Failures: 0, Errors: 0, Skipped: 0, Time 
elapsed: 273.772 s - in 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
   23:21:27.524 [INFO] 
   23:21:27.524 [INFO] Results:
   23:21:27.524 [INFO] 
   23:21:27.524 [ERROR] Errors: 
   23:21:27.524 [ERROR]   
KafkaITCase.testCancelingEmptyTopic:85->KafkaConsumerTestBase.runCancelingOnEmptyInputTest:1124->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2
 » TestTimedOut
   23:21:27.524 [INFO] 
   23:21:27.524 [ERROR] Tests run: 47, Failures: 0, Errors: 1, Skipped: 0
   
   I think this failure is unrelated to this change.




[GitHub] eaglewatcherwb commented on issue #7436: [FLINK-11071][core] add support for dynamic proxy classes resolution …

2019-01-16 Thread GitBox
eaglewatcherwb commented on issue #7436: [FLINK-11071][core] add support for 
dynamic proxy classes resolution …
URL: https://github.com/apache/flink/pull/7436#issuecomment-455054189
 
 
   > Thanks for the contribution @eaglewatcherwb .
   > 
   > I don't think we should introduce incompatibilities with that change. 
Having two different versions of an interface is a problem in the first place. 
If it occurs, the solution would be to simply leave only one version of the 
proxy.
   > 
   > @eaglewatcherwb I don't think we should change `SocketWindowWordCount` 
example at all. I would prefer to see a proper unit test for those changes. You 
can check 
`org.apache.flink.runtime.classloading.ClassLoaderTest#testMessageDecodingWithUnavailableClass`
 for inspiration.
   
   @dawidwys Thanks for the comment.
   By "leave only one version of the proxy", do you mean 
`ClassLoaderObjectInputStream.resolveProxyClass` should not call 
`super.resolveProxyClass` and should just return `null` when `classLoader == null`?
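For context, here is a minimal, self-contained sketch of the class-loader-aware `resolveProxyClass` pattern being discussed. The names `ClassLoaderAwareInputStream`, `Greeter`, and `Handler` are made up for illustration; Flink's actual `ClassLoaderObjectInputStream` differs in detail:

```java
import java.io.*;
import java.lang.reflect.*;

public class ProxyResolutionDemo {

    // Illustrative stand-in for a class-loader-aware ObjectInputStream.
    static class ClassLoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderAwareInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveProxyClass(String[] interfaces)
                throws IOException, ClassNotFoundException {
            if (classLoader == null) {
                // Fall back to the default resolution instead of returning null.
                return super.resolveProxyClass(interfaces);
            }
            Class<?>[] resolved = new Class<?>[interfaces.length];
            for (int i = 0; i < interfaces.length; i++) {
                resolved[i] = Class.forName(interfaces[i], false, classLoader);
            }
            return Proxy.getProxyClass(classLoader, resolved);
        }
    }

    public interface Greeter { String greet(); }

    static class Handler implements InvocationHandler, Serializable {
        public Object invoke(Object proxy, Method method, Object[] args) {
            return "hello";
        }
    }

    // Serialize a dynamic proxy and read it back through the custom stream.
    static String roundTrip() {
        try {
            Object proxy = Proxy.newProxyInstance(
                    Greeter.class.getClassLoader(), new Class<?>[]{Greeter.class}, new Handler());
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(proxy);
            }
            try (ObjectInputStream in = new ClassLoaderAwareInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()),
                    Greeter.class.getClassLoader())) {
                return ((Greeter) in.readObject()).greet();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

The sketch falls back to `super.resolveProxyClass` when no class loader is set, rather than returning `null`, which is one possible answer to the question above.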




[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r248533068
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
 ##
 @@ -185,4 +198,20 @@ class StreamTableEnvironmentTest extends TableTestBase {
 (jTEnv, ds)
   }
 
+  private def prepareKeyedSchemaExpressionParser:
+(JStreamTableEnv, DataStream[JTuple2[JBool, JTuple5[JLong, JInt, String, 
JInt, JLong]]]) = {
+
+val jStreamExecEnv = mock(classOf[JStreamExecEnv])
+
when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
+val jTEnv = TableEnvironment.getTableEnvironment(jStreamExecEnv)
+
+val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, 
Types.INT, Types.LONG)
+  .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
+val dsType = new TupleTypeInfo(Types.BOOLEAN, sType)
+  .asInstanceOf[TupleTypeInfo[JTuple2[JBool, JTuple5[JLong, JInt, String, 
JInt, JLong
+val ds = mock(classOf[DataStream[JTuple2[JBool, JTuple5[JLong, JInt, 
String, JInt, JLong)
+when(ds.getType).thenReturn(dsType)
 
 Review comment:
   ditto




[jira] [Updated] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly

2019-01-16 Thread shuai.xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-11375:
-
Description: 
In SlotPool, AvailableSlots is lock-free, so all access to it must happen in 
SlotPool's main thread; hence all public methods are called through the 
SlotPoolGateway, except releaseSlot, which SlotSharingManager calls directly. 
This may cause a ConcurrentModificationException.

 2019-01-16 19:50:16,184 INFO [flink-akka.actor.default-dispatcher-161] 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: 
BlinkStoreScanTableSource 
feature_memory_entity_store-entity_lsc_page_detail_feats_group_178-Batch -> 
SourceConversion(table:[_DataStreamTable_12, source: [BlinkStoreScanTableSource 
feature_memory_entity_store-entity_lsc_page_detail_feats_group_178]], 
fields:(f0)) -> correlate: 
table(ScanBlinkStore_entity_lsc_page_detail_feats_group_1786($cor6.f0)), 
select: 
item_id,mainse_searcher_rank__cart_uv,mainse_searcher_rank__cart_uv_14,mainse_searcher_rank__cart_uv_30,mainse_searcher_rank__cart_uv_7,mainse_s
 (433/500) (bd34af8dd7ee02d04a4a25e698495f0a) switched from RUNNING to FINISHED.
2019-01-16 19:50:16,187 INFO [jobmanager-future-thread-90] 
org.apache.flink.runtime.executiongraph.ExecutionGraph - scheduleVertices meet 
exception, need to fail global execution graph
java.lang.reflect.UndeclaredThrowableException
 at org.apache.flink.runtime.rpc.akka.$Proxy26.allocateSlots(Unknown Source)
 at 
org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.allocateSlots(SlotPool.java:1955)
 at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.schedule(ExecutionGraph.java:965)
 at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleVertices(ExecutionGraph.java:1503)
 at 
org.apache.flink.runtime.jobmaster.GraphManager$ExecutionGraphVertexScheduler.scheduleExecutionVertices(GraphManager.java:349)
 at 
org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.scheduleOneByOne(StepwiseSchedulingPlugin.java:132)
 at 
org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.onExecutionVertexFailover(StepwiseSchedulingPlugin.java:107)
 at 
org.apache.flink.runtime.jobmaster.GraphManager.notifyExecutionVertexFailover(GraphManager.java:163)
 at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.resetExecutionVerticesAndNotify(ExecutionGraph.java:1372)
 at 
org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:213)
 at 
org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:198)
 at 
org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:97)
 at 
org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:169)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
 at java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: 
java.util.ConcurrentModificationException
 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:213)
 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125)
 ... 23 more
Caused by: java.util.ConcurrentModificationException
 at java.util.HashMap$ValueSpliterator.tryAdvance(HashMap.java:1643)
 at 
java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
 at 
java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
 at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
 at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
 at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
 at 

[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r248525458
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
 ##
 @@ -185,4 +198,20 @@ class StreamTableEnvironmentTest extends TableTestBase {
 (jTEnv, ds)
   }
 
+  private def prepareKeyedSchemaExpressionParser:
+(JStreamTableEnv, DataStream[JTuple2[JBool, JTuple5[JLong, JInt, String, 
JInt, JLong]]]) = {
+
+val jStreamExecEnv = mock(classOf[JStreamExecEnv])
 
 Review comment:
   I will revert the changes in this file. It not only introduces the confusion 
you raised, but the logic can also be covered by our `FromUpsertStreamTest`.




[jira] [Created] (FLINK-11375) Concurrent modification to slot pool due to SlotSharingManager releaseSlot directly

2019-01-16 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11375:


 Summary: Concurrent modification to slot pool due to 
SlotSharingManager releaseSlot directly 
 Key: FLINK-11375
 URL: https://issues.apache.org/jira/browse/FLINK-11375
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.7.1
Reporter: shuai.xu


In SlotPool, AvailableSlots is lock-free, so all access to it must happen in 
SlotPool's main thread; hence all public methods are called through the 
SlotPoolGateway, except releaseSlot, which SlotSharingManager calls directly. 
This may cause a ConcurrentModificationException.
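The failure mode in the stack trace (a `HashMap` value stream throwing in `HashMap$ValueSpliterator.tryAdvance`) can be reproduced in isolation with plain JDK collections. This sketch only illustrates the mechanism; the names are hypothetical and no Flink code is involved:

```java
import java.util.*;

public class SlotMapCmeDemo {

    // Mutating a HashMap from inside a stream over its values triggers a
    // ConcurrentModificationException in ValueSpliterator.tryAdvance — the
    // same failure mode as releaseSlot touching AvailableSlots off-thread
    // while the main thread is iterating it.
    static boolean triggersCme() {
        Map<String, Integer> availableSlots = new HashMap<>();
        for (int i = 0; i < 16; i++) {
            availableSlots.put("slot-" + i, i);
        }
        try {
            availableSlots.values().stream()
                    // Simulates a concurrent releaseSlot() during the lookup.
                    .filter(v -> { availableSlots.remove("slot-0"); return v > 100; })
                    .findFirst();
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(triggersCme());
    }
}
```

Routing the `releaseSlot` call through the gateway (so the mutation happens in the same main thread as the iteration) removes the race, which is what the issue proposes.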

 





[GitHub] Myasuka commented on a change in pull request #7515: [FLINK-11313][checkpoint] Introduce LZ4 compression for keyed state in full checkpoints and savepoints

2019-01-16 Thread GitBox
Myasuka commented on a change in pull request #7515: [FLINK-11313][checkpoint] 
Introduce LZ4 compression for keyed state in full checkpoints and savepoints
URL: https://github.com/apache/flink/pull/7515#discussion_r248531370
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
 ##
 @@ -894,12 +898,13 @@ public void disableAutoTypeRegistration() {
this.autoTypeRegistrationEnabled = false;
}
 
-   public boolean isUseSnapshotCompression() {
 
 Review comment:
   @zentol 
   hmm, to keep backward compatibility, I have to let `ExecutionConfig` in the 
`flink-core` module know about the snappy compression type. It seems I also 
have to move all `StreamCompressionDecorator`s into the `flink-core` module. 
I'll then update my PR based on this.
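For readers following along, the decorator being discussed is a small wrapper that decorates raw state streams with a compression codec. A hedged sketch of the pattern, using the JDK's DEFLATE codec as a stand-in (Flink uses snappy, and this PR adds LZ4, both of which need external dependencies); the names here mirror the idea but are illustrative only:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

public class CompressionDecoratorSketch {

    // Pattern resembling Flink's StreamCompressionDecorator: subclasses pick
    // the codec, callers just decorate the stream they already have.
    abstract static class StreamDecorator {
        abstract OutputStream decorateWithCompression(OutputStream out) throws IOException;
        abstract InputStream decorateWithCompression(InputStream in) throws IOException;
    }

    // Stand-in codec: DEFLATE from java.util.zip (not what Flink ships).
    static class DeflateDecorator extends StreamDecorator {
        OutputStream decorateWithCompression(OutputStream out) {
            return new DeflaterOutputStream(out);
        }
        InputStream decorateWithCompression(InputStream in) {
            return new InflaterInputStream(in);
        }
    }

    // Compress a payload and decompress it again through the decorator.
    static String roundTrip(String payload) {
        try {
            StreamDecorator decorator = new DeflateDecorator();
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (OutputStream out = decorator.decorateWithCompression(bytes)) {
                out.write(payload.getBytes(StandardCharsets.UTF_8));
            }
            try (InputStream in = decorator.decorateWithCompression(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("keyed state bytes"));
    }
}
```

Because the codec choice is hidden behind the decorator, moving the decorator classes into `flink-core` (as the comment proposes) lets `ExecutionConfig` select a codec without depending on runtime modules.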




[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r248531053
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala
 ##
 @@ -95,4 +95,41 @@ object StreamTestData {
 data.+=(((3, 3), "three"))
 env.fromCollection(data)
   }
+
+  def getSmall3TupleUpsertStream(env: StreamExecutionEnvironment):
+  DataStream[(Boolean, (Int, Long, String))] = {
+val data = new mutable.MutableList[(Boolean, (Int, Long, String))]
+data.+=((true, (1, 1L, "Hi")))
+data.+=((true, (2, 2L, "Hello")))
+data.+=((true, (3, 2L, "Hello world")))
+env.fromCollection(data)
+  }
+
+  def get3TupleUpsertStream(env: StreamExecutionEnvironment):
 
 Review comment:
   Oh, it should be used in the next commit. I will correct this.




[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r248530953
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
 ##
 @@ -46,6 +46,20 @@ class TableSinkValidationTest extends TableTestBase {
     env.execute()
   }
 
+  @Test(expected = classOf[TableException])
+  def testAppendSinkOnUpdatingTable2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = tEnv.fromUpsertStream(
+      StreamTestData.getSmall3TupleUpsertStream(env), 'id.key, 'num, 'text)
+
+    t.writeToSink(new TestAppendSink)
+
+    // must fail because table is not append-only
+    env.execute()
 
 Review comment:
   Good point! I think we can use `streamTestUtil` to test the sink logic.




[jira] [Assigned] (FLINK-11366) Check and port TaskManagerMetricsTest to new code base if necessary

2019-01-16 Thread Yun Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-11366:


Assignee: Yun Tang

> Check and port TaskManagerMetricsTest to new code base if necessary
> ---
>
> Key: FLINK-11366
> URL: https://issues.apache.org/jira/browse/FLINK-11366
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Yun Tang
>Priority: Major
>
> Check and port {{TaskManagerMetricsTest}} to new code base if necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11363) Check and remove TaskManagerConfigurationTest

2019-01-16 Thread Yun Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-11363:


Assignee: Yun Tang

> Check and remove TaskManagerConfigurationTest
> -
>
> Key: FLINK-11363
> URL: https://issues.apache.org/jira/browse/FLINK-11363
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Yun Tang
>Priority: Major
>
> Check whether {{TaskManagerConfigurationTest}} contains any relevant tests 
> for the new code base and then remove this test.





[jira] [Commented] (FLINK-11374) See more failover and can filter by time range

2019-01-16 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744657#comment-16744657
 ] 

lining commented on FLINK-11374:


Hi [~till.rohrmann], what do you think of this?

> See more failover and can filter by time range
> --
>
> Key: FLINK-11374
> URL: https://issues.apache.org/jira/browse/FLINK-11374
> Project: Flink
>  Issue Type: Improvement
>  Components: REST, Webfrontend
>Reporter: lining
>Assignee: lining
>Priority: Major
>
> Currently the failover view only shows a limited number of the most recent 
> task failures. If a task has failed many times, the earlier failovers cannot 
> be seen. Can we add a time filter so that failovers, including each task 
> attempt's failure message, can be found?





[jira] [Updated] (FLINK-11374) See more failover and can filter by time range

2019-01-16 Thread lining (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-11374:
---
Summary: See more failover and can filter by time range  (was: Faiover add 
time range filter)

> See more failover and can filter by time range
> --
>
> Key: FLINK-11374
> URL: https://issues.apache.org/jira/browse/FLINK-11374
> Project: Flink
>  Issue Type: Improvement
>  Components: REST, Webfrontend
>Reporter: lining
>Assignee: lining
>Priority: Major
>
> Currently the failover view only shows a limited number of the most recent 
> task failures. If a task has failed many times, the earlier failovers cannot 
> be seen. Can we add a time filter so that failovers, including each task 
> attempt's failure message, can be found?





[jira] [Created] (FLINK-11374) Faiover add time range filter

2019-01-16 Thread lining (JIRA)
lining created FLINK-11374:
--

 Summary: Faiover add time range filter
 Key: FLINK-11374
 URL: https://issues.apache.org/jira/browse/FLINK-11374
 Project: Flink
  Issue Type: Improvement
  Components: REST, Webfrontend
Reporter: lining
Assignee: lining


Currently the failover view only shows a limited number of the most recent task 
failures. If a task has failed many times, the earlier failovers cannot be seen. 
Can we add a time filter so that failovers, including each task attempt's 
failure message, can be found?
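The requested time-range filter could, in principle, look like the following standalone Java sketch. The `Failure` class, its fields, and the method names are invented for illustration; they are not part of Flink's REST API or the actual proposal.

```java
import java.util.ArrayList;
import java.util.List;

public class FailoverFilterSketch {

    // Hypothetical record of one task failure; not a real Flink type.
    static final class Failure {
        final long timestampMillis;
        final String message;

        Failure(long timestampMillis, String message) {
            this.timestampMillis = timestampMillis;
            this.message = message;
        }
    }

    // Keep only failures whose timestamp falls in [fromMillis, toMillis).
    static List<Failure> inRange(List<Failure> failures, long fromMillis, long toMillis) {
        List<Failure> result = new ArrayList<>();
        for (Failure f : failures) {
            if (f.timestampMillis >= fromMillis && f.timestampMillis < toMillis) {
                result.add(f);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Failure> all = new ArrayList<>();
        all.add(new Failure(1_000L, "attempt 1 failed"));
        all.add(new Failure(5_000L, "attempt 2 failed"));
        // Only the earlier failure lies in [0, 2000).
        System.out.println(inRange(all, 0L, 2_000L).size()); // prints 1
    }
}
```

A real implementation would of course filter on the server side (in the REST handler) rather than in the UI, so that failures outside the retained window can still be served.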





[GitHub] libenchao opened a new pull request #7518: [hotfix] [docs] fix typo in debugging classloading doc

2019-01-16 Thread GitBox
libenchao opened a new pull request #7518: [hotfix] [docs] fix typo in 
debugging classloading doc
URL: https://github.com/apache/flink/pull/7518
 
 
   ## What is the purpose of the change
   
   Fix typo in debugging classloading doc.
   
   ## Brief change log
   
   Fix typo
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)




[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r248527085
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import java.lang.{Boolean => JBool}
+
+class FromUpsertStreamTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+  @Test
+  def testRemoveUpsertToRetraction() = {
+    streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+      "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+    val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        UpsertTableNode(0),
+        term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+          "CAST(rowtime) AS rowtime")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = {
+    streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+      "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+    val sql = "SELECT b as b1, c, proctime as proctime1, rowtime as rowtime1 FROM MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamUpsertToRetraction",
+          unaryNode(
+            "DataStreamCalc",
+            UpsertTableNode(0),
+            term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+              "CAST(rowtime) AS rowtime")
+          ),
+          term("keys", "b"),
+          term("select", "a", "b", "c", "proctime", "rowtime")
+        ),
+        term("select", "b AS b1", "c", "proctime AS proctime1", "rowtime AS rowtime1"))
+    streamUtil.verifySql(sql, expected, true)
+  }
+
+  @Test
+  def testCalcCannotTransposeUpsertToRetraction() = {
 
 Review comment:
   ditto




[jira] [Commented] (FLINK-11370) Check and port ZooKeeperLeaderElectionITCase to new code base if necessary

2019-01-16 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744645#comment-16744645
 ] 

TisonKun commented on FLINK-11370:
--

[~lining] if you start working on this issue, I'd be glad to help review. 
I have not actually started working on it, so it's fine with me if you take it 
over.

> Check and port ZooKeeperLeaderElectionITCase to new code base if necessary
> --
>
> Key: FLINK-11370
> URL: https://issues.apache.org/jira/browse/FLINK-11370
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: TisonKun
>Priority: Major
>
> Check and port {{ZooKeeperLeaderElectionITCase}} to new code base if 
> necessary.





[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r248526967
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import java.lang.{Boolean => JBool}
+
+class FromUpsertStreamTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+  @Test
+  def testRemoveUpsertToRetraction() = {
+    streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+      "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+    val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        UpsertTableNode(0),
+        term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+          "CAST(rowtime) AS rowtime")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = {
 
 Review comment:
   `testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose` is used to 
test the materialization logic and the transpose (a calc can be pushed down 
through UpsertToRetraction).
   
   `testCalcCannotTransposeUpsertToRetraction` is used to test the transpose 
logic (a calc cannot be pushed down).
   
   However, perhaps we should not test the transpose logic in this commit? I 
will remove `testCalcCannotTransposeUpsertToRetraction` from this commit and 
add it back later. What do you think?




[jira] [Assigned] (FLINK-11370) Check and port ZooKeeperLeaderElectionITCase to new code base if necessary

2019-01-16 Thread lining (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining reassigned FLINK-11370:
--

Assignee: lining  (was: TisonKun)

> Check and port ZooKeeperLeaderElectionITCase to new code base if necessary
> --
>
> Key: FLINK-11370
> URL: https://issues.apache.org/jira/browse/FLINK-11370
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: lining
>Priority: Major
>
> Check and port {{ZooKeeperLeaderElectionITCase}} to new code base if 
> necessary.





[jira] [Issue Comment Deleted] (FLINK-10882) Misleading job/task state for scheduled jobs

2019-01-16 Thread lining (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-10882:
---
Comment: was deleted

(was: [~Zentol] what's your point?)

> Misleading job/task state for scheduled jobs
> 
>
> Key: FLINK-10882
> URL: https://issues.apache.org/jira/browse/FLINK-10882
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Priority: Major
> Attachments: list_view.png, task_view.png
>
>
> Submitting a job when not enough resources are available currently causes 
> the job to stay in a {{CREATE/SCHEDULED}} state.
> There are 2 issues with how this is presented in the UI.
> The {{Running Jobs}} page incorrectly states that the job is running.
> (see list_view attachment)
> EDIT: Actually, from a runtime perspective the job is in fact in a RUNNING 
> state.
> The state display for individual tasks either
> # States the task is in a CREATED state, when it is actually SCHEDULED
> # States the task is in a CREATED state, but the count for all state boxes is 
> zero.
> (see task_view attachment)





[jira] [Issue Comment Deleted] (FLINK-10882) Misleading job/task state for scheduled jobs

2019-01-16 Thread lining (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-10882:
---
Comment: was deleted

(was: See code, now org.apache.flink.runtime.jobgraph.JobStatus define running 
which means 

Some tasks are scheduled or running, some may be pending, some may be finished. 
Should we add new status?)

> Misleading job/task state for scheduled jobs
> 
>
> Key: FLINK-10882
> URL: https://issues.apache.org/jira/browse/FLINK-10882
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Priority: Major
> Attachments: list_view.png, task_view.png
>
>
> Submitting a job when not enough resources are available currently causes 
> the job to stay in a {{CREATE/SCHEDULED}} state.
> There are 2 issues with how this is presented in the UI.
> The {{Running Jobs}} page incorrectly states that the job is running.
> (see list_view attachment)
> EDIT: Actually, from a runtime perspective the job is in fact in a RUNNING 
> state.
> The state display for individual tasks either
> # States the task is in a CREATED state, when it is actually SCHEDULED
> # States the task is in a CREATED state, but the count for all state boxes is 
> zero.
> (see task_view attachment)





[jira] [Assigned] (FLINK-11370) Check and port ZooKeeperLeaderElectionITCase to new code base if necessary

2019-01-16 Thread lining (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining reassigned FLINK-11370:
--

Assignee: TisonKun  (was: lining)

> Check and port ZooKeeperLeaderElectionITCase to new code base if necessary
> --
>
> Key: FLINK-11370
> URL: https://issues.apache.org/jira/browse/FLINK-11370
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: TisonKun
>Priority: Major
>
> Check and port {{ZooKeeperLeaderElectionITCase}} to new code base if 
> necessary.





[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r248525458
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
 ##
 @@ -185,4 +198,20 @@ class StreamTableEnvironmentTest extends TableTestBase {
     (jTEnv, ds)
   }
 
+  private def prepareKeyedSchemaExpressionParser:
+      (JStreamTableEnv, DataStream[JTuple2[JBool, JTuple5[JLong, JInt, String, JInt, JLong]]]) = {
+
+    val jStreamExecEnv = mock(classOf[JStreamExecEnv])
 
 Review comment:
   I will remove the changes in this file. They not only introduce the 
confusion you raised, but can also be covered by our `FromUpsertStreamTest`.




[jira] [Commented] (FLINK-11357) Check and port LeaderChangeJobRecoveryTest to new code base if necessary

2019-01-16 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744638#comment-16744638
 ] 

lining commented on FLINK-11357:


Hi [~till.rohrmann], can you assign this to me?

> Check and port LeaderChangeJobRecoveryTest to new code base if necessary
> 
>
> Key: FLINK-11357
> URL: https://issues.apache.org/jira/browse/FLINK-11357
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Priority: Major
>
> Check and port {{LeaderChangeJobRecoveryTest}} to new code base if necessary.





[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r248523663
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
 ##
 @@ -170,6 +170,19 @@ class StreamTableEnvironmentTest extends TableTestBase {
     jTEnv.fromAppendStream(ds, "rt.rowtime, b, c, d, e, pt.proctime")
   }
 
+  @Test
+  def testAddTableFromUpsert(): Unit = {
 
 Review comment:
   Used to check `registerTable()` with `field.key`




[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r248523576
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ##
 @@ -183,6 +183,17 @@ object UpdatingPlanChecker {
           lJoinKeys.zip(rJoinKeys)
         )
       }
+
+    case l: DataStreamUpsertToRetraction =>
+      val uniqueKeyNames = l.getRowType.getFieldNames.zipWithIndex
+        .filter(e => l.keyIndexes.contains(e._2))
+        .map(_._1)
+      Some(uniqueKeyNames.map(e => (e, e)))
+
+    case scan: UpsertStreamScan =>
 
 Review comment:
   The `visit` method in `RelVisitor` returns void. We need to return an 
`Option[Seq[(String, String)]]` here.
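The zipWithIndex/filter/map chain in the diff above can be illustrated standalone. The following Java sketch (invented class and method names, not Flink code) performs the same index-based selection of key field names:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class UniqueKeySketch {

    // Keep the field names whose position appears in keyIndexes, in field order.
    static List<String> uniqueKeyNames(List<String> fieldNames, Set<Integer> keyIndexes) {
        return IntStream.range(0, fieldNames.size())
                .filter(keyIndexes::contains)   // same role as the Scala filter on e._2
                .mapToObj(fieldNames::get)      // same role as map(_._1)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // With fields (a, b, c) and key index 1, only "b" is a key field.
        System.out.println(uniqueKeyNames(List.of("a", "b", "c"), Set.of(1))); // prints [b]
    }
}
```

In the Scala snippet each kept name is additionally paired with itself, `(e, e)`, because the checker tracks source-name/output-name pairs.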




[jira] [Assigned] (FLINK-11365) Check and port TaskManagerFailureRecoveryITCase to new code base if necessary

2019-01-16 Thread boshu Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

boshu Zheng reassigned FLINK-11365:
---

Assignee: boshu Zheng

> Check and port TaskManagerFailureRecoveryITCase to new code base if necessary
> -
>
> Key: FLINK-11365
> URL: https://issues.apache.org/jira/browse/FLINK-11365
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: boshu Zheng
>Priority: Major
>
> Check and port {{TaskManagerFailureRecoveryITCase}} to new code base if 
> necessary.





[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r248522927
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/UpsertStreamScan.scala
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.expressions.Cast
+import org.apache.flink.table.plan.schema.{RowSchema, UpsertStreamTable}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+
+/**
+  * Flink RelNode which matches along with DataStreamSource. Different from [[AppendStreamScan]],
+  * [[UpsertStreamScan]] is used to handle upsert streams from source.
+  */
+class UpsertStreamScan(
 
 Review comment:
   It seems a common base class would be better. What do you think?




[jira] [Assigned] (FLINK-11361) Check and port RecoveryITCase to new code base if necessary

2019-01-16 Thread Congxian Qiu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu reassigned FLINK-11361:


Assignee: Congxian Qiu

> Check and port RecoveryITCase to new code base if necessary
> ---
>
> Key: FLINK-11361
> URL: https://issues.apache.org/jira/browse/FLINK-11361
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Congxian Qiu
>Priority: Major
>
> Check and port {{RecoveryITCase}} to new code base if necessary.





[jira] [Assigned] (FLINK-11352) Check and port JobManagerHACheckpointRecoveryITCase to new code base if necessary

2019-01-16 Thread Congxian Qiu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu reassigned FLINK-11352:


Assignee: Congxian Qiu

> Check and port JobManagerHACheckpointRecoveryITCase to new code base if 
> necessary
> -
>
> Key: FLINK-11352
> URL: https://issues.apache.org/jira/browse/FLINK-11352
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Congxian Qiu
>Priority: Major
>
> Check and port {{JobManagerHACheckpointRecoveryITCase}} to new code base if 
> necessary





[jira] [Commented] (FLINK-11344) Display All Execution Attempt Information on Flink Web Dashboard

2019-01-16 Thread hailong wang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744617#comment-16744617
 ] 

hailong wang commented on FLINK-11344:
--

Won't the display list become long if the job keeps restarting? Also, a 
previous attempt will be identical to the current attempt if there are errors 
in the job, so we can already locate the failure reasons from it.

> Display All Execution Attempt Information on Flink Web Dashboard
> 
>
> Key: FLINK-11344
> URL: https://issues.apache.org/jira/browse/FLINK-11344
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, only one execution attempt of each sub-task is shown in the web 
> dashboard; thus, only the successful attempt is shown when a failover occurs. 
> This makes it inconvenient to rapidly locate the failure reasons of failed 
> attempts.





[jira] [Assigned] (FLINK-11373) CliFrontend cuts off reason for error messages

2019-01-16 Thread leesf (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leesf reassigned FLINK-11373:
-

Assignee: leesf

> CliFrontend cuts off reason for error messages
> --
>
> Key: FLINK-11373
> URL: https://issues.apache.org/jira/browse/FLINK-11373
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.5.6, 1.6.3, 1.7.1
>Reporter: Maximilian Michels
>Assignee: leesf
>Priority: Minor
>  Labels: starter
>
> The CliFrontend seems to only print the first message in the strace trace and 
> not any of its causes.
> {noformat}
> bin/flink run /non-existing/path
> Could not build the program from JAR file.
> Use the help option (-h or --help) to get help on the command.
> {noformat}
> Notice, the underlying cause of this message is FileNotFoundException.
> Consider changing 
> a) the error message for this particular case 
> b) the way the stack trace messages are trimmed





[GitHub] sunhaibotb commented on a change in pull request #7403: [FLINK-11256] Replace the reference of StreamNode object with ID in S…

2019-01-16 Thread GitBox
sunhaibotb commented on a change in pull request #7403: [FLINK-11256] Replace 
the reference of StreamNode object with ID in S…
URL: https://github.com/apache/flink/pull/7403#discussion_r248518981
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
 ##
 @@ -130,7 +122,7 @@ public boolean equals(Object o) {
 
 	@Override
 	public String toString() {
-		return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber
+		return "(" + sourceId + " -> " + targetId + ", typeNumber=" + typeNumber
 
 Review comment:
   I discussed with @sunjincheng121 and decided to add _sourceOperatorName_ and 
_targetOperatorName_.
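   A minimal Java sketch of what an ID-plus-operator-name `toString` could look like. The field names and the output format here are hypothetical illustrations, not the actual patch:

```java
// Hypothetical, simplified StreamEdge sketch: endpoints are identified by ID
// (avoiding a reference to the StreamNode objects), while operator names are
// kept so that log output stays human-readable.
public class StreamEdgeSketch {

    private final int sourceId;
    private final int targetId;
    private final int typeNumber;
    private final String sourceOperatorName;
    private final String targetOperatorName;

    public StreamEdgeSketch(int sourceId, String sourceOperatorName,
                            int targetId, String targetOperatorName,
                            int typeNumber) {
        this.sourceId = sourceId;
        this.sourceOperatorName = sourceOperatorName;
        this.targetId = targetId;
        this.targetOperatorName = targetOperatorName;
        this.typeNumber = typeNumber;
    }

    @Override
    public String toString() {
        // Combine operator name and ID for each endpoint of the edge.
        return "(" + sourceOperatorName + "-" + sourceId + " -> "
                + targetOperatorName + "-" + targetId
                + ", typeNumber=" + typeNumber + ")";
    }

    public static void main(String[] args) {
        StreamEdgeSketch e = new StreamEdgeSketch(1, "Source", 2, "Map", 0);
        System.out.println(e); // prints (Source-1 -> Map-2, typeNumber=0)
    }
}
```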




[GitHub] zhuzhurk commented on issue #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2019-01-16 Thread GitBox
zhuzhurk commented on issue #7255: [FLINK-10945] Use InputDependencyConstraint 
to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#issuecomment-455015548
 
 
   Thanks Andrey(@azagrebin) and Till(@tillrohrmann) for the reviewing.




[GitHub] dianfu commented on issue #7502: [hotfix][docs] Fix documentation of DataStream API Tutorial

2019-01-16 Thread GitBox
dianfu commented on issue #7502: [hotfix][docs] Fix documentation of DataStream 
API Tutorial
URL: https://github.com/apache/flink/pull/7502#issuecomment-455011043
 
 
   @sunjincheng121 Thanks a lot for the review, the merge, and the kind 
suggestion. Will do that next time. :)




[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r248513031
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = {
 
 Review comment:
   Good catch!
   > First you didn't recursively call the RelTimeIndicatorConverter on the 
upsertToRetraction input
   
   I think you are right. It is better to visit the input of 
`LogicalUpsertToRetraction`, and I will update the PR according to your 
suggestion. However, it seems impossible for us to add a test for this: 
`LogicalUpsertToRetraction` sits right after the source, there is no case 
where we need to materialize time indicators in a source, and the `visit()` 
method in `RelTimeIndicatorConverter` returns the scan directly.
   
   > Secondly, we should solve this in some more generic way.
   
   Agree. I suggest doing it in another PR: it is self-contained, and this PR 
is already a bit big, so I would rather not add a new feature to it. What do 
you think?
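The recursion being discussed can be illustrated with a minimal, self-contained visitor sketch (hypothetical `Node`/`convert` names, not Flink's actual `RelTimeIndicatorConverter`): visiting a node first converts its input, so rewrites below a node such as `LogicalUpsertToRetraction` are not lost.

```java
public class VisitorRecursionSketch {
    // A node wraps at most one input; "converted" marks that the converter
    // has visited (rewritten) it.
    static final class Node {
        final String name;
        final Node input;        // null for a leaf (the scan/source)
        final boolean converted;

        Node(String name, Node input, boolean converted) {
            this.name = name;
            this.input = input;
            this.converted = converted;
        }
    }

    // The converter recurses into the input first, then rewrites the node
    // itself, mirroring how a RelShuttle-style visit should handle the
    // upsert-to-retraction node: skipping the recursion would leave the
    // subtree below it unconverted.
    static Node convert(Node n) {
        Node newInput = (n.input == null) ? null : convert(n.input);
        return new Node(n.name, newInput, true);
    }

    public static void main(String[] args) {
        Node scan = new Node("scan", null, false);
        Node upsertToRetraction = new Node("upsertToRetraction", scan, false);
        Node result = convert(upsertToRetraction);
        // Both the node and its input were visited.
        System.out.println(result.converted + " " + result.input.converted);
    }
}
```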






[jira] [Commented] (FLINK-11159) Allow configuration whether to fall back to savepoints for restore

2019-01-16 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744582#comment-16744582
 ] 

vinoyang commented on FLINK-11159:
--

From my personal point of view, setting the default to false is a good 
choice: it is compatible with the default behavior of the old version and 
does not surprise the user, while offering the new behavior as an 
optimization option.

> Allow configuration whether to fall back to savepoints for restore
> --
>
> Key: FLINK-11159
> URL: https://issues.apache.org/jira/browse/FLINK-11159
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Nico Kruber
>Assignee: vinoyang
>Priority: Major
>
> Ever since FLINK-3397, upon failure, Flink would restart from the latest 
> checkpoint/savepoint, whichever is more recent. With the introduction of 
> local recovery and the knowledge that a RocksDB checkpoint restore just 
> copies the files, it may be time to reconsider this / make it configurable:
> In certain situations, it may be faster to restore from the latest checkpoint 
> only (even if there is a more recent savepoint) and reprocess the data 
> between. On the downside, though, that may not be correct because that might 
> break side effects if the savepoint was the latest one, e.g. consider this 
> chain: {{chk1 -> chk2 -> sp … restore chk2 -> …}}. Then all side effects 
> between {{chk2 -> sp}} would be reproduced.
> Making this configurable will allow users to choose whatever gives them the 
> lowest recovery time in Flink.
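The trade-off described above can be sketched as a small selection function (illustrative names only, not Flink's actual API): by default restore from the most recent snapshot of either kind, and with a "prefer checkpoints" option fall back to the latest checkpoint even when a newer savepoint exists.

```java
import java.util.Comparator;
import java.util.List;

public class RestoreChoiceSketch {
    // A snapshot in the job's history; ts orders snapshots by recency.
    record Snapshot(String id, long ts, boolean isSavepoint) {}

    static Snapshot chooseRestorePoint(List<Snapshot> history, boolean preferCheckpoints) {
        Comparator<Snapshot> byTime = Comparator.comparingLong(Snapshot::ts);
        if (preferCheckpoints) {
            // Ignore savepoints: may recover faster, but replays the side
            // effects between the chosen checkpoint and the newer savepoint.
            return history.stream()
                    .filter(s -> !s.isSavepoint())
                    .max(byTime)
                    .orElseThrow();
        }
        // Default behavior: latest snapshot of either kind.
        return history.stream().max(byTime).orElseThrow();
    }

    public static void main(String[] args) {
        // chk1 -> chk2 -> sp (the chain from the issue description)
        List<Snapshot> history = List.of(
                new Snapshot("chk1", 1, false),
                new Snapshot("chk2", 2, false),
                new Snapshot("sp", 3, true));
        System.out.println(chooseRestorePoint(history, false).id()); // sp
        System.out.println(chooseRestorePoint(history, true).id());  // chk2
    }
}
```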





[GitHub] allenxwang commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-16 Thread GitBox
allenxwang commented on issue #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-455003915
 
 
   This is a very useful feature for us. Any chance this would be merged soon?




[GitHub] sunjincheng121 commented on issue #7502: [hotfix][docs] Fix documentation of DataStream API Tutorial

2019-01-16 Thread GitBox
sunjincheng121 commented on issue #7502: [hotfix][docs] Fix documentation of 
DataStream API Tutorial
URL: https://github.com/apache/flink/pull/7502#issuecomment-455000790
 
 
   Fixed in master: 09eff88a72a46541d624a08fd66bc342099e0c81
   Fixed in release-1.7: e341b3411e1706413b39e4301c97456489005044
   The changes were not cherry-picked into release-1.6, because the doc 
structure is not the same. 
   
   BTW, @dianfu, I suggest creating a JIRA before the fix PR next time. 
Great thanks. :-)
   
   Bests,
   Jincheng




[GitHub] asfgit closed pull request #7502: [hotfix][docs] Fix documentation of DataStream API Tutorial

2019-01-16 Thread GitBox
asfgit closed pull request #7502: [hotfix][docs] Fix documentation of 
DataStream API Tutorial
URL: https://github.com/apache/flink/pull/7502
 
 
   




[GitHub] sunjincheng121 commented on issue #7502: [hotfix][docs] Fix documentation of DataStream API Tutorial

2019-01-16 Thread GitBox
sunjincheng121 commented on issue #7502: [hotfix][docs] Fix documentation of 
DataStream API Tutorial
URL: https://github.com/apache/flink/pull/7502#issuecomment-454996428
 
 
   @dianfu thanks for the update! 
   LGTM. +1 to merge.




[GitHub] KarmaGYZ commented on issue #7517: [FLINK-11359][test] Check and port LegacyAvroExternalJarProgramITCase to new…

2019-01-16 Thread GitBox
KarmaGYZ commented on issue #7517: [FLINK-11359][test] Check and port 
LegacyAvroExternalJarProgramITCase to new…
URL: https://github.com/apache/flink/pull/7517#issuecomment-454986842
 
 
   cc @tillrohrmann 




[jira] [Updated] (FLINK-11359) Check and port LegacyAvroExternalJarProgramITCase to new code base if necessary

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11359:
---
Labels: pull-request-available  (was: )

> Check and port LegacyAvroExternalJarProgramITCase to new code base if 
> necessary
> ---
>
> Key: FLINK-11359
> URL: https://issues.apache.org/jira/browse/FLINK-11359
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Yangze Guo
>Priority: Major
>  Labels: pull-request-available
>
> Check and port {{LegacyAvroExternalJarProgramITCase}} to new code base if 
> necessary.





[GitHub] KarmaGYZ opened a new pull request #7517: [FLINK-11359][test] Check and port LegacyAvroExternalJarProgramITCase to new…

2019-01-16 Thread GitBox
KarmaGYZ opened a new pull request #7517: [FLINK-11359][test] Check and port 
LegacyAvroExternalJarProgramITCase to new…
URL: https://github.com/apache/flink/pull/7517
 
 
   … code base if necessary
   
   
   ## What is the purpose of the change
   
   Check and port LegacyAvroExternalJarProgramITCase to new code base if 
necessary.
   
   ## Brief change log
   
   `LegacyAvroExternalJarProgramITCase` only contains `testExternalProgram`, 
which is the same as in `AvroExternalJarProgramITCase`, so we can simply 
remove the legacy version entirely.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`AvroExternalJarProgramITCase#testExternalProgram`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable




[jira] [Assigned] (FLINK-11359) Check and port LegacyAvroExternalJarProgramITCase to new code base if necessary

2019-01-16 Thread Yangze Guo (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo reassigned FLINK-11359:
--

Assignee: Yangze Guo

> Check and port LegacyAvroExternalJarProgramITCase to new code base if 
> necessary
> ---
>
> Key: FLINK-11359
> URL: https://issues.apache.org/jira/browse/FLINK-11359
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Yangze Guo
>Priority: Major
>
> Check and port {{LegacyAvroExternalJarProgramITCase}} to new code base if 
> necessary.





[GitHub] AlphaGarden closed pull request #7312: [FLINK-11169][runtime] fix the problem of not being reloaded for jobmanager's…

2019-01-16 Thread GitBox
AlphaGarden closed pull request #7312:  [FLINK-11169][runtime] fix the problem 
of not being reloaded for jobmanager's…
URL: https://github.com/apache/flink/pull/7312
 
 
   




[GitHub] AlphaGarden commented on issue #7312: [FLINK-11169][runtime] fix the problem of not being reloaded for jobmanager's…

2019-01-16 Thread GitBox
AlphaGarden commented on issue #7312:  [FLINK-11169][runtime] fix the problem 
of not being reloaded for jobmanager's…
URL: https://github.com/apache/flink/pull/7312#issuecomment-454978509
 
 
   @aljoscha May I ask why the design and implementation of loading logs and 
stdout files for the JobManager differ from the TaskManagers' in the first 
place? Is there any distinct behavior between these two components when 
loading logs and stdout files? Thanks in advance. 




[GitHub] GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port YARNHighAvailabilityITCase to new codebase

2019-01-16 Thread GitBox
GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port 
YARNHighAvailabilityITCase to new codebase
URL: https://github.com/apache/flink/pull/7509#discussion_r248478213
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 ##
 @@ -18,198 +18,205 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
+import org.apache.flink.yarn.testjob.YarnTestJob;
+import org.apache.flink.yarn.util.YarnTestUtils;
+
 import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
+import javax.annotation.Nonnull;
 
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assume.assumeTrue;
 
 /**
  * Tests that verify correct HA behavior.
  */
 public class YARNHighAvailabilityITCase extends YarnTestBase {
 
-   private static TestingServer zkServer;
-
-   private static ActorSystem actorSystem;
+   @ClassRule
+   public static final TemporaryFolder FOLDER = new TemporaryFolder();
 
-   private static final int numberApplicationAttempts = 3;
+   private static final String LOG_DIR = "flink-yarn-tests-ha";
+   private static final Duration TIMEOUT = Duration.ofSeconds(200L);
 
-   @Rule
-   public TemporaryFolder temp = new TemporaryFolder();
+   private static TestingServer zkServer;
+   private static String storageDir;
 
@BeforeClass
-   public static void setup() {
-   actorSystem = AkkaUtils.createDefaultActorSystem();
-
-   try {
-   zkServer = new TestingServer();
-   zkServer.start();
-   } catch (Exception e) {
-   e.printStackTrace();
-   Assert.fail("Could not start ZooKeeper testing 
cluster.");
-   }
+   public static void setup() throws Exception {
+   zkServer = new TestingServer();
 
-   YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
"flink-yarn-tests-ha");
-   

[GitHub] GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port YARNHighAvailabilityITCase to new codebase

2019-01-16 Thread GitBox
GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port 
YARNHighAvailabilityITCase to new codebase
URL: https://github.com/apache/flink/pull/7509#discussion_r248476938
 
 


[GitHub] GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port YARNHighAvailabilityITCase to new codebase

2019-01-16 Thread GitBox
GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port 
YARNHighAvailabilityITCase to new codebase
URL: https://github.com/apache/flink/pull/7509#discussion_r248477826
 
 


[GitHub] GJL commented on a change in pull request #7512: [FLINK-11294][tests] Remove legacy JobInfo usage in valid tests

2019-01-16 Thread GitBox
GJL commented on a change in pull request #7512: [FLINK-11294][tests] Remove 
legacy JobInfo usage in valid tests
URL: https://github.com/apache/flink/pull/7512#discussion_r248471274
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
 ##
 @@ -114,7 +112,7 @@ public void testPutAndRemoveJobGraph() throws Exception {
verifyJobGraphs(jobGraph, 
jobGraphs.recoverJobGraph(jobId));
 
// Update (same ID)
-   jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 
1);
+   jobGraph = createSubmittedJobGraph(jobGraph.getJobId());
jobGraphs.putJobGraph(jobGraph);
 
// Verify updated
 
 Review comment:
   ok understood




[jira] [Updated] (FLINK-11360) Check and remove LocalFlinkMiniClusterITCase

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11360:
---
Labels: pull-request-available  (was: )

> Check and remove LocalFlinkMiniClusterITCase
> 
>
> Key: FLINK-11360
> URL: https://issues.apache.org/jira/browse/FLINK-11360
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
>
> Check tests in {{LocalFlinkMiniClusterITCase}} whether they also apply to the 
> {{MiniCluster}} and port if necessary. Afterwards remove 
> {{LocalFlinkMiniClusterITCase}}.





[GitHub] TisonKun opened a new pull request #7516: [FLINK-11360] [test] Check and remove LocalFlinkMiniClusterITCase

2019-01-16 Thread GitBox
TisonKun opened a new pull request #7516: [FLINK-11360] [test] Check and remove 
LocalFlinkMiniClusterITCase
URL: https://github.com/apache/flink/pull/7516
 
 
   ## What is the purpose of the change
   
   Check tests in LocalFlinkMiniClusterITCase whether they also apply to the 
MiniCluster and port if necessary. Afterwards remove 
LocalFlinkMiniClusterITCase.
   
   ## Brief change log
   
   `LocalFlinkMiniClusterITCase` contains only 
`testLocalFlinkMiniClusterWithMultipleTaskManagers`. `MiniClusterITCase` does 
not cover the multiple-TaskManager case, so this PR adds it. The 
implementation simply tests the positive case that a job requiring 
`numOfTMs * slotPerTM` slots can run with the corresponding TaskManagers and 
slots; negative cases were tested manually.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   `MiniClusterITCase#runJobWithSingleRpcService` and 
`MiniClusterITCase#runJobWithMultipleRpcServices` now cover the 
multi-TaskManager case.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   
   cc @tillrohrmann 




[jira] [Updated] (FLINK-10822) Configurable MetricQueryService interval

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10822:
---
Labels: pull-request-available  (was: )

> Configurable MetricQueryService interval
> 
>
> Key: FLINK-10822
> URL: https://issues.apache.org/jira/browse/FLINK-10822
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Chesnay Schepler
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> The {{MetricQueryService}} is used for transmitting metrics from TaskManagers 
> to the JobManager, in order to expose them via REST and by extension the 
> WebUI.
> By default the JM will poll metrics at most every 10 seconds. This has an 
> adverse effect on the duration of our end-to-end tests, which for example 
> query metrics via the REST API to determine whether the cluster has started. 
> If during the first poll no TM is available it will take another 10 seconds 
> for updated information to become available.
> By making this interval configurable we could thus reduce the test duration. 
> Additionally this could serve as a switch to disable the 
> {{MetricQueryService}}.
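A hedged sketch of what the resulting configuration might look like; the option key, value format, and the disable-switch semantics below are assumptions for illustration, not confirmed by this thread:

```yaml
# flink-conf.yaml -- illustrative only; check the released configuration
# documentation for the actual key and default.
metrics.fetcher.update-interval: 1000   # poll TM metrics every second instead of every 10s
# A value of 0 could act as the switch that disables metric fetching entirely.
```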





[GitHub] asfgit closed pull request #7459: [FLINK-10822] Make MetricFetcher update interval configurable

2019-01-16 Thread GitBox
asfgit closed pull request #7459:  [FLINK-10822] Make MetricFetcher update 
interval configurable
URL: https://github.com/apache/flink/pull/7459
 
 
   




[jira] [Resolved] (FLINK-10822) Configurable MetricQueryService interval

2019-01-16 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10822.
---
Resolution: Fixed

Fixed via 474fe8e8ae063e250f7f8f4eddcd799b15f8b69b

> Configurable MetricQueryService interval
> 
>
> Key: FLINK-10822
> URL: https://issues.apache.org/jira/browse/FLINK-10822
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Chesnay Schepler
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.8.0
>
>
> The {{MetricQueryService}} is used for transmitting metrics from TaskManagers 
> to the JobManager, in order to expose them via REST and by extension the 
> WebUI.
> By default the JM will poll metrics at most every 10 seconds. This has an 
> adverse effect on the duration of our end-to-end tests, which for example 
> query metrics via the REST API to determine whether the cluster has started. 
> If during the first poll no TM is available it will take another 10 seconds 
> for updated information to become available.
> By making this interval configurable we could thus reduce the test duration. 
> Additionally this could serve as a switch to disable the 
> {{MetricQueryService}}.





[GitHub] tillrohrmann commented on issue #7459: [FLINK-10822] Make MetricFetcher update interval configurable

2019-01-16 Thread GitBox
tillrohrmann commented on issue #7459:  [FLINK-10822] Make MetricFetcher update 
interval configurable
URL: https://github.com/apache/flink/pull/7459#issuecomment-454957193
 
 
   @zentol I like the idea. Shall we do this as a follow-up, since I've just 
merged the PR?




[jira] [Assigned] (FLINK-11351) Port JobManagerCleanupITCase to new code base

2019-01-16 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-11351:
-

Assignee: Till Rohrmann

> Port JobManagerCleanupITCase to new code base
> -
>
> Key: FLINK-11351
> URL: https://issues.apache.org/jira/browse/FLINK-11351
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>
> Port {{JobManagerCleanupITCase}} to new code base.





[jira] [Assigned] (FLINK-11349) Port CoordinatorShutdownTest to new code base

2019-01-16 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-11349:
-

Assignee: Till Rohrmann

> Port CoordinatorShutdownTest to new code base
> -
>
> Key: FLINK-11349
> URL: https://issues.apache.org/jira/browse/FLINK-11349
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Port {{CoordinatorShutdownTest#testCoordinatorShutsDownOnFailure}} and 
> {{CoordinatorShutdownTest#testCoordinatorShutsDownOnSuccess}} to new code 
> base.





[GitHub] lamber-ken closed pull request #7510: [hotfix][runtime] remove RejectedExecutionException code in executeAsyncCallRunnable method

2019-01-16 Thread GitBox
lamber-ken closed pull request #7510: [hotfix][runtime] remove 
RejectedExecutionException code in executeAsyncCallRunnable method
URL: https://github.com/apache/flink/pull/7510
 
 
   




[GitHub] TisonKun commented on issue #7303: [FLINK-10569] [tests] Get rid of Scheduler from valid tests

2019-01-16 Thread GitBox
TisonKun commented on issue #7303: [FLINK-10569] [tests] Get rid of Scheduler 
from valid tests
URL: https://github.com/apache/flink/pull/7303#issuecomment-454949528
 
 
   @zentol I see your concern above. To nudge the removal of the legacy 
`Scheduler`, since all of its remaining usages are coupled with `Instance`, I 
can think of 2 approaches:
   
   1. Keep this PR (and the thread) WIP, and notify a committer once all the 
work (Scheduler and Instance) is finished.
   2. Split this removal up test case by test case, such as "Port 
PointwisePatternTest to new codebase". This follows Till's sorting scheme, and 
we can complete the whole job in fine-grained parts. In addition, for most of 
these test cases the porting work is entirely about replacing `Scheduler`.
   
   What do you think?




[GitHub] TisonKun commented on a change in pull request #7303: [FLINK-10569] [tests] Get rid of Scheduler from valid tests

2019-01-16 Thread GitBox
TisonKun commented on a change in pull request #7303: [FLINK-10569] [tests] Get 
rid of Scheduler from valid tests
URL: https://github.com/apache/flink/pull/7303#discussion_r248454800
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 ##
 @@ -155,21 +153,12 @@ public void testRestartAutomatically() throws Exception {
public void testCancelWhileRestarting() throws Exception {
// We want to manually control the restart and delay
RestartStrategy restartStrategy = new 
InfiniteDelayRestartStrategy();
-   Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = 
createExecutionGraph(restartStrategy);
-   ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
-   Instance instance = executionGraphInstanceTuple.f1;
 
-   // Kill the instance and wait for the job to restart
-   instance.markDead();
-
-   Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
-
-   while (deadline.hasTimeLeft() &&
-   executionGraph.getState() != 
JobStatus.RESTARTING) {
-
-   Thread.sleep(100);
-   }
+   SlotProvider slotProvider = new TestingSlotProvider(ignore -> 
new CompletableFuture<>());
+   ExecutionGraph executionGraph = 
createSimpleExecutionGraph(restartStrategy, slotProvider);
+   assertEquals(JobStatus.CREATED, executionGraph.getState());
 
+   executionGraph.scheduleForExecution();
assertEquals(JobStatus.RESTARTING, executionGraph.getState());
 
 Review comment:
   This is because the SlotProvider returns a non-completed future, and the eg 
is configured to not allow queued scheduling. Thus when `scheduleForExecution` 
is called, it causes `IllegalArgumentException("The slot allocation future has 
not been completed yet.")`, which makes the job fail and then restart.
   
   But it is tricky. Maybe I should allow queued scheduling and fail a future 
to emulate a real failure.
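The mechanism described above can be illustrated with a minimal, non-Flink sketch: an eager scheduler that requires an already-completed slot future rejects one that is still pending. The class and method names below are hypothetical stand-ins, not Flink code:

```java
import java.util.concurrent.CompletableFuture;

// Toy stand-in for the eager-scheduling check described above; not Flink code.
public class PendingSlotDemo {

    /** With queued scheduling disallowed, a slot future must already be done. */
    public static boolean canScheduleEagerly(CompletableFuture<?> slotFuture) {
        return slotFuture.isDone();
    }

    public static void main(String[] args) {
        CompletableFuture<String> slot = new CompletableFuture<>(); // never completed
        if (!canScheduleEagerly(slot)) {
            // Mirrors the failure path: scheduling fails, triggering a restart.
            System.out.println("The slot allocation future has not been completed yet.");
        }
    }
}
```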




[GitHub] TisonKun commented on a change in pull request #7303: [FLINK-10569] [tests] Get rid of Scheduler from valid tests

2019-01-16 Thread GitBox
TisonKun commented on a change in pull request #7303: [FLINK-10569] [tests] Get 
rid of Scheduler from valid tests
URL: https://github.com/apache/flink/pull/7303#discussion_r248449108
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
 ##
 @@ -90,12 +90,12 @@ public void testAssignSlotSharingGroup() {
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
-   new 
Scheduler(TestingUtils.defaultExecutionContext()));
+   new TestingSlotProvider(ignored -> new 
CompletableFuture<>()));
eg.attachJobGraph(vertices);

// verify that the vertices are all in the same slot 
sharing group
-   SlotSharingGroup group1 = null;
-   SlotSharingGroup group2 = null;
+   SlotSharingGroup group1;
 
 Review comment:
   Gotcha.




[GitHub] TisonKun commented on a change in pull request #7245: [FLINK-11069] [utils] Remove FutureUtil

2019-01-16 Thread GitBox
TisonKun commented on a change in pull request #7245: [FLINK-11069] [utils] 
Remove FutureUtil
URL: https://github.com/apache/flink/pull/7245#discussion_r248448673
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 ##
 @@ -158,27 +160,31 @@ public void testConcurrentConsumeMultiplePartitions() 
throws Exception {
// Test
try {
// Submit producer tasks
-   List> results = 
Lists.newArrayListWithCapacity(
+   List> results = 
Lists.newArrayListWithCapacity(
parallelism + 1);
 
for (int i = 0; i < parallelism; i++) {
-   
results.add(executor.submit(partitionProducers[i]));
+   results.add(CompletableFuture.supplyAsync(
+   
CheckedSupplier.unchecked(partitionProducers[i]::call), executor));
}
 
// Submit consumer
for (int i = 0; i < parallelism; i++) {
-   results.add(executor.submit(
-   new TestLocalInputChannelConsumer(
-   i,
-   parallelism,
-   numberOfBuffersPerChannel,
-   
networkBuffers.createBufferPool(parallelism, parallelism),
-   partitionManager,
-   new TaskEventDispatcher(),
-   partitionIds)));
+   results.add(CompletableFuture.supplyAsync(
+   CheckedSupplier.unchecked(
+   new 
TestLocalInputChannelConsumer(
 
 Review comment:
   done




[GitHub] zentol commented on a change in pull request #7515: [FLINK-11313][checkpoint] Introduce LZ4 compression for keyed state in full checkpoints and savepoints

2019-01-16 Thread GitBox
zentol commented on a change in pull request #7515: [FLINK-11313][checkpoint] 
Introduce LZ4 compression for keyed state in full checkpoints and savepoints
URL: https://github.com/apache/flink/pull/7515#discussion_r248439222
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
 ##
 @@ -894,12 +898,13 @@ public void disableAutoTypeRegistration() {
this.autoTypeRegistrationEnabled = false;
}
 
-   public boolean isUseSnapshotCompression() {
 
 Review comment:
   The class is annotated with `@Public`, which means you cannot remove any 
`public` method. This method and its behavior must be preserved.
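As a sketch of the compatibility rule being invoked here: a public method on a `@Public`-annotated class is kept (at most deprecated) rather than deleted. The class below is a hypothetical stand-in, not the real `ExecutionConfig`:

```java
// Hypothetical stand-in illustrating API-stability handling; not Flink's class.
public class ExecutionConfigSketch {

    private boolean useSnapshotCompression = true;

    /** Kept for source/binary compatibility; deprecate instead of removing. */
    @Deprecated
    public boolean isUseSnapshotCompression() {
        return useSnapshotCompression;
    }

    public void setUseSnapshotCompression(boolean useSnapshotCompression) {
        this.useSnapshotCompression = useSnapshotCompression;
    }
}
```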




[GitHub] zentol commented on issue #7506: [FLINK-11347] Optimize the ParquetAvroWriters factory

2019-01-16 Thread GitBox
zentol commented on issue #7506: [FLINK-11347] Optimize the ParquetAvroWriters 
factory
URL: https://github.com/apache/flink/pull/7506#issuecomment-454927640
 
 
   Related test failures:
   ```
   18:30:12.583 [INFO] Running 
org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase
   18:30:15.884 [ERROR] Tests run: 3, Failures: 0, Errors: 3, Skipped: 0, Time 
elapsed: 3.298 s <<< FAILURE! - in 
org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase
   18:30:15.884 [ERROR] 
testWriteParquetAvroReflect(org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase)
  Time elapsed: 0.371 s  <<< ERROR!
   org.apache.flink.api.common.InvalidProgramException: The implementation of 
the RichSinkFunction is not serializable. The object probably contains or 
references non serializable fields.
at 
org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase.testWriteParquetAvroReflect(ParquetStreamingFileSinkITCase.java:140)
   Caused by: java.io.NotSerializableException: 
org.apache.avro.Schema$RecordSchema
at 
org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase.testWriteParquetAvroReflect(ParquetStreamingFileSinkITCase.java:140)
   
   18:30:15.885 [ERROR] 
testWriteParquetAvroSpecific(org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase)
  Time elapsed: 0.085 s  <<< ERROR!
   org.apache.flink.api.common.InvalidProgramException: The implementation of 
the RichSinkFunction is not serializable. The object probably contains or 
references non serializable fields.
at 
org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase.testWriteParquetAvroSpecific(ParquetStreamingFileSinkITCase.java:83)
   Caused by: java.io.NotSerializableException: 
org.apache.avro.Schema$RecordSchema
at 
org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase.testWriteParquetAvroSpecific(ParquetStreamingFileSinkITCase.java:83)
   
   18:30:15.885 [ERROR] 
testWriteParquetAvroGeneric(org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase)
  Time elapsed: 0.013 s  <<< ERROR!
   org.apache.flink.api.common.InvalidProgramException: The implementation of 
the RichSinkFunction is not serializable. The object probably contains or 
references non serializable fields.
at 
org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase.testWriteParquetAvroGeneric(ParquetStreamingFileSinkITCase.java:110)
   Caused by: java.io.NotSerializableException: 
org.apache.avro.Schema$RecordSchema
at 
org.apache.flink.formats.parquet.avro.ParquetStreamingFileSinkITCase.testWriteParquetAvroGeneric(ParquetStreamingFileSinkITCase.java:110)
   ```
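A common remedy for this kind of `NotSerializableException` is to keep the non-serializable member transient and rebuild it from a serializable representation. The sketch below shows the pattern with a made-up `Schemaish` type standing in for `org.apache.avro.Schema`; all names here are hypothetical, not the code under review:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// A non-serializable member type, standing in for org.apache.avro.Schema.
class Schemaish {
    final String definition;
    Schemaish(String definition) { this.definition = definition; }
}

public class SerializableSink implements Serializable {

    private final String schemaString;   // serializable representation
    private transient Schemaish schema;  // rebuilt lazily after deserialization

    public SerializableSink(String schemaString) {
        this.schemaString = schemaString;
    }

    Schemaish schema() {
        if (schema == null) {
            schema = new Schemaish(schemaString); // "re-parse" on first use
        }
        return schema;
    }

    /** Java-serializes the sink; succeeds because the transient field is skipped. */
    public static boolean serializes(SerializableSink sink) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(sink);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```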




[jira] [Commented] (FLINK-11196) Extend S3 EntropyInjector to use key replacement (instead of key removal) when creating checkpoint metadata files

2019-01-16 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744345#comment-16744345
 ] 

Steven Zhen Wu commented on FLINK-11196:


[~StephanEwen] can you take a look at the Jira and PR from [~markcho] ?

> Extend S3 EntropyInjector to use key replacement (instead of key removal) 
> when creating checkpoint metadata files
> -
>
> Key: FLINK-11196
> URL: https://issues.apache.org/jira/browse/FLINK-11196
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.7.0
>Reporter: Mark Cho
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We currently use S3 entropy injection when writing out checkpoint data.
> We also use external checkpoints so that we can resume from a checkpoint 
> metadata file later.
> The current implementation of S3 entropy injector makes it difficult to 
> locate the checkpoint metadata files since in the newer versions of Flink, 
> `state.checkpoints.dir` configuration controls where the metadata and state 
> files are written, instead of having two separate paths (one for metadata, 
> one for state files).
> With entropy injection, we replace the entropy marker in the path specified 
> by `state.checkpoints.dir` with entropy (for state files) or we strip out the 
> marker (for metadata files).
>  
> We need to extend the entropy injection so that we can replace the entropy 
> marker with a predictable path (instead of removing it) so that we can do a 
> prefix query for just the metadata files.
> By not using the entropy key replacement (defaults to empty string), you get 
> the same behavior as it is today (entropy marker removed).





[jira] [Commented] (FLINK-11195) Extend AbstractS3FileSystemFactory.createHadoopFileSystem to accept URI and Hadoop Configuration

2019-01-16 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744346#comment-16744346
 ] 

Steven Zhen Wu commented on FLINK-11195:


[~StephanEwen] can you take a look at this Jira and PR from [~markcho]?

> Extend AbstractS3FileSystemFactory.createHadoopFileSystem to accept URI and 
> Hadoop Configuration
> 
>
> Key: FLINK-11195
> URL: https://issues.apache.org/jira/browse/FLINK-11195
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.7.0
>Reporter: Mark Cho
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, `createHadoopFileSystem` method does not take any parameters.
> In order to delegate FileSystem creation to Hadoop FileSystem.get method, we 
> need to pass URI and Hadoop Configuration to this abstract method.
> We use a custom version of PrestoS3FileSystem by plugging our 
> FileSystemFactory similar to `flink-filesystems/flink-s3-fs-presto` project. 
> However, we would like to delegate our FS creation to Hadoop.





[GitHub] zentol commented on issue #7510: [hotfix][runtime] remove RejectedExecutionException code in executeAsyncCallRunnable method

2019-01-16 Thread GitBox
zentol commented on issue #7510: [hotfix][runtime] remove 
RejectedExecutionException code in executeAsyncCallRunnable method
URL: https://github.com/apache/flink/pull/7510#issuecomment-454891145
 
 
   why?




[GitHub] zentol removed a comment on issue #7510: [hotfix][runtime] remove RejectedExecutionException code in executeAsyncCallRunnable method

2019-01-16 Thread GitBox
zentol removed a comment on issue #7510: [hotfix][runtime] remove 
RejectedExecutionException code in executeAsyncCallRunnable method
URL: https://github.com/apache/flink/pull/7510#issuecomment-454891145
 
 
   why?




[jira] [Created] (FLINK-11373) CliFrontend cuts off reason for error messages

2019-01-16 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-11373:
--

 Summary: CliFrontend cuts off reason for error messages
 Key: FLINK-11373
 URL: https://issues.apache.org/jira/browse/FLINK-11373
 Project: Flink
  Issue Type: Bug
  Components: Startup Shell Scripts
Affects Versions: 1.7.1, 1.6.3, 1.5.6
Reporter: Maximilian Michels


The CliFrontend seems to only print the first message of the stack trace and 
not any of its causes.

{noformat}
bin/flink run /non-existing/path
Could not build the program from JAR file.

Use the help option (-h or --help) to get help on the command.
{noformat}

Notice, the underlying cause of this message is FileNotFoundException.

Consider changing 
a) the error message for this particular case 
b) the way the stack trace messages are trimmed
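Option b) could, for instance, walk the cause chain and surface the deepest message. A minimal sketch (class and method names are hypothetical, not existing CliFrontend code):

```java
// Hypothetical helper: surface the root cause instead of only the top message.
public class CauseChain {

    /** Walks to the deepest cause in the chain and returns its message. */
    public static String rootCauseMessage(Throwable t) {
        Throwable cur = t;
        while (cur.getCause() != null && cur.getCause() != cur) {
            cur = cur.getCause();
        }
        return cur.getMessage();
    }
}
```

With the example from above, this would surface the FileNotFoundException's path instead of the generic "Could not build the program from JAR file." message.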





[jira] [Commented] (FLINK-11274) Scala 2.12 Kryo serialization bug

2019-01-16 Thread Zhenhao Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744281#comment-16744281
 ] 

Zhenhao Li commented on FLINK-11274:


Hi Aljoscha, thanks for checking. I am 100% sure that I resolved it by changing 
the version number from 1.7.1 to 1.7.0.

Since reporting this issue, I have done a major structural refactor of the 
codebase and updated the code for performance optimization. I no longer call 
the filter method on SortedSet.
I just tried to reproduce the issue. This time everything still worked when I 
changed the version to 1.7.1. 

That was indeed very puzzling to me when it occurred for the first time. I 
forgot to mention that I was running everything inside IntelliJ IDEA. It could 
as well be a bug there. 

Feel free to close this one. 
 

> Scala 2.12 Kryo serialization bug
> -
>
> Key: FLINK-11274
> URL: https://issues.apache.org/jira/browse/FLINK-11274
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.7.1
> Environment: Flink 1.7.1
> Scala 2.12.8
>Reporter: Zhenhao Li
>Priority: Major
>
> The following code works well for serializing Scala classes, e.g., 
> SortedSet[T], without problem in 1.7.0.
> ```
> env.getConfig.registerTypeWithKryoSerializer(
>   classOf[ClosureSerializer.Closure],
>   classOf[ClosureSerializer]
> )
> ```
> However, in 1.7.1 the following error occurs when checkpointing. 
> ```
> Serialization trace:
> cmp$2 (scala.math.Ordering$$anon$6)
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> com.twitter.chill.SortedSetSerializer.read(SortedSetSerializer.scala:38)
>   at 
> com.twitter.chill.SortedSetSerializer.read(SortedSetSerializer.scala:21)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
>   at 
> org.apache.flink.streaming.api.scala.function.StatefulFunction.applyWithState(StatefulFunction.scala:41)
>   at 
> org.apache.flink.streaming.api.scala.function.StatefulFunction.applyWithState$(StatefulFunction.scala:40)
>   at 
> org.apache.flink.streaming.api.scala.KeyedStream$$anon$3.applyWithState(KeyedStream.scala:591)
>   at 
> org.apache.flink.streaming.api.scala.KeyedStream$$anon$3.flatMap(KeyedStream.scala:596)
>   at 
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: 
> io.connecterra.stateful.AggregationSlidingWindowStateUpdater$$$Lambda$506/497325684
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>   ... 24 common frames omitted
> ```





[GitHub] Myasuka opened a new pull request #7515: Lz4 compression

2019-01-16 Thread GitBox
Myasuka opened a new pull request #7515: Lz4 compression
URL: https://github.com/apache/flink/pull/7515
 
 
   
   ## What is the purpose of the change
   
   LZ4 is a popular lightweight compression algorithm, which performs better 
than Snappy in many cases and is also [recommended by 
RocksDB](https://github.com/facebook/rocksdb/wiki/Compression#configuration).
   
   Based on this, I introduce LZ4 in addition to the existing Snappy 
compression for keyed state in full checkpoints and savepoints.
   
   ## Brief change log
   
 - Introduce new `CompressionType` interface and move 
`StreamCompressionDecorator` related classes to `flink-core` module.
 - Introduce new enum `CompressionTypes` class and new 
`LZ4StreamCompressionDecorator` class in `flink-runtime` module.
 - Bump `KeyedBackendSerializationProxy` to a newer version for newer 
compression type.
 - Migrated existing tests to use LZ4 compression.
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - Extended unit tests `SerializationProxiesTest` and 
`StateSnapshotCompressionTest` for the newly added compression type.
 - Migrated `EventTimeWindowCheckpointingITCase` and `RescalingITCase` IT 
cases to use LZ4 compression.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes, add lz4-java 
dependency.
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no, but changed the `KeyedBackendSerializationProxy`
 - The runtime per-record code paths (performance sensitive): no, should 
not affect topology task performance.
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs
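The decorator idea behind `StreamCompressionDecorator` can be sketched with JDK streams; `Deflater` stands in for LZ4 here, since lz4-java is a third-party dependency. The class below is illustrative only, not one of the classes introduced by this PR:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Illustrative compression decorator: wrap the raw state stream in a
// compressing stream on write and a decompressing stream on read.
public class CompressionDecoratorSketch {

    public static byte[] compress(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (OutputStream out = new DeflaterOutputStream(bos)) {
            out.write(data); // decorated write path
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static byte[] decompress(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                bos.write(buf, 0, n); // decorated read path
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }
}
```

Swapping in LZ4 would mean replacing the two JDK stream wrappers with lz4-java's block streams while keeping the same decorator shape.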
   




[jira] [Updated] (FLINK-11349) Port CoordinatorShutdownTest to new code base

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11349:
---
Labels: pull-request-available  (was: )

> Port CoordinatorShutdownTest to new code base
> -
>
> Key: FLINK-11349
> URL: https://issues.apache.org/jira/browse/FLINK-11349
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
>
> Port {{CoordinatorShutdownTest#testCoordinatorShutsDownOnFailure}} and 
> {{CoordinatorShutdownTest#testCoordinatorShutsDownOnSuccess}} to new code 
> base.





[GitHub] tillrohrmann opened a new pull request #7514: [FLINK-11349][tests] Port CoordinatorShutdownTest to new code base

2019-01-16 Thread GitBox
tillrohrmann opened a new pull request #7514: [FLINK-11349][tests] Port 
CoordinatorShutdownTest to new code base
URL: https://github.com/apache/flink/pull/7514
 
 
   ## What is the purpose of the change
   
   The relevant tests of the CoordinatorShutdownTest have been moved to the
   ExecutionGraphCheckpointCoordinatorTest which executes the same test just
   without spawning an actual cluster.
   
   ## Brief change log
   
 - `testCoordinatorShutsDownOnFailure` is subsumed by `ExecutionGraphCheckpointCoordinatorTest#testShutdownCheckpointCoordinatorOnFailure`
 - `testCoordinatorShutsDownOnSuccess` is moved to `ExecutionGraphCheckpointCoordinatorTest#testShutdownCheckpointCoordinatorOnFinished`
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[jira] [Created] (FLINK-11372) Incorrect delegation of compatibility checks to new snapshots in CollectionSerializerConfigSnapshot

2019-01-16 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-11372:
---

 Summary: Incorrect delegation of compatibility checks to new 
snapshots in CollectionSerializerConfigSnapshot
 Key: FLINK-11372
 URL: https://issues.apache.org/jira/browse/FLINK-11372
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.8.0


In {{CollectionSerializerConfigSnapshot}}:
{code}
@Override
public TypeSerializerSchemaCompatibility<C> resolveSchemaCompatibility(TypeSerializer<C> newSerializer) {
	if (newSerializer instanceof ListSerializer) {
		ListSerializer<T> newListSerializer = (ListSerializer<T>) newSerializer;
		ListSerializerSnapshot<T> listSerializerSnapshot = new ListSerializerSnapshot<>(newListSerializer);

		@SuppressWarnings("unchecked")
		TypeSerializerSchemaCompatibility<C> result = (TypeSerializerSchemaCompatibility<C>)
			listSerializerSnapshot.resolveSchemaCompatibility(newListSerializer);
		return result;
	} else {
		return super.resolveSchemaCompatibility(newSerializer);
	}
}
{code}

Compatibility check of {{ListSerializer}} is delegated to the new list 
serializer snapshot class, {{ListSerializerSnapshot}}.
However, it is incorrect to let the delegate wrap the new serializer (and 
therefore the new nested element serializer). By doing that, we're essentially 
checking compatibility of the new serializer with itself, whereas it should be 
checking compatibility with the restored serializer.
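As a minimal, self-contained illustration of the bug (simplified stand-in types, hypothetical names, not Flink's actual serializer API): building the delegate snapshot from the new serializer means the compatibility check compares the new serializer with itself and trivially passes, while building it from the restored serializer detects the schema change.

```java
import java.util.Objects;

// Simplified stand-ins for the serializer/snapshot types (illustrative only,
// not org.apache.flink classes).
class ElementSerializer {
    final String schema;

    ElementSerializer(String schema) {
        this.schema = schema;
    }
}

class ListSerializerSnapshotSketch {
    private final String snapshottedSchema;

    // The snapshot captures the schema of the serializer it was built from.
    ListSerializerSnapshotSketch(ElementSerializer serializer) {
        this.snapshottedSchema = serializer.schema;
    }

    boolean isCompatibleWith(ElementSerializer newSerializer) {
        return Objects.equals(snapshottedSchema, newSerializer.schema);
    }
}

public class DelegationBugSketch {
    public static void main(String[] args) {
        ElementSerializer restored = new ElementSerializer("v1");
        ElementSerializer updated = new ElementSerializer("v2");

        // Buggy delegation: the snapshot wraps the NEW serializer, so the
        // check compares the new serializer against itself.
        System.out.println(
            new ListSerializerSnapshotSketch(updated).isCompatibleWith(updated)); // true -- the schema change goes unnoticed

        // Correct delegation: the snapshot must reflect the RESTORED
        // serializer, so the schema change is actually detected.
        System.out.println(
            new ListSerializerSnapshotSketch(restored).isCompatibleWith(updated)); // false -- incompatibility reported
    }
}
```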





[GitHub] zentol commented on issue #7459: [FLINK-10822] Make MetricFetcher update interval configurable

2019-01-16 Thread GitBox
zentol commented on issue #7459:  [FLINK-10822] Make MetricFetcher update 
interval configurable
URL: https://github.com/apache/flink/pull/7459#issuecomment-454854517
 
 
   @tillrohrmann What do you think about disabling the fetcher completely if 
the interval is configured to 0?




[jira] [Updated] (FLINK-11350) Remove JobClientActorRecoveryITCase

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11350:
---
Labels: pull-request-available  (was: )

> Remove JobClientActorRecoveryITCase
> ---
>
> Key: FLINK-11350
> URL: https://issues.apache.org/jira/browse/FLINK-11350
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
>
> Remove {{JobClientActorRecoveryITCase}} since it only tests legacy code.





[GitHub] TisonKun commented on a change in pull request #7447: [FLINK-11294] [test] Remove legacy JobInfo usage in valid tests

2019-01-16 Thread GitBox
TisonKun commented on a change in pull request #7447: [FLINK-11294] [test] 
Remove legacy JobInfo usage in valid tests
URL: https://github.com/apache/flink/pull/7447#discussion_r248355439
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
 ##
 @@ -114,7 +112,7 @@ public void testPutAndRemoveJobGraph() throws Exception {
		verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));

		// Update (same ID)
-		jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 1);
+		jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), "Updated JobName");
 
 Review comment:
   Also responded at #7512: if we want to verify the update exactly, we should make something differ between these two JobGraphs.




[GitHub] tillrohrmann opened a new pull request #7513: [FLINK-11350][tests] Delete JobClientActorRecoveryITCase

2019-01-16 Thread GitBox
tillrohrmann opened a new pull request #7513: [FLINK-11350][tests] Delete 
JobClientActorRecoveryITCase
URL: https://github.com/apache/flink/pull/7513
 
 
   ## What is the purpose of the change
   
   The JobClientActorRecoveryITCase is no longer needed since it only tests the 
legacy
   code path.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[GitHub] TisonKun commented on a change in pull request #7512: [FLINK-11294][tests] Remove legacy JobInfo usage in valid tests

2019-01-16 Thread GitBox
TisonKun commented on a change in pull request #7512: [FLINK-11294][tests] 
Remove legacy JobInfo usage in valid tests
URL: https://github.com/apache/flink/pull/7512#discussion_r248354857
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
 ##
 @@ -114,7 +112,7 @@ public void testPutAndRemoveJobGraph() throws Exception {
		verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));

		// Update (same ID)
-		jobGraph = createSubmittedJobGraph(jobGraph.getJobId(), 1);
+   jobGraph = createSubmittedJobGraph(jobGraph.getJobId());
jobGraphs.putJobGraph(jobGraph);
 
// Verify updated
 
 Review comment:
   `verifyJobGraphs` verifies the JobID and JobName; if we don't change either of them, we should remove this verification. Otherwise, we'd better check the update exactly.




[GitHub] azagrebin commented on issue #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-16 Thread GitBox
azagrebin commented on issue #7320: [FLINK-11171] Avoid concurrent usage of 
StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#issuecomment-454843390
 
 
   Thanks @tillrohrmann ! I pushed the update to address the comments.




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-16 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r248348274
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -61,6 +80,116 @@ static void transferAllStateDataToDirectory(
downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
}
 
+	/**
+	 * Upload all the files to the checkpoint fileSystem using the specified number of threads.
+	 *
+	 * @param files The files that will be uploaded to the checkpoint filesystem.
+	 * @param numberOfSnapshottingThreads The number of threads used to upload the files.
+	 * @param checkpointStreamFactory The checkpoint streamFactory used to create the output stream.
+	 * @param closeableRegistry
+	 *
+	 * @throws Exception Thrown if the files can not be uploaded.
+	 */
+	public static Map<StateHandleID, StreamStateHandle> uploadFilesToCheckpointFs(
+		@Nonnull Map<StateHandleID, Path> files,
+		int numberOfSnapshottingThreads,
+		CheckpointStreamFactory checkpointStreamFactory,
+		CloseableRegistry closeableRegistry) throws Exception {
+
+		Map<StateHandleID, StreamStateHandle> handles = new HashMap<>();
+
+		ExecutorService executorService = createExecutorService(numberOfSnapshottingThreads);
 
 Review comment:
   @klion26 
   The upload and download parts are quite independent. The base class could contain the executor, the number of threads, the closeable registry, and a close method (to shut down the executor once, instead of shutting it down every time). The upload and download classes can extend it and share the executor. The downloader could then be registered with the closeable registry. For upload, we can of course shut it down immediately after the restore is done. I agree that it would make sense to keep the threads up, and for code reuse as well.
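The design discussed above can be sketched as follows (all class names are illustrative, not Flink's actual API): a shared base class owns the executor and its one-time shutdown, and the upload/download transfers extend it. The "upload" here is mocked as returning the file name in place of a real state handle.

```java
import java.io.Closeable;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical base class: owns the shared thread pool and its shutdown,
// so subclasses don't tear down an executor after every call.
abstract class StateDataTransferBase implements Closeable {
    protected final ExecutorService executorService;

    protected StateDataTransferBase(int numberOfThreads) {
        this.executorService = Executors.newFixedThreadPool(numberOfThreads);
    }

    @Override
    public void close() {
        // Shut the executor down once, when the transfer object is closed.
        executorService.shutdownNow();
    }
}

class StateUploaderSketch extends StateDataTransferBase {
    StateUploaderSketch(int numberOfThreads) {
        super(numberOfThreads);
    }

    // "Uploads" each file on the shared executor; the upload itself is
    // mocked as returning the file name instead of a real state handle.
    List<String> uploadAll(List<Path> files) throws Exception {
        List<Future<String>> futures = new ArrayList<>();
        for (Path file : files) {
            futures.add(executorService.submit(() -> file.getFileName().toString()));
        }
        List<String> handles = new ArrayList<>();
        for (Future<String> future : futures) {
            handles.add(future.get()); // preserves submission order
        }
        return handles;
    }
}

public class TransferSketchDemo {
    public static void main(String[] args) throws Exception {
        try (StateUploaderSketch uploader = new StateUploaderSketch(4)) {
            System.out.println(uploader.uploadAll(
                    List.of(Path.of("/tmp/a.sst"), Path.of("/tmp/b.sst")))); // prints [a.sst, b.sst]
        } // close() shuts down the shared executor exactly once
    }
}
```

A downloader subclass would extend the same base and could additionally be registered with the closeable registry, as the comment suggests.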




[jira] [Assigned] (FLINK-11370) Check and port ZooKeeperLeaderElectionITCase to new code base if necessary

2019-01-16 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-11370:


Assignee: TisonKun

> Check and port ZooKeeperLeaderElectionITCase to new code base if necessary
> --
>
> Key: FLINK-11370
> URL: https://issues.apache.org/jira/browse/FLINK-11370
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: TisonKun
>Priority: Major
>
> Check and port {{ZooKeeperLeaderElectionITCase}} to new code base if 
> necessary.





[jira] [Updated] (FLINK-11371) Close the AvroParquetReader after use

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11371:
---
Labels: pull-request-available  (was: )

> Close the AvroParquetReader after use
> -
>
> Key: FLINK-11371
> URL: https://issues.apache.org/jira/browse/FLINK-11371
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats
>Affects Versions: 1.7.1
>Reporter: Fokko Driesprong
>Assignee: Fokko Driesprong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> The AvroParquetReader is not being closed





[GitHub] Fokko opened a new pull request #7511: [FLINK-11371] The AvroParquetReader is not being closed

2019-01-16 Thread GitBox
Fokko opened a new pull request #7511: [FLINK-11371] The AvroParquetReader is 
not being closed
URL: https://github.com/apache/flink/pull/7511
 
 
   https://issues.apache.org/jira/browse/FLINK-11371
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




[jira] [Created] (FLINK-11371) Close the AvroParquetReader after use

2019-01-16 Thread Fokko Driesprong (JIRA)
Fokko Driesprong created FLINK-11371:


 Summary: Close the AvroParquetReader after use
 Key: FLINK-11371
 URL: https://issues.apache.org/jira/browse/FLINK-11371
 Project: Flink
  Issue Type: Improvement
  Components: Formats
Affects Versions: 1.7.1
Reporter: Fokko Driesprong
Assignee: Fokko Driesprong
 Fix For: 1.8.0


The AvroParquetReader is not being closed





[jira] [Assigned] (FLINK-11369) Check and port ZooKeeperHAJobManagerTest to new code base if necessary

2019-01-16 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-11369:


Assignee: TisonKun

> Check and port ZooKeeperHAJobManagerTest to new code base if necessary
> --
>
> Key: FLINK-11369
> URL: https://issues.apache.org/jira/browse/FLINK-11369
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: TisonKun
>Priority: Major
>
> Check and port {{ZooKeeperHAJobManagerTest}} to new code base if necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

