[jira] [Commented] (FLINK-6505) Proactively cleanup local FS for RocksDBKeyedStateBackend on startup

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247141#comment-16247141
 ] 

ASF GitHub Bot commented on FLINK-6505:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4798
  
Hi @StefanRRichter, do you have any more feedback?


> Proactively cleanup local FS for RocksDBKeyedStateBackend on startup
> 
>
> Key: FLINK-6505
> URL: https://issues.apache.org/jira/browse/FLINK-6505
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> In {{RocksDBKeyedStateBackend}}, the {{instanceBasePath}} is cleared on 
> {{dispose()}}. I think it might make sense to also clear this directory when 
> the backend is created, in case something crashed and the backend never 
> reached {{dispose()}}. At least for previous runs of the same job, we can 
> know what to delete on restart. 
> In general, it is very important for this backend to clean up the local FS, 
> because the local quota might be very limited compared to the DFS. And a node 
> that runs out of local disk space can bring down the whole job, with no way 
> to recover (it might always get rescheduled to that node).
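
A minimal sketch of the cleanup described above (hypothetical helper names, not the actual RocksDBKeyedStateBackend code; assumes plain java.nio file APIs):

{code}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Comparator;

// Hypothetical sketch: wipe any leftover local working directory before the
// backend creates a fresh one, covering crashes that skipped dispose().
public class LocalDirCleanupSketch {
    static void cleanUpOnStartup(File instanceBasePath) throws IOException {
        if (instanceBasePath.exists()) {
            // delete children before parents
            Files.walk(instanceBasePath.toPath())
                .sorted(Comparator.reverseOrder())
                .forEach(path -> path.toFile().delete());
        }
        if (!instanceBasePath.mkdirs()) {
            throw new IOException("Could not create " + instanceBasePath);
        }
    }
}
{code}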



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7928) Extend the field in ResourceProfile for precisely calculating the resource of a task manager

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247047#comment-16247047
 ] 

ASF GitHub Bot commented on FLINK-7928:
---

GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/4991

[FLINK-7928] [runtime] extend the resources in ResourceProfile for 
precisely calculating the resource of task manager

Note: this pull request contains #4911 since it depends on it.

## What is the purpose of the change

This pull request makes tasks extendable with ResourceSpec (#4911), and adds 
two fields for calculating the memory an operator needs to communicate 
with its upstream and downstream.

## Brief change log

  - *Add an extendedResource field for extendable resources in ResourceSpec*
  - *Add memoryForInputInMB and memoryForOutputInMB for the memory an operator 
needs to communicate with its upstream and downstream*
  - *Add a fromResourceSpec method for transforming a ResourceSpec into a 
ResourceProfile*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added test in ResourceProfileTest*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-7928

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4991.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4991


commit 3e1d61a33f18b351424d4684cbaebc22674f582c
Author: shuai.xus 
Date:   2017-10-25T06:56:35Z

[FLINK-7878] [api] make resource type extendible in ResourceSpec

Summary:
Now, Flink only supports user-defined CPU and MEM,
but some users need to specify GPU, FPGA, and other resources.
So the resource type needs to be made extendible in the ResourceSpec.
Add an extended field for new resources.

Test Plan: UnitTest

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D327427

commit d769fe5d0184cd6ac264fd42552d290ae6978fbb
Author: shuai.xus 
Date:   2017-11-08T09:10:01Z

make Resource abstract and add GPUResource FPGAResource

commit f897d1fa1742c8186c93bb60abfd8719f156c7da
Author: shuai.xus 
Date:   2017-11-08T09:20:22Z

enhance test

commit b8e882b9f39f5588338297ce227e200c6527b84b
Author: shuai.xus 
Date:   2017-11-10T02:00:08Z

make create protected

commit 41cf6e4c7e68ef84d9d84e909b417fc6ddc794a6
Author: shuai.xus 
Date:   2017-11-10T03:02:21Z

make constructor public

commit 931e279e5a85f38e6cd9e53169fd37b8ce2d87ad
Author: shuai.xus 
Date:   2017-10-26T09:38:04Z

[FLINK-7928] [runtime] extend the resources in ResourceProfile for 
precisely calculating the resource of task manager

Summary:
ResourceProfile denotes the resource requirements of a task. It should 
contain:
1. The resources for the operators: the resources in ResourceSpec (please 
refer to jira-7878).
2. The resources for the task to communicate with its upstream tasks.
3. The resources for the task to communicate with its downstream tasks.
Now the ResourceProfile only contains the first part; this change adds the last 
two parts.

Test Plan: UnitTests

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D330364
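
A simplified stand-in illustrating the three parts listed in the summary above (the memoryForInputInMB/memoryForOutputInMB names come from the change log; the surrounding class is illustrative, not Flink's actual ResourceProfile):

{code}
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the extended ResourceProfile described above.
public class ResourceProfileSketch {
    private final double cpuCores;                        // part 1: operator resources
    private final int heapMemoryInMB;                     // part 1: operator resources
    private final int memoryForInputInMB;                 // part 2: receive from upstream
    private final int memoryForOutputInMB;                // part 3: send to downstream
    private final Map<String, Double> extendedResources;  // e.g. "GPU" -> 1.0

    public ResourceProfileSketch(double cpuCores, int heapMemoryInMB,
                                 int memoryForInputInMB, int memoryForOutputInMB,
                                 Map<String, Double> extendedResources) {
        this.cpuCores = cpuCores;
        this.heapMemoryInMB = heapMemoryInMB;
        this.memoryForInputInMB = memoryForInputInMB;
        this.memoryForOutputInMB = memoryForOutputInMB;
        this.extendedResources = new HashMap<>(extendedResources);
    }

    /** Total memory a task manager must budget for one task with this profile. */
    public int totalMemoryInMB() {
        return heapMemoryInMB + memoryForInputInMB + memoryForOutputInMB;
    }
}
{code}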

commit 6665d570882efa49e35251092385efc8fb6adeb8
Author: shuai.xus 
Date:   2017-10-27T07:43:25Z

modify compare

commit 739564db031febd5bb029f08df3ced1ef539c7e6
Author: shuai.xus 
Date:   2017-10-30T04:01:42Z

add more denotes

commit c39c3597c1094bb258556d8d6dc12e5305903ea8
Author: shuai.xus 
Date:   2017-11-10T02:55:26Z

rebase with 7878




> Extend the field in ResourceProfile for precisely calculating the resource of 
> a task manager
> 


[jira] [Commented] (FLINK-3985) A metric with the name * was already registered

2017-11-09 Thread Su Ralph (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246937#comment-16246937
 ] 

Su Ralph commented on FLINK-3985:
-

Sorry, I'm not running tests; these are formal deployments.

> A metric with the name * was already registered
> ---
>
> Key: FLINK-3985
> URL: https://issues.apache.org/jira/browse/FLINK-3985
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>  Labels: test-stability
>
> The YARN tests detected the following failure while running WordCount.
> {code}
> 2016-05-27 21:50:48,230 INFO  org.apache.flink.yarn.YarnTaskManager   
>   - Received task CHAIN DataSource (at main(WordCount.java:70) 
> (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at 
> main(WordCount.java:80)) -> Combine(SUM(1), at main(WordCount.java:83) (1/2)
> 2016-05-27 21:50:48,231 ERROR org.apache.flink.metrics.reporter.JMXReporter   
>   - A metric with the name 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-6e03e1e8-3385-linux-1,key1=taskmanager,key2=ee7c10183f32c9a96f8e7cfd873863d1,key3=WordCount_Example,key4=CHAIN_DataSource_(at_main(WordCount.java-70)_(org.apache.flink.api.java.io.TextInputFormat))_->_FlatMap_(FlatMap_at_main(WordCount.java-80))_->_Combine(SUM(1)-_at_main(WordCount.java-83),name=numBytesIn
>  was already registered.
> javax.management.InstanceAlreadyExistsException: 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-6e03e1e8-3385-linux-1,key1=taskmanager,key2=ee7c10183f32c9a96f8e7cfd873863d1,key3=WordCount_Example,key4=CHAIN_DataSource_(at_main(WordCount.java-70)_(org.apache.flink.api.java.io.TextInputFormat))_->_FlatMap_(FlatMap_at_main(WordCount.java-80))_->_Combine(SUM(1)-_at_main(WordCount.java-83),name=numBytesIn
>   at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>   at 
> org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76)
>   at 
> org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144)
>   at 
> org.apache.flink.metrics.groups.IOMetricGroup.<init>(IOMetricGroup.java:40)
>   at 
> org.apache.flink.metrics.groups.TaskMetricGroup.<init>(TaskMetricGroup.java:74)
>   at 
> org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74)
>   at 
> org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1093)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:442)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:284)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at 

[jira] [Commented] (FLINK-3985) A metric with the name * was already registered

2017-11-09 Thread Su Ralph (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246935#comment-16246935
 ] 

Su Ralph commented on FLINK-3985:
-

I can still see this in 1.2.0. It's marked as fixed, but without a fix version?

> A metric with the name * was already registered
> ---
>
> Key: FLINK-3985
> URL: https://issues.apache.org/jira/browse/FLINK-3985
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>  Labels: test-stability
>
> The YARN tests detected the following failure while running WordCount.
> {code}
> 2016-05-27 21:50:48,230 INFO  org.apache.flink.yarn.YarnTaskManager   
>   - Received task CHAIN DataSource (at main(WordCount.java:70) 
> (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at 
> main(WordCount.java:80)) -> Combine(SUM(1), at main(WordCount.java:83) (1/2)
> 2016-05-27 21:50:48,231 ERROR org.apache.flink.metrics.reporter.JMXReporter   
>   - A metric with the name 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-6e03e1e8-3385-linux-1,key1=taskmanager,key2=ee7c10183f32c9a96f8e7cfd873863d1,key3=WordCount_Example,key4=CHAIN_DataSource_(at_main(WordCount.java-70)_(org.apache.flink.api.java.io.TextInputFormat))_->_FlatMap_(FlatMap_at_main(WordCount.java-80))_->_Combine(SUM(1)-_at_main(WordCount.java-83),name=numBytesIn
>  was already registered.
> javax.management.InstanceAlreadyExistsException: 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-6e03e1e8-3385-linux-1,key1=taskmanager,key2=ee7c10183f32c9a96f8e7cfd873863d1,key3=WordCount_Example,key4=CHAIN_DataSource_(at_main(WordCount.java-70)_(org.apache.flink.api.java.io.TextInputFormat))_->_FlatMap_(FlatMap_at_main(WordCount.java-80))_->_Combine(SUM(1)-_at_main(WordCount.java-83),name=numBytesIn
>   at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>   at 
> org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76)
>   at 
> org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144)
>   at 
> org.apache.flink.metrics.groups.IOMetricGroup.<init>(IOMetricGroup.java:40)
>   at 
> org.apache.flink.metrics.groups.TaskMetricGroup.<init>(TaskMetricGroup.java:74)
>   at 
> org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74)
>   at 
> org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1093)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:442)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:284)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
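
For reference, the failure mode in the quoted log can be reproduced in isolation: registering two MBeans under the same ObjectName raises InstanceAlreadyExistsException (generic JMX sketch, not Flink code):

{code}
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Minimal reproduction of the quoted failure: the second registerMBean call
// with the same ObjectName throws InstanceAlreadyExistsException.
public class JmxCollisionSketch {
    public interface DummyMBean { }
    public static class Dummy implements DummyMBean { }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("org.apache.flink.metrics:name=numBytesIn");
        server.registerMBean(new Dummy(), name);
        server.registerMBean(new Dummy(), name); // InstanceAlreadyExistsException
    }
}
{code}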

[jira] [Updated] (FLINK-8020) Deadlock found in Flink Streaming job

2017-11-09 Thread Weihua Jiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weihua Jiang updated FLINK-8020:

Attachment: jstack53009(2).out

Attaching a new jstack file.

After moving our collector.collect() call out of our lock region, the 
deadlock was gone. However, the system is still NOT able to run, as we find that 
the flink collector hangs.

See the attached jstack53009(2).out file: the thread "cache-process0 -> 
async-operator0 -> Sink: hbase-sink0 (1/10)" holds the lock 0x0007b692ad98 
and is still waiting for THIS exact lock again. This makes it hang.

Our guess is that this is some issue related to async io. After changing our 
async redis lookup to a sync redis lookup (by not using async io), the issue is 
gone.
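
The pattern reported in the issue below is a classic lock-ordering deadlock; a generic illustration (sketch only, not Flink code):

{code}
// Two threads acquire the same two locks in opposite order: each ends up
// holding one lock while waiting forever for the other.
public class DeadlockSketch {
    private static final Object LOCK_A = new Object(); // stands in for 0x...1788
    private static final Object LOCK_B = new Object(); // stands in for 0x...1940

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (LOCK_A) {
                sleep(100);
                synchronized (LOCK_B) { } // blocks forever
            }
        }, "task-thread").start();

        new Thread(() -> {
            synchronized (LOCK_B) {
                sleep(100);
                synchronized (LOCK_A) { } // blocks forever
            }
        }, "time-trigger-thread").start();
    }

    private static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException ignored) { }
    }
}
{code}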

> Deadlock found in Flink Streaming job
> -
>
> Key: FLINK-8020
> URL: https://issues.apache.org/jira/browse/FLINK-8020
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming, Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode
>Reporter: Weihua Jiang
>Priority: Blocker
> Attachments: jstack53009(2).out, jstack67976-2.log
>
>
> Our streaming job ran into trouble these days after a long period of smooth 
> running. One issue we found is 
> [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this 
> one.
> After analyzing the jstack, we believe we found a DEADLOCK in flink:
> 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" 
> holds lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940.
> 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: 
> hbase-sink0 (8/8)" holds lock 0x0007b6aa1940 and is waiting for lock 
> 0x0007b6aa1788. 
> This DEADLOCK made the job fail to proceed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8043) increment job restart metric when fine grained recovery reverted to full job restart

2017-11-09 Thread Steven Zhen Wu (JIRA)
Steven Zhen Wu created FLINK-8043:
-

 Summary: increment job restart metric when fine grained recovery 
reverted to full job restart
 Key: FLINK-8043
 URL: https://issues.apache.org/jira/browse/FLINK-8043
 Project: Flink
  Issue Type: Bug
  Components: ResourceManager
Affects Versions: 1.3.2
Reporter: Steven Zhen Wu


When fine-grained recovery fails (e.g. due to not enough taskmanager slots when 
a replacement taskmanager node didn't come back in time), Flink will revert to a 
full job restart. In this case, it should also increment the "job restart" metric.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8042) retry individual failover-strategy for some time first before reverting to full job restart

2017-11-09 Thread Steven Zhen Wu (JIRA)
Steven Zhen Wu created FLINK-8042:
-

 Summary: retry individual failover-strategy for some time first 
before reverting to full job restart
 Key: FLINK-8042
 URL: https://issues.apache.org/jira/browse/FLINK-8042
 Project: Flink
  Issue Type: Bug
  Components: ResourceManager
Affects Versions: 1.3.2
Reporter: Steven Zhen Wu


Let's say we kill a taskmanager node. When Flink attempts fine-grained 
recovery and fails because the replacement taskmanager node didn't come back in 
time, it reverts to a full job restart. 

Stephan and Till were suggesting that Flink can/should retry fine-grained 
recovery for some time before giving up and reverting to a full job restart.
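
A hedged sketch of the suggested policy (the interface and parameters are hypothetical, purely to illustrate the retry-then-escalate idea):

{code}
// Retry fine-grained (individual) failover a bounded number of times with a
// delay, and only then escalate to a full job restart.
public class RecoveryEscalationSketch {
    interface Recovery {
        boolean tryFineGrainedRecovery(); // true on success
        void fullJobRestart();
    }

    static void recover(Recovery recovery, int maxAttempts, long delayMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (recovery.tryFineGrainedRecovery()) {
                return; // individual failover succeeded
            }
            Thread.sleep(delayMillis); // give a replacement taskmanager time to register
        }
        recovery.fullJobRestart(); // give up and restart the whole job
    }
}
{code}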



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-8036) Consider using gradle to build Flink

2017-11-09 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-8036.
---
Resolution: Later

Resolving since there is not enough interest.

> Consider using gradle to build Flink
> 
>
> Key: FLINK-8036
> URL: https://issues.apache.org/jira/browse/FLINK-8036
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> Here is a summary from Lukasz on this thread 
> (http://search-hadoop.com/m/Beam/gfKHFVh4NM151XIu1?subj=Re+DISCUSS+Move+away+from+Apache+Maven+as+build+tool)
>  w.r.t. the performance boost from using gradle:
> Maven performs parallelization at the module level: an entire module needs
> to complete before any dependent modules can start, which means all
> the checks like findbugs, checkstyle, and tests need to finish. Gradle has
> task-level parallelism between subprojects, which means that as soon as the
> compile and shade steps are done for a project, dependent subprojects
> can typically start. This means that we get increased parallelism due to
> not needing to wait for findbugs, checkstyle, and tests to run. I typically see
> ~20 tasks (at peak) running on my desktop in parallel.
> Flink should consider using gradle - on Linux with SSD, a clean build takes 
> an hour.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8036) Consider using gradle to build Flink

2017-11-09 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246715#comment-16246715
 ] 

Chesnay Schepler commented on FLINK-8036:
-

I can see the benefit, but as it stands I'm against it. As Fabian said, this 
would entail changing many integral parts of the system, not to mention how 
users build custom flink versions.

Furthermore, there are really trivial workarounds for the described 
use-case:
Step 1: compile Flink (without checkstyle etc.)
Step 2: invoke checkstyle, findbugs, tests etc. for each submodule in a separate 
process

This is rather trivial to implement and should buy you plenty of time.

> Consider using gradle to build Flink
> 
>
> Key: FLINK-8036
> URL: https://issues.apache.org/jira/browse/FLINK-8036
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> Here is a summary from Lukasz on this thread 
> (http://search-hadoop.com/m/Beam/gfKHFVh4NM151XIu1?subj=Re+DISCUSS+Move+away+from+Apache+Maven+as+build+tool)
>  w.r.t. the performance boost from using gradle:
> Maven performs parallelization at the module level: an entire module needs
> to complete before any dependent modules can start, which means all
> the checks like findbugs, checkstyle, and tests need to finish. Gradle has
> task-level parallelism between subprojects, which means that as soon as the
> compile and shade steps are done for a project, dependent subprojects
> can typically start. This means that we get increased parallelism due to
> not needing to wait for findbugs, checkstyle, and tests to run. I typically see
> ~20 tasks (at peak) running on my desktop in parallel.
> Flink should consider using gradle - on Linux with SSD, a clean build takes 
> an hour.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8041) Recommended Improvements for Gelly's Connected Components Algorithm

2017-11-09 Thread Christos Hadjinikolis (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christos Hadjinikolis updated FLINK-8041:
-
Description: 
At the moment, the ConnectedComponents algorithm that comes with Flink's native 
Graph API (Gelly) has two issues:
1. It relies on the user to provide correct values in the vertices DataSet. 
Based on how the algorithm works, these values must be of type Long and be 
unique for every vertex. If the user provides the same value for every vertex 
(e.g. 1) the algorithm still works, but as those values are used for the 
identification of the different connected components, one will end up with a 
single connected component and will have no clue as to why this happened. This 
can be easily fixed in two ways: either by checking that the values that appear 
alongside vertex-ids are unique and informing the user if not, or by generating 
those values for every vertex before the algorithm is run. I have a running 
implementation of the second way and I really think it is an appropriate 
solution to this problem.
2. Once the connected components are identified, one has to apply additional 
transformations and actions to find out which is the biggest, or the order of 
the connected components in terms of their size. Alternatively, the algorithm 
can be implemented so that the numerical ids that are given to each component 
reflect their ranking when ordered by size, e.g. connected component 1 
will be the biggest, connected component 2 should be the second biggest, and so 
on. I have also solved this and I think it would make a nice addition. 

  was:
At the moment, the ConnectedComponents algorithm that comes with Flink's native 
Graph API (Gelly) has two issues:
1. It relies on the user to provide correct values in the vertices DataSet. 
Based on how the algorithm works, these values must be of type Long and be 
unique for every vertex. If the user provides the same value for every vertex 
(e.g. 1) the algorithm still works, but as those values are used for the 
identification of the different connected components, one will end up with a 
single connected component and will have no clue as to why this happened. This 
can be easily fixed in two ways: either by checking that the values that appear 
alongside vertex-ids are unique and informing the user if not, or by generating 
those values for every vertex before the algorithm is run. I have a running 
implementation of the second way and I really think it is an appropriate 
solution to this problem.
2. Once the connected components are identified, one has to apply additional 
transformations and actions to find out which is the biggest, or the order of 
the connected components in terms of their size. Alternatively, the algorithm 
can be implemented so that the numerical ids that are given to each component 
reflect their ranking when ordered by size, e.g. connected component 1 
will be the biggest, connected component 2 should be the second biggest, and so 
on.  


> Recommended Improvements for Gelly's Connected Components Algorithm
> ---
>
> Key: FLINK-8041
> URL: https://issues.apache.org/jira/browse/FLINK-8041
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.3.2
> Environment: Linux, IntelliJ IDEA
>Reporter: Christos Hadjinikolis
>Priority: Minor
> Fix For: 1.4.0
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> At the moment, the ConnectedComponents algorithm that comes with Flink's 
> native Graph API (Gelly) has two issues:
> 1. It relies on the user to provide correct values in the vertices 
> DataSet. Based on how the algorithm works, these values must be of type Long 
> and be unique for every vertex. If the user provides the same value for 
> every vertex (e.g. 1) the algorithm still works, but as those values are used 
> for the identification of the different connected components, one will end up 
> with a single connected component and will have no clue as to why this 
> happened. This can be easily fixed in two ways: either by checking that the 
> values that appear alongside vertex-ids are unique and informing the user if 
> not, or by generating those values for every vertex before the algorithm is 
> run. I have a running implementation of the second way and I really think it 
> is an appropriate solution to this problem.
> 2. Once the connected components are identified, one has to apply additional 
> transformations and actions to find out which is the biggest, or the order of 
> the connected components in terms of their size. Alternatively, the algorithm 
> can be implemented so that the numerical ids that are given to each component 
> reflect their ranking when ordered by size, e.g. 

[jira] [Created] (FLINK-8041) Recommended Improvements for Gelly's Connected Components Algorithm

2017-11-09 Thread Christos Hadjinikolis (JIRA)
Christos Hadjinikolis created FLINK-8041:


 Summary: Recommended Improvements for Gelly's Connected Components 
Algorithm
 Key: FLINK-8041
 URL: https://issues.apache.org/jira/browse/FLINK-8041
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 1.3.2
 Environment: Linux, IntelliJ IDEA
Reporter: Christos Hadjinikolis
Priority: Minor
 Fix For: 1.4.0


At the moment, the ConnectedComponents algorithm that comes with Flink's native 
Graph API (Gelly) has two issues:
1. It relies on the user to provide correct values in the vertices DataSet. 
Based on how the algorithm works, these values must be of type Long and be 
unique for every vertex. If the user provides the same value for every vertex 
(e.g. 1) the algorithm still works, but as those values are used for the 
identification of the different connected components, one will end up with a 
single connected component and will have no clue as to why this happened. This 
can be easily fixed in two ways: either by checking that the values that appear 
alongside vertex-ids are unique and informing the user if not, or by generating 
those values for every vertex before the algorithm is run. I have a running 
implementation of the second way and I really think it is an appropriate 
solution to this problem.
2. Once the connected components are identified, one has to apply additional 
transformations and actions to find out which is the biggest, or the order of 
the connected components in terms of their size. Alternatively, the algorithm 
can be implemented so that the numerical ids that are given to each component 
reflect their ranking when ordered by size, e.g. connected component 1 
will be the biggest, connected component 2 should be the second biggest, and so 
on.  
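
A hedged sketch of the first suggested fix, generating unique Long values per vertex instead of trusting user input (DataSetUtils.zipWithUniqueId is a real Flink utility; the wiring around it is illustrative):

{code}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

// Assign a distinct Long value to every vertex id before running
// ConnectedComponents, so component ids are guaranteed unique.
public class UniqueVertexValuesSketch {
    public static DataSet<Tuple2<String, Long>> withUniqueValues(DataSet<String> vertexIds) {
        // zipWithUniqueId pairs each element with a unique Long: (uniqueId, element)
        return DataSetUtils.zipWithUniqueId(vertexIds)
                .map(t -> Tuple2.of(t.f1, t.f0)) // -> (vertexId, uniqueValue)
                .returns(new TypeHint<Tuple2<String, Long>>() {});
    }
}
{code}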



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7919) Join with Solution Set fails with NPE if Solution Set has no entry

2017-11-09 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246487#comment-16246487
 ] 

Greg Hogan commented on FLINK-7919:
---

[~fhueske] would an alternative be to use inner join semantics (dropping the 
unmatched element) and potentially support outer joins in the future?

> Join with Solution Set fails with NPE if Solution Set has no entry
> --
>
> Key: FLINK-7919
> URL: https://issues.apache.org/jira/browse/FLINK-7919
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Local Runtime
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>
> A job with a delta iteration fails hard with a NPE in the solution set join, 
> if the solution set has no entry for the join key of the probe side.
> The following program reproduces the problem:
> {code}
> DataSet<Tuple2<Long, Integer>> values = env.fromElements(
>   Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));
> DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
>   .iterateDelta(values, 5, 0);
> DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
>   .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
> @Override
> public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws 
> Exception {
>   // modifying the key to join on a non existing solution set key 
>   return Tuple2.of(value.f0 + 1, 1);
> }
>   })
>   .join(di.getSolutionSet()).where(0).equalTo(0)
>   .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, 
> Tuple2<Long, Integer>>() {
> @Override
> public Tuple2<Long, Integer> join(
>   Tuple2<Long, Integer> first, 
>   Tuple2<Long, Integer> second) throws Exception {
>   
>   return Tuple2.of(first.f0, first.f1 + second.f1);
> }
>   });
> DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
> result.print();
> {code}
> It doesn't matter whether the solution set is managed or not. 
> The problem is caused because the solution set hash table prober returns a 
> {{null}} value if the solution set does not contain a value for the probe 
> side key. 
> The join operator does not check whether the return value is {{null}} but 
> immediately tries to create a copy using a {{TypeSerializer}}. This copy 
> fails with an NPE.
> I propose to check for {{null}} and call the join function with {{null}} on 
> the solution set side. This gives OUTER JOIN semantics for the join.
> Since the code was previously failing with an NPE, it is safe to forward the 
> {{null}} into the {{JoinFunction}}. 
> However, users must be aware that the solution set value may be {{null}}, and 
> we need to update the documentation (JavaDocs + website) to describe the 
> behavior.
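
A hedged sketch of the proposed null handling (a simplified free-standing helper, not the actual solution-set join driver):

{code}
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Tolerate a missing solution-set entry instead of copying a null record,
// which is what currently fails with the NPE.
public class SolutionSetJoinSketch {
    public static <IN1, IN2, OUT> OUT joinProbeRecord(
            IN1 probeRecord,
            IN2 solutionSetMatch, // null if the solution set has no entry for the key
            JoinFunction<IN1, IN2, OUT> joinFunction,
            TypeSerializer<IN2> serializer) throws Exception {
        // Previously the copy was unconditional and threw a NullPointerException.
        IN2 match = (solutionSetMatch == null) ? null : serializer.copy(solutionSetMatch);
        // OUTER JOIN semantics: the user's join function may now receive null
        // on the solution-set side and must handle it.
        return joinFunction.join(probeRecord, match);
    }
}
{code}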



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8014) Add Kafka010JsonTableSink

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246472#comment-16246472
 ] 

ASF GitHub Bot commented on FLINK-8014:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/4990

[FLINK-8014] [FLINK-8016] Add Kafka010JsonTableSink and documentation

## What is the purpose of the change

- Adds a TableSink to write JSON-encoded rows to Kafka 0.10 topics
- Adds documentation for KafkaJsonTableSinks

## Brief change log

* Add Kafka010JsonTableSink
* Enable flush on checkpoint to ensure at-least-once guarantees
* Refactor tests for KafkaJsonTableSinks
* Add documentation for KafkaJsonTableSinks

## Verifying this change

* Tests have been added.
* The sink uses a regular Kafka 0.10 producer
* The SerializationSchema is separately tested

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **YES**
  - If yes, how is the feature documented? Documentation has been added to 
website


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableKafka10Sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4990.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4990


commit 937637b340ca02be770f040d94396b18bc9619d7
Author: Fabian Hueske 
Date:   2017-11-07T16:59:43Z

[FLINK-8014] [table] Add Kafka010JsonTableSink.

- Refactor KafkaTableSink tests.

commit b0fcc04229aac97a938fffdcdcad9b388cc37d72
Author: Fabian Hueske 
Date:   2017-11-09T14:07:17Z

[FLINK-8016] [docs] Add documentation for KafkaJsonTableSinks.




> Add Kafka010JsonTableSink
> -
>
> Key: FLINK-8014
> URL: https://issues.apache.org/jira/browse/FLINK-8014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
> Fix For: 1.4.0
>
>
> We offer a TableSource for JSON-encoded Kafka 0.10 topics but no TableSink.
> Since the required base classes are already there, a 
> {{Kafka010JsonTableSink}} can be easily added. 
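
A hedged usage sketch of the new sink (the constructor shape is assumed from the existing Kafka09JsonTableSink; see the PR for the exact API):

{code}
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSink;
import org.apache.flink.table.api.Table;

// Write JSON-encoded rows of a Table to a Kafka 0.10 topic.
public class KafkaSinkUsageSketch {
    public static void writeToKafka(Table result) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        Kafka010JsonTableSink sink = new Kafka010JsonTableSink("output-topic", props);
        result.writeToSink(sink); // flushes on checkpoint for at-least-once
    }
}
{code}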



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7977) bump version of compatibility check for Flink 1.4

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246436#comment-16246436
 ] 

ASF GitHub Bot commented on FLINK-7977:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4945
  
@greghogan That's a great point.


> bump version of compatibility check for Flink 1.4
> -
>
> Key: FLINK-7977
> URL: https://issues.apache.org/jira/browse/FLINK-7977
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0
>
>
> Since Flink maintains backwards compatibility checks for 2 versions, Flink 1.4 
> should check compatibility with 1.2.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-2973) Add flink-benchmark with compliant licenses again

2017-11-09 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246432#comment-16246432
 ] 

Greg Hogan commented on FLINK-2973:
---

[~fhueske] it looks to be GPLv2 (with classpath exception): 
http://hg.openjdk.java.net/jdk9/jdk9/file/a08cbfc0e4ec/LICENSE

> Add flink-benchmark with compliant licenses again
> -
>
> Key: FLINK-2973
> URL: https://issues.apache.org/jira/browse/FLINK-2973
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Affects Versions: 1.0.0
>Reporter: Fabian Hueske
>Assignee: Suneel Marthi
>Priority: Minor
> Fix For: 1.0.0
>
>
> We recently created the Maven module {{flink-benchmark}} for micro-benchmarks 
> and ported most of the existing micro-benchmarks to the Java benchmarking 
> framework JMH. However, JMH is part of OpenJDK and under GPL license which is 
> not compatible with the AL2.
> Consequently, we need to remove this dependency and either revert the porting 
> commits or port the benchmarks to another benchmarking framework. An 
> alternative could be [Google's Caliper|https://github.com/google/caliper] 
> library which is under AL2.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-8017) High availability cluster-id option incorrect in documentation

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246245#comment-16246245
 ] 

ASF GitHub Bot commented on FLINK-8017:
---

Github user dkelley-accretive closed the pull request at:

https://github.com/apache/flink/pull/4976


> High availability cluster-id option incorrect in documentation
> --
>
> Key: FLINK-8017
> URL: https://issues.apache.org/jira/browse/FLINK-8017
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Dan Kelley
>Priority: Minor
> Fix For: 1.4.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The property key in HighAvailabilityOptions.java is 
> high-availability.cluster-id; however, the documentation states that the key is 
> high-availability.zookeeper.path.cluster-id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8020) Deadlock found in Flink Streaming job

2017-11-09 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246081#comment-16246081
 ] 

Kostas Kloudas commented on FLINK-8020:
---

[~greghogan] I suppose you can close it and [~whjiang] can re-open it in case 
it is a valid JIRA.

> Deadlock found in Flink Streaming job
> -
>
> Key: FLINK-8020
> URL: https://issues.apache.org/jira/browse/FLINK-8020
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming, Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode
>Reporter: Weihua Jiang
>Priority: Blocker
> Attachments: jstack67976-2.log
>
>
> Our streaming job ran into trouble these days after a long period of smooth 
> running. One issue we found is 
> [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this 
> one.
> After analyzing the jstack, we believe we found a DEADLOCK in flink:
> 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" 
> holds lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940.
> 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: 
> hbase-sink0 (8/8)" holds lock 0x0007b6aa1940 and is waiting for lock 
> 0x0007b6aa1788. 
> This DEADLOCK made the job fail to proceed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-6022) Don't serialise Schema when serialising Avro GenericRecord

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246015#comment-16246015
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/4943


> Don't serialise Schema when serialising Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
> Fix For: 1.5.0
>
>
> Currently, Flink serializes the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.
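
A hedged sketch of the idea: construct the serializer with a known schema once, so record bytes travel without the schema (plain Avro APIs; the class and its wiring are illustrative, not Flink's actual serializer):

{code}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// The schema is supplied once at construction time (e.g. via type information),
// so serialized records contain only the data bytes, not the schema JSON.
public class SchemaOnceSerializerSketch {
    private final GenericDatumWriter<GenericRecord> writer;
    private final GenericDatumReader<GenericRecord> reader;

    public SchemaOnceSerializerSketch(Schema schema) {
        this.writer = new GenericDatumWriter<>(schema);
        this.reader = new GenericDatumReader<>(schema);
    }

    public byte[] serialize(GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder); // no schema in the payload
        encoder.flush();
        return out.toByteArray();
    }

    public GenericRecord deserialize(byte[] bytes) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }
}
{code}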



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7419) Shade jackson dependency in flink-avro

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246007#comment-16246007
 ] 

ASF GitHub Bot commented on FLINK-7419:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4981#discussion_r150020494
  
--- Diff: flink-formats/flink-avro/pom.xml ---
@@ -185,17 +185,6 @@ under the License.


<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
--- End diff --

We can probably remove the complete shading entry in that pom. It seems not to 
do anything anyway. Promoting transitive dependencies when nothing except 
`force-shading` is shaded into the jar should be a no-op. (Theoretically, one 
never knows with the shade plugin ;-) )


> Shade jackson dependency in flink-avro
> --
>
> Key: FLINK-7419
> URL: https://issues.apache.org/jira/browse/FLINK-7419
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Avro uses {{org.codehouse.jackson}} which also exists in multiple 
> incompatible versions. We should shade it to 
> {{org.apache.flink.shaded.avro.org.codehouse.jackson}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-8011) Set dist flink-python dependency to provided

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245997#comment-16245997
 ] 

ASF GitHub Bot commented on FLINK-8011:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4973
  
Yes, that is how it should be.

+1


> Set dist flink-python dependency to provided
> 
>
> Key: FLINK-8011
> URL: https://issues.apache.org/jira/browse/FLINK-8011
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> We can simplify the flink-dist pom by setting the flink-python dependency to 
> provided, which allows us to remove an exclusion from the shade plugin 
> configuration.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-8006) flink-daemon.sh: line 103: binary operator expected

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245993#comment-16245993
 ] 

ASF GitHub Bot commented on FLINK-8006:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4968
  
LGTM

+1 to merge


> flink-daemon.sh: line 103: binary operator expected
> ---
>
> Key: FLINK-8006
> URL: https://issues.apache.org/jira/browse/FLINK-8006
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.3.2
> Environment: Linux 4.12.12-gentoo #2 SMP x86_64 Intel(R) Core(TM) 
> i3-3110M CPU @ 2.40GHz GenuineIntel GNU/Linux
>Reporter: Alejandro 
>  Labels: easyfix, newbie
>   Original Estimate: 1m
>  Remaining Estimate: 1m
>
> When executing `./bin/start-local.sh` I get
> flink-1.3.2/bin/flink-daemon.sh: line 79: $pid: ambiguous redirect
> flink-1.3.2/bin/flink-daemon.sh: line 103: [: /tmp/flink-Alejandro: binary 
> operator expected
> I solved the problem by replacing $pid with "$pid" on lines 79 and 103.
> Should I make a PR to the repo?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7886) Enable dependency convergence for flink-core

2017-11-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245925#comment-16245925
 ] 

Aljoscha Krettek commented on FLINK-7886:
-

Implemented on release-1.4 in
005a871771ce73bef9c78ee04a61817fa9a31e99

> Enable dependency convergence for flink-core
> 
>
> Key: FLINK-7886
> URL: https://issues.apache.org/jira/browse/FLINK-7886
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245879#comment-16245879
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4980
  
I agree! +1 to merge as soon as Travis gives us the green light.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> 
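Per the title of PR #4980, the fix sets the user code class loader before snapshotting, which comes down to a standard context-class-loader swap. A minimal sketch with illustrative names (not Flink's actual code):
{code}
import java.util.concurrent.Callable;

// Illustrative helper, not Flink's actual API: run an action with the given
// class loader installed as the thread's context class loader, restoring the
// previous loader afterwards. With the user code class loader installed,
// Kafka's Class.forName(..., Utils.getContextOrKafkaClassLoader()) can
// resolve classes from the user jar during snapshotting.
public final class ContextClassLoaderSwap {

	public static <T> T runWith(ClassLoader userCodeClassLoader, Callable<T> action) throws Exception {
		final Thread thread = Thread.currentThread();
		final ClassLoader previous = thread.getContextClassLoader();
		thread.setContextClassLoader(userCodeClassLoader);
		try {
			return action.call();
		} finally {
			thread.setContextClassLoader(previous);
		}
	}
}
{code}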

[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...

2017-11-09 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4980
  
I agree! +1 to merge as soon as Travis gives us the green light.


---


[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
thanks, I think this is excellent now. 👌 

I'll merge as soon as travis is green.


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245867#comment-16245867
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
thanks, I think this is excellent now. 👌

I'll merge as soon as travis is green.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> 

[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence

2017-11-09 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4777
  
Thanks!


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245825#comment-16245825
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149993559
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

yes, a latch that was already triggered will simply return immediately, no 
need for an additional check



> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> 

[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149993559
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

yes, a latch that was already triggered will simply return immediately, no 
need for an additional check
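
For illustration, a one-shot latch with exactly this behavior; a minimal sketch of the idea, not Flink's actual `OneShotLatch`:
```
// Minimal sketch: once triggered, every subsequent await() falls through the
// while loop immediately, so triggering twice or awaiting after the trigger
// needs no extra isTriggered() guard.
final class SimpleOneShotLatch {

    private final Object lock = new Object();
    private boolean triggered;

    void trigger() {
        synchronized (lock) {
            triggered = true;
            lock.notifyAll();
        }
    }

    void await() throws InterruptedException {
        synchronized (lock) {
            while (!triggered) {
                lock.wait();
            }
        }
    }
}
```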



---


[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245808#comment-16245808
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4777
  
Thanks!


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739
> SubTasks of this task depend on one another - to enable convergence in 
> `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245810#comment-16245810
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4777


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739
> SubTasks of this task depend on one another - to enable convergence in 
> `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4777: [FLINK-7765] Enable dependency convergence

2017-11-09 Thread pnowojski
Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4777


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245781#comment-16245781
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149985097
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

I think it doesn't matter because the latch already checks for the flag.
```
public void await() throws InterruptedException {
synchronized (lock) {
while (!triggered) {
lock.wait();
}
}
}
```


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> 

[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245767#comment-16245767
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149980302
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+	<profiles>
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
--- End diff --

Can't we enforce jackson 2.6 via dependency management? I think this would 
be cleaner than excluding the dependencies here and assuming that 
`aws-java-sdk-s3` pulls in the missing dependencies.


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. If so, the Flink session will fail on staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at 

[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149980302
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+	<profiles>
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
--- End diff --

Can't we enforce jackson 2.6 via dependency management? I think this would 
be cleaner than excluding the dependencies here and assuming that 
`aws-java-sdk-s3` pulls in the missing dependencies.
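
A rough sketch of such a dependency management entry (2.6.7 stands in for whichever 2.6.x line is chosen):
```
<dependencyManagement>
	<dependencies>
		<!-- pin the jackson line once, instead of excluding it per dependency -->
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-annotations</artifactId>
			<version>2.6.7</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-core</artifactId>
			<version>2.6.7</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.6.7</version>
		</dependency>
	</dependencies>
</dependencyManagement>
```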


---


[jira] [Assigned] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired

2017-11-09 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-8040:
---

Assignee: Stefan Richter

> Test instability ResourceGuard#testCloseBlockIfAcquired
> ---
>
> Key: FLINK-8040
> URL: https://issues.apache.org/jira/browse/FLINK-8040
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Gary Yao
>Assignee: Stefan Richter
>  Labels: test-stability
> Fix For: 1.4.0, 1.5.0
>
>
> Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
> {noformat}
> java.io.IOException: Resource guard was already closed.
>   at 
> org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
>   at 
> org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> {noformat}
> *How to reproduce*
> Run the test with a high number of iterations.
> To further provoke a failure, add {{Thread.sleep(100)}} before the following 
> line
> {code}
> ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
> {code}
> This makes it more likely that {{closed}} is {{true}} by the time the 2nd 
> lease is acquired, so that an exception is thrown:
> {code}
>   if (closed) {
>   throw new IOException("Resource guard was already closed.");
>   }
> {code}
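Spelled out as a hedged sketch, assuming {{ResourceGuard#close()}} blocks until leases are released and {{Lease}} is closeable, as the test name and stack trace suggest:
{code}
import org.apache.flink.util.ResourceGuard;

// Sketch of the race, with the closing thread made explicit; not the literal test code.
void raceSketch() throws Exception {
	final ResourceGuard resourceGuard = new ResourceGuard();
	final ResourceGuard.Lease lease_1 = resourceGuard.acquireResource();

	// close() blocks until lease_1 is released, but it can set the internal
	// closed flag before the main thread reaches the second acquireResource().
	final Thread closer = new Thread(resourceGuard::close);
	closer.start();

	// Fails sporadically with "Resource guard was already closed." whenever
	// the closer thread wins the race; a Thread.sleep(100) here makes it likely.
	final ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();

	lease_1.close();
	lease_2.close();
	closer.join();
}
{code}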



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149985097
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but 
make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

I think it doesn't matter because the latch already checks for the flag.
```
public void await() throws InterruptedException {
synchronized (lock) {
while (!triggered) {
lock.wait();
}
}
}
```


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245776#comment-16245776
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149984491
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

Sorry, I was just looking in the IDE and missed the lines. This line should 
come before every call to `await` on the `latch`.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> 

[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149984491
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

Sorry, I was just looking in the IDE and missed the lines. This line should 
come before every call to `await` on the `latch`.


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245773#comment-16245773
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149983878
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

Why is that? I think at this point the latch might not get triggered at all.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> 

[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149983878
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

Why is that? I think at this point the latch might not get triggered at all.


---


[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245769#comment-16245769
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149983025
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+	<profiles>
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
--- End diff --

It might then be necessary to add these dependencies explicitly here.


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. If so, the Flink session will fail on staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> 

[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245768#comment-16245768
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149983029
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+	<profiles>
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
--- End diff --

Shouldn't we override the avro version instead of excluding it?


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. If so, the Flink session will fail on staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.
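The "manually create folders and copy only actual files" idea from the last paragraph amounts to a walk like the following (plain java.nio sketch, purely illustrative; the real code would go through Hadoop's FileSystem API):
{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

// Walk the source tree, create each directory explicitly, and copy regular
// files one by one instead of relying on a recursive copy operation.
public final class CopyTreeSketch {

	public static void copyTree(Path source, Path target) throws IOException {
		try (Stream<Path> paths = Files.walk(source)) {
			for (Path p : (Iterable<Path>) paths::iterator) {
				Path dest = target.resolve(source.relativize(p).toString());
				if (Files.isDirectory(p)) {
					Files.createDirectories(dest);
				} else {
					Files.copy(p, dest, StandardCopyOption.REPLACE_EXISTING);
				}
			}
		}
	}
}
{code}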



--
This message was sent by Atlassian JIRA

[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149983029
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+	<profiles>
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
--- End diff --

Shouldn't we override the avro version instead of excluding it?
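
i.e. roughly the following in dependency management (sketch; 1.8.2 is a stand-in version):
```
<dependencyManagement>
	<dependencies>
		<!-- pin avro to one version instead of excluding it from hadoop-aws -->
		<dependency>
			<groupId>org.apache.avro</groupId>
			<artifactId>avro</artifactId>
			<version>1.8.2</version>
		</dependency>
	</dependencies>
</dependencyManagement>
```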


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149983025
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+	<profiles>
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
--- End diff --

It might then be necessary to add these dependencies explicitly here.


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245765#comment-16245765
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149982812
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

for all latches, it should also have: `if (!latch.isTriggered()) { latch.await() }`


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> 

[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149982812
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

for all latches, it should also have: `if (!latch.isTriggered()) { latch.await() }`
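
For comparison, the JDK's own latch behaves the same way once released, which is why the other replies in this thread consider the pre-check redundant (plain-JDK sketch):
```
import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();   // "trigger"
        latch.await();       // already released: returns immediately
        System.out.println("await() did not block; no pre-check was needed");
    }
}
```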


---


[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245753#comment-16245753
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4777
  
Thanks!

Could you please close if GH doesn't auto-close?


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739
> SubTasks of this task depend on one another - to enable convergence in 
> `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7886) Enable dependency convergence for flink-core

2017-11-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7886.
---
Resolution: Fixed

Implemented on master in
bdbca37b01254aef9ddd6943d34dab1838c2a9f1

> Enable dependency convergence for flink-core
> 
>
> Key: FLINK-7886
> URL: https://issues.apache.org/jira/browse/FLINK-7886
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4777
  
Thanks!

Could you please close if GH doesn't auto-close?


---


[GitHub] flink issue #4989: [Flink 7003] [Table API & SQL] use PEEK_FIELDS_NO_EXPAND ...

2017-11-09 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4989
  
Thanks for the PR @suez1224.
I will merge it.

Thanks, Fabian


---


[jira] [Commented] (FLINK-8006) flink-daemon.sh: line 103: binary operator expected

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245714#comment-16245714
 ] 

ASF GitHub Bot commented on FLINK-8006:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4968
  
+1


> flink-daemon.sh: line 103: binary operator expected
> ---
>
> Key: FLINK-8006
> URL: https://issues.apache.org/jira/browse/FLINK-8006
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.3.2
> Environment: Linux 4.12.12-gentoo #2 SMP x86_64 Intel(R) Core(TM) 
> i3-3110M CPU @ 2.40GHz GenuineIntel GNU/Linux
>Reporter: Alejandro 
>  Labels: easyfix, newbie
>   Original Estimate: 1m
>  Remaining Estimate: 1m
>
> When executing `./bin/start-local.sh` I get
> flink-1.3.2/bin/flink-daemon.sh: line 79: $pid: ambiguous redirect
> flink-1.3.2/bin/flink-daemon.sh: line 103: [: /tmp/flink-Alejandro: binary 
> operator expected
> I solved the problem by replacing $pid with "$pid" in lines 79 and 103.
> Should I make a PR to the repo?
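A minimal sketch of the failure and the fix (the path with a space is illustrative):
{code}
#!/usr/bin/env bash
# An unquoted $pid breaks when the pid file path contains a space,
# e.g. a user name with a space in it.
pid="/tmp/flink-Some User/flink.pid"   # illustrative path with a space
mkdir -p "$(dirname "$pid")"

# echo $$ >> $pid   -> "ambiguous redirect": $pid expands to two words
# [ -f $pid ]       -> "binary operator expected": the test sees two operands

echo "$$" >> "$pid"                    # quoted: the expansion stays one word
[ -f "$pid" ] && echo "pid file present"
{code}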



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4968: [FLINK-8006] [Startup Shell Scripts] Enclosing $pid in qu...

2017-11-09 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4968
  
+1


---


[jira] [Commented] (FLINK-7679) Upgrade maven enforcer plugin to 3.0.0-M1

2017-11-09 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245711#comment-16245711
 ] 

Greg Hogan commented on FLINK-7679:
---

So it looks like Scala 2.12+ is required for Java 9.

> Upgrade maven enforcer plugin to 3.0.0-M1
> -
>
> Key: FLINK-7679
> URL: https://issues.apache.org/jira/browse/FLINK-7679
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Hai Zhou UTC+8
>
> I got the following build error against Java 9:
> {code}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce (enforce-maven) 
> on project flink-parent: Execution enforce-maven of goal 
> org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce failed: An API 
> incompatibility was encountered while executing 
> org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce: 
> java.lang.ExceptionInInitializerError: null
> [ERROR] -
> [ERROR] realm =plugin>org.apache.maven.plugins:maven-enforcer-plugin:1.4.1
> [ERROR] strategy = org.codehaus.plexus.classworlds.strategy.SelfFirstStrategy
> [ERROR] urls[0] = 
> file:/home/hbase/.m2/repository/org/apache/maven/plugins/maven-enforcer-plugin/1.4.1/maven-enforcer-plugin-1.4.1.jar
> {code}
> Upgrading maven enforcer plugin to 3.0.0-M1 would get over the above error.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245681#comment-16245681
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
Yes, but I think this is making an assumption about the internal 
implementation. If someone changes that, the test could break or no longer test 
the right thing.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> 

[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
Yes, but I think this is making an assumption about the internal 
implementation. If someone changes that, the test could break or no longer test 
the right thing.


---


[jira] [Updated] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired

2017-11-09 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8040:

Description: 
Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke a failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}

  was:
Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke a failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired, and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}


> Test instability ResourceGuard#testCloseBlockIfAcquired
> ---
>
> Key: FLINK-8040
> URL: https://issues.apache.org/jira/browse/FLINK-8040
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Gary Yao
>  Labels: test-stability
> Fix For: 1.4.0, 1.5.0
>
>
> Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
> {noformat}
> java.io.IOException: Resource guard was already closed.
>   at 
> org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
>   at 
> org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> 

[jira] [Updated] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired

2017-11-09 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8040:

Description: 
Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke a failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired, and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}

  was:
Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke a failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired, and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}


> Test instability ResourceGuard#testCloseBlockIfAcquired
> ---
>
> Key: FLINK-8040
> URL: https://issues.apache.org/jira/browse/FLINK-8040
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Gary Yao
>  Labels: test-stability
> Fix For: 1.4.0, 1.5.0
>
>
> Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
> {noformat}
> java.io.IOException: Resource guard was already closed.
>   at 
> org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
>   at 
> org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> 

[jira] [Updated] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired

2017-11-09 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8040:

Description: 
Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke a failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired, and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}

  was:
Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke a failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired, and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}


> Test instability ResourceGuard#testCloseBlockIfAcquired
> ---
>
> Key: FLINK-8040
> URL: https://issues.apache.org/jira/browse/FLINK-8040
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Gary Yao
>  Labels: test-stability
> Fix For: 1.4.0, 1.5.0
>
>
> Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
> {noformat}
> java.io.IOException: Resource guard was already closed.
>   at 
> org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
>   at 
> org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> 

[jira] [Created] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired

2017-11-09 Thread Gary Yao (JIRA)
Gary Yao created FLINK-8040:
---

 Summary: Test instability ResourceGuard#testCloseBlockIfAcquired
 Key: FLINK-8040
 URL: https://issues.apache.org/jira/browse/FLINK-8040
 Project: Flink
  Issue Type: Bug
  Components: Core, Tests
Affects Versions: 1.4.0, 1.5.0
Reporter: Gary Yao
 Fix For: 1.4.0, 1.5.0


Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke the failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired, and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired

2017-11-09 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8040:

Description: 
Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke a failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired, and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}

  was:
Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke the failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired, and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}


> Test instability ResourceGuard#testCloseBlockIfAcquired
> ---
>
> Key: FLINK-8040
> URL: https://issues.apache.org/jira/browse/FLINK-8040
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Gary Yao
>  Labels: test-stability
> Fix For: 1.4.0, 1.5.0
>
>
> Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
> {noformat}
> java.io.IOException: Resource guard was already closed.
>   at 
> org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
>   at 
> org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> 

[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245638#comment-16245638
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4777
  
https://issues.apache.org/jira/browse/FLINK-7765 - this is the parent issue 
that covers fixing the other modules. The split between the subtasks is fairly 
arbitrary, but I guess the next milestone would be to enable convergence in 
`flink-runtime`, while tackling the issue of `flink-shaded-hadoop`.


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739
> The subtasks of this task depend on one another - to enable convergence in 
> `flink-runtime`, it has to be enabled for `flink-shaded-hadoop` first.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6022) Don't serialise Schema when serialising Avro GenericRecord

2017-11-09 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245640#comment-16245640
 ] 

Robert Metzger commented on FLINK-6022:
---

This whole JIRA is only about the case where people are using a 
DataStream of Avro GenericRecords. 
I agree it's not a good idea, but people are doing it. As stated in the JIRA 
description, the schema of all records in a stream needs to be the same for this 
to work.
I believe we should keep this JIRA open, because the problem has not been 
addressed.

> Don't serialise Schema when serialising Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
> Fix For: 1.5.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead on the wire and on disk, plus high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence

2017-11-09 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4777
  
https://issues.apache.org/jira/browse/FLINK-7765 - this is the parent issue 
that covers fixing the other modules. The split between the subtasks is fairly 
arbitrary, but I guess the next milestone would be to enable convergence in 
`flink-runtime`, while tackling the issue of `flink-shaded-hadoop`.


---


[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245631#comment-16245631
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4777
  
I'll merge, yes. What are the follow-up issues you created?


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739
> The subtasks of this task depend on one another - to enable convergence in 
> `flink-runtime`, it has to be enabled for `flink-shaded-hadoop` first.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245630#comment-16245630
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4980
  
There is only one thread dispatching the calls:
```
executor = Executors.newSingleThreadExecutor(
    new DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + taskNameWithSubtask));
this.asyncCallDispatcher = executor;
```

The tasks cannot overtake each other. I could make the test more strict and 
wait additionally on `triggerLatch` in case somebody decides to have multiple 
threads.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>   at 
> 

[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...

2017-11-09 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4980
  
There is only one thread dispatching the calls:
```
executor = Executors.newSingleThreadExecutor(
    new DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + taskNameWithSubtask));
this.asyncCallDispatcher = executor;
```

The tasks cannot overtake each other. I could make the test more strict and 
wait additionally on `triggerLatch` in case somebody decides to have multiple 
threads.


---


[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4777
  
I'll merge, yes. What are the follow-up issues you created?


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245623#comment-16245623
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
I think waiting on the stop latch might not be enough (in 100% of cases) 
because the other two calls are also asynchronous.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> 

[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
I think waiting on the stop latch might not be enough (in 100% of cases) 
because the other two calls are also asynchronous.


---


[jira] [Commented] (FLINK-8017) High availability cluster-id option incorrect in documentation

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245603#comment-16245603
 ] 

ASF GitHub Bot commented on FLINK-8017:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4976
  
Could you please close this if it doesn't auto-close?


> High availability cluster-id option incorrect in documentation
> --
>
> Key: FLINK-8017
> URL: https://issues.apache.org/jira/browse/FLINK-8017
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Dan Kelley
>Priority: Minor
> Fix For: 1.4.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The property key in HighAvailabilityOptions.java is 
> high-availability.cluster-id however the documentation states that the key is 
> high-availability.zookeeper.path.cluster-id



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-8017) High availability cluster-id option incorrect in documentation

2017-11-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-8017.
---
Resolution: Fixed

Fixed on release-1.4 in
02a19a14fad1ef928038f4971bdcacf4d0642d88

Fixed on master in
624df0120a962fe93d31544d7b13637121196197

> High availability cluster-id option incorrect in documentation
> --
>
> Key: FLINK-8017
> URL: https://issues.apache.org/jira/browse/FLINK-8017
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Dan Kelley
>Priority: Minor
> Fix For: 1.4.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The property key in HighAvailabilityOptions.java is 
> high-availability.cluster-id however the documentation states that the key is 
> high-availability.zookeeper.path.cluster-id



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4976: [FLINK-8017] Fix High availability cluster-id key in docu...

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4976
  
Could you please close this if it doesn't auto-close?


---


[jira] [Updated] (FLINK-8017) High availability cluster-id option incorrect in documentation

2017-11-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8017:

Fix Version/s: 1.4.0

> High availability cluster-id option incorrect in documentation
> --
>
> Key: FLINK-8017
> URL: https://issues.apache.org/jira/browse/FLINK-8017
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Dan Kelley
>Priority: Minor
> Fix For: 1.4.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The property key in HighAvailabilityOptions.java is 
> high-availability.cluster-id however the documentation states that the key is 
> high-availability.zookeeper.path.cluster-id



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245600#comment-16245600
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4980
  
I addressed the comments. Let's wait for Travis and let me know if 
something else needs to be changed. 

@aljoscha  @kl0u 


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 

[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...

2017-11-09 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4980
  
I addressed the comments. Let's wait for Travis and let me know if 
something else needs to be changed. 

@aljoscha  @kl0u 


---


[jira] [Commented] (FLINK-8017) High availability cluster-id option incorrect in documentation

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245599#comment-16245599
 ] 

ASF GitHub Bot commented on FLINK-8017:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4976
  
Thanks for spotting and fixing this!  

I'm merging.


> High availability cluster-id option incorrect in documentation
> --
>
> Key: FLINK-8017
> URL: https://issues.apache.org/jira/browse/FLINK-8017
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Dan Kelley
>Priority: Minor
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The property key in HighAvailabilityOptions.java is 
> high-availability.cluster-id however the documentation states that the key is 
> high-availability.zookeeper.path.cluster-id



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4976: [FLINK-8017] Fix High availability cluster-id key in docu...

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4976
  
Thanks for spotting and fixing this! 👍 

I'm merging.


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245597#comment-16245597
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149955008
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -254,12 +300,10 @@ else if (this.error == null) {
 
@Override
public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
-   throw new UnsupportedOperationException("Should not be 
called");
}
 
@Override
public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {
-   throw new UnsupportedOperationException("Should not be 
called");
--- End diff --

Not sure anymore but I decided to add it again.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> 

[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149955008
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -254,12 +300,10 @@ else if (this.error == null) {
 
@Override
public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
-   throw new UnsupportedOperationException("Should not be 
called");
}
 
@Override
public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {
-   throw new UnsupportedOperationException("Should not be 
called");
--- End diff --

Not sure anymore but I decided to add it again.


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245596#comment-16245596
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954849
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -254,12 +300,10 @@ else if (this.error == null) {
 
@Override
public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
-   throw new UnsupportedOperationException("Should not be 
called");
--- End diff --

Not sure but I decided to add it again.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> 
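
The ConfigDef extract quoted above resolves class names against the current thread's context class loader, which is why the snapshot thread must carry the user-code class loader. A minimal sketch of that technique, using only JDK APIs (the helper name is illustrative, not the PR's exact code):

{code}
import java.util.concurrent.Callable;

public final class ClassLoaderUtil {

    /**
     * Runs the given action with the supplied (e.g. user-code) class loader
     * set as the context class loader, so that libraries which call
     * Thread.currentThread().getContextClassLoader() -- like Kafka's
     * ConfigDef -- can resolve classes from the user jar.
     */
    public static <T> T runWithContextClassLoader(
            ClassLoader userCodeClassLoader, Callable<T> action) throws Exception {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(userCodeClassLoader);
        try {
            return action.call();
        } finally {
            // Always restore the original context class loader.
            thread.setContextClassLoader(previous);
        }
    }
}
{code}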

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245592#comment-16245592
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954576
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -58,99 +59,144 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskAsyncCallTest {
 
-   private static final int NUM_CALLS = 1000;
-   
+   private static int numCalls;
+
+   /** Triggered at the beginning of {@link 
CheckpointsInOrderInvokable#invoke()}. */
private static OneShotLatch awaitLatch;
+
+   /**
+* Triggered when {@link 
CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, 
CheckpointOptions)}
+* was called {@link #numCalls} times.
+*/
private static OneShotLatch triggerLatch;
 
+   private static final List<ClassLoader> classLoaders = new ArrayList<>();
+
@Before
public void createQueuesAndActors() {
+   numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+   classLoaders.clear();
}
 
 
// ------------------------------------------------------------------------
//  Tests
// ------------------------------------------------------------------------
-   
+
@Test
-   public void testCheckpointCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testCheckpointCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
-   
+
awaitLatch.await();
-   
-   for (int i = 1; i <= NUM_CALLS; i++) {
+
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
}
-   
+
triggerLatch.await();
-   
+
assertFalse(task.isCanceledOrFailed());
 
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   fail("Task should be RUNNING or FINISHED, but 
is " + currentState);
-   }
-   
-   task.cancelExecution();
-   task.getExecutingThread().join();
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   assertThat(currentState, 
isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
}
}
 
@Test
-   public void testMixedAsyncCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testMixedAsyncCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
 
awaitLatch.await();
 
-   for (int i = 1; i <= NUM_CALLS; i++) {
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
task.notifyCheckpointComplete(i);
}
 
triggerLatch.await();
 
assertFalse(task.isCanceledOrFailed());
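
The try-with-resources guard in this diff replaces the manual cancelExecution()/join() calls (and the catch/fail boilerplate) at the end of each test. A sketch of what such a cleaner presumably looks like, assuming the Task methods used by the old test code:

{code}
// Sketch of the cleanup guard used via try-with-resources above: whatever
// happens in the test body, the task is cancelled and its executing thread
// joined when the try block exits.
private static class TaskCleaner implements AutoCloseable {

    private final Task task;

    private TaskCleaner(Task task) {
        this.task = task;
    }

    @Override
    public void close() throws Exception {
        task.cancelExecution();
        // Bound the join so a hanging task cannot block the test forever.
        task.getExecutingThread().join(5000);
    }
}
{code}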

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245593#comment-16245593
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954606
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java
 ---
@@ -29,21 +31,41 @@
private final ThreadGroup group;

private final String threadName;
+
+   private final ClassLoader classLoader;

/**
 * Creates a new thread factory.
-* 
+*
 * @param group The group that the threads will be associated with.
 * @param threadName The name for the threads.
 */
public DispatcherThreadFactory(ThreadGroup group, String threadName) {
+   this(group, threadName, null);
+   }
+
+   /**
+* Creates a new thread factory.
+*
+* @param group The group that the threads will be associated with.
+* @param threadName The name for the threads.
+* @param classLoader The {@link ClassLoader} to be set as context 
class loader.
+*/
+   public DispatcherThreadFactory(
+   ThreadGroup group,
--- End diff --

Indented.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> 

[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954849
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -254,12 +300,10 @@ else if (this.error == null) {
 
@Override
public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
-   throw new UnsupportedOperationException("Should not be 
called");
--- End diff --

Not sure but I decided to add it again.


---


[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954606
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java
 ---
@@ -29,21 +31,41 @@
private final ThreadGroup group;

private final String threadName;
+
+   private final ClassLoader classLoader;

/**
 * Creates a new thread factory.
-* 
+*
 * @param group The group that the threads will be associated with.
 * @param threadName The name for the threads.
 */
public DispatcherThreadFactory(ThreadGroup group, String threadName) {
+   this(group, threadName, null);
+   }
+
+   /**
+* Creates a new thread factory.
+*
+* @param group The group that the threads will be associated with.
+* @param threadName The name for the threads.
+* @param classLoader The {@link ClassLoader} to be set as context 
class loader.
+*/
+   public DispatcherThreadFactory(
+   ThreadGroup group,
--- End diff --

Indented.
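
In other words, the extended factory pins a context class loader on every thread it creates. A self-contained sketch of the same idea (class and field names are illustrative):

{code}
import java.util.concurrent.ThreadFactory;

public final class ContextClassLoaderThreadFactory implements ThreadFactory {

    private final ThreadGroup group;
    private final String threadName;
    private final ClassLoader classLoader; // null keeps the JVM default

    public ContextClassLoaderThreadFactory(
            ThreadGroup group, String threadName, ClassLoader classLoader) {
        this.group = group;
        this.threadName = threadName;
        this.classLoader = classLoader;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, threadName);
        if (classLoader != null) {
            // Code running on this thread will resolve classes against the
            // given (e.g. user-code) class loader.
            t.setContextClassLoader(classLoader);
        }
        return t;
    }
}
{code}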


---


[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954576
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -58,99 +59,144 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskAsyncCallTest {
 
-   private static final int NUM_CALLS = 1000;
-   
+   private static int numCalls;
+
+   /** Triggered at the beginning of {@link 
CheckpointsInOrderInvokable#invoke()}. */
private static OneShotLatch awaitLatch;
+
+   /**
+* Triggered when {@link 
CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, 
CheckpointOptions)}
+* was called {@link #numCalls} times.
+*/
private static OneShotLatch triggerLatch;
 
+   private static final List<ClassLoader> classLoaders = new ArrayList<>();
+
@Before
public void createQueuesAndActors() {
+   numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+   classLoaders.clear();
}
 
 
// ------------------------------------------------------------------------
//  Tests
// ------------------------------------------------------------------------
-   
+
@Test
-   public void testCheckpointCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testCheckpointCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
-   
+
awaitLatch.await();
-   
-   for (int i = 1; i <= NUM_CALLS; i++) {
+
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
}
-   
+
triggerLatch.await();
-   
+
assertFalse(task.isCanceledOrFailed());
 
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   fail("Task should be RUNNING or FINISHED, but 
is " + currentState);
-   }
-   
-   task.cancelExecution();
-   task.getExecutingThread().join();
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   assertThat(currentState, 
isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
}
}
 
@Test
-   public void testMixedAsyncCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testMixedAsyncCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
 
awaitLatch.await();
 
-   for (int i = 1; i <= NUM_CALLS; i++) {
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
task.notifyCheckpointComplete(i);
}
 
triggerLatch.await();
 
assertFalse(task.isCanceledOrFailed());
+
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   
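
The two latches documented in the diff form a simple handshake: the test waits until the invokable has started, fires numCalls async calls, and then waits until all of them have been observed. A minimal, runnable sketch of the same handshake using a JDK CountDownLatch(1), which behaves like Flink's OneShotLatch in this scenario:

{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchHandshakeDemo {

    private static final int NUM_CALLS = 1000;

    private static final CountDownLatch awaitLatch = new CountDownLatch(1);
    private static final CountDownLatch triggerLatch = new CountDownLatch(1);
    private static final AtomicInteger calls = new AtomicInteger();

    // Stands in for triggerCheckpoint(): counts invocations and fires the
    // trigger latch once the expected number has been reached.
    static void onAsyncCall() {
        if (calls.incrementAndGet() == NUM_CALLS) {
            triggerLatch.countDown();
        }
    }

    public static void main(String[] args) throws Exception {
        Thread worker = new Thread(() -> {
            // Signal "invoke() has started"; the real invokable would now
            // start accepting checkpoint calls.
            awaitLatch.countDown();
        });
        worker.start();

        awaitLatch.await();            // wait until the task is running
        for (int i = 0; i < NUM_CALLS; i++) {
            onAsyncCall();             // issue the async calls
        }
        triggerLatch.await();          // returns once all calls were seen
        worker.join();
    }
}
{code}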

[jira] [Assigned] (FLINK-8021) End-to-end tests may not shutdown cluster on failure

2017-11-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-8021:
---

Assignee: Aljoscha Krettek

> End-to-end tests may not shutdown cluster on failure
> 
>
> Key: FLINK-8021
> URL: https://issues.apache.org/jira/browse/FLINK-8021
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Aljoscha Krettek
>
> In this job https://travis-ci.org/zentol/flink/jobs/298656917 the Kafka E2E 
> test failed straight away due to a missing class. The subsequent tests failed 
> because they could not allocate the JM port.
> It is thus likely that the E2E tests do not shut down the cluster in all 
> failure cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245581#comment-16245581
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4777
  
All is green, could we merge this? This PR has a rather high chance of silent 
conflicts during rebasing :( (whenever someone changes any dependency on 
master) 


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739
> Subtasks of this task depend on one another: to enable convergence in 
> `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence

2017-11-09 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4777
  
All is green, could we merge this? This PR has a rather high chance of silent 
conflicts during rebasing :( (whenever someone changes any dependency on 
master) 


---


[jira] [Comment Edited] (FLINK-6022) Don't serialise Schema when serialising Avro GenericRecord

2017-11-09 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245577#comment-16245577
 ] 

Stephan Ewen edited comment on FLINK-6022 at 11/9/17 12:48 PM:
---

We are not serializing the schema in the Avro Serializer. If the Avro 
Serializer is chosen, this is fixed.

I am wondering if the case here is one where a "generic record" from Avro is 
explicitly used as the exchange data type. That is not a good idea in the 
first place, in my opinion. In that case, isn't it possible that each generic 
record is different, so you always need a schema anyway?

I would honestly close this, because I assume the intention was around using 
Avro's specific record mechanism and the "generic" mechanism (where we use the 
ReflectDatumReader/Writer). Both should work well now.


was (Author: stephanewen):
We are not serializing the schema in the Avro Serializer. If the Avro 
Serializer is chosen, this is fixed.

I am wondering if the case is if one uses explicitly a "generic record" from 
Avro as the exchange data type. That is not a good idea in the first place in 
my opinion. In that case, isn't it possible that each generic record is 
different and thus you always need a schema anyways.
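
To make the point concrete: if both sides agree on the schema up front (one record type per stream), only the datum bytes are serialized and the schema never travels with the record. A minimal round trip with the standard Avro generic API:

{code}
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class GenericRecordRoundTrip {

    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "flink");

        // Serialize: only the datum is written; the schema is not part of
        // the bytes, so reader and writer must agree on it out of band.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        // Deserialize with the same, already-known schema.
        BinaryDecoder decoder =
            DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord copy =
            new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        System.out.println(copy.get("name"));
    }
}
{code}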

> Don't serialise Schema when serialising Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
> Fix For: 1.5.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6022) Don't serialise Schema when serialising Avro GenericRecord

2017-11-09 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245577#comment-16245577
 ] 

Stephan Ewen commented on FLINK-6022:
-

We are not serializing the schema in the Avro Serializer. If the Avro 
Serializer is chosen, this is fixed.

I am wondering if the case here is one where a "generic record" from Avro is 
explicitly used as the exchange data type. That is not a good idea in the 
first place, in my opinion. In that case, isn't it possible that each generic 
record is different, so you always need a schema anyway?

> Don't serialise Schema when serialising Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
> Fix For: 1.5.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

