[jira] [Commented] (FLINK-6505) Proactively cleanup local FS for RocksDBKeyedStateBackend on startup
[ https://issues.apache.org/jira/browse/FLINK-6505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247141#comment-16247141 ] ASF GitHub Bot commented on FLINK-6505: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4798 Hi @StefanRRichter, do you have more feedback? > Proactively cleanup local FS for RocksDBKeyedStateBackend on startup > > > Key: FLINK-6505 > URL: https://issues.apache.org/jira/browse/FLINK-6505 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Assignee: Bowen Li > Fix For: 1.4.0 > > > In {{RocksDBKeyedStateBackend}}, the {{instanceBasePath}} is cleared on > {{dispose()}}. I think it might make sense to also clear this directory when > the backend is created, in case something crashed and the backend never > reached {{dispose()}}. At least for previous runs of the same job, we can > know what to delete on restart. > In general, it is very important for this backend to clean up the local FS, > because the local quota might be very limited compared to the DFS. And a node > that runs out of local disk space can bring down the whole job, with no way > to recover (it might always get rescheduled to that node). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
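The cleanup the ticket proposes can be sketched in plain Java NIO. This is an illustrative sketch only (the class and method names below are invented, not Flink's actual RocksDBKeyedStateBackend code): wipe any leftover instance directory from a crashed prior run before the backend starts using it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical sketch of the startup cleanup proposed in FLINK-6505:
// delete whatever a previous (possibly crashed) run left behind, then
// start with a fresh, empty instance directory.
class LocalStateCleanupSketch {

    /** Recursively deletes {@code dir} if it exists, then recreates it empty. */
    static void prepareInstanceBasePath(Path dir) throws IOException {
        if (Files.exists(dir)) {
            try (Stream<Path> paths = Files.walk(dir)) {
                // Reverse (depth-first) order so files go before their parent dirs.
                paths.sorted(Comparator.reverseOrder())
                     .forEach(p -> p.toFile().delete());
            }
        }
        Files.createDirectories(dir);
    }
}
```

In this sketch the backend would call `prepareInstanceBasePath` on its `instanceBasePath` before opening RocksDB, while `dispose()` keeps its existing cleanup.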
[jira] [Commented] (FLINK-7928) Extend the field in ResourceProfile for precisely calculating the resource of a task manager
[ https://issues.apache.org/jira/browse/FLINK-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247047#comment-16247047 ] ASF GitHub Bot commented on FLINK-7928: --- GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/4991 [FLINK-7928] [runtime] extend the resources in ResourceProfile for precisely calculating the resource of task manager Note: this pull request contains #4911 since it depends on it. ## What is the purpose of the change This pull request makes tasks extendable with ResourceSpec (#4911), and adds two fields for calculating the memory an operator needs to communicate with its upstream and downstream. ## Brief change log - *Add an extendedResource field for extendable resources in ResourceSpec* - *Add memoryForInputInMB and memoryForOutputInMB for the memory an operator needs to communicate with its upstream and downstream* - *Add a fromResourceSpec method for transforming a ResourceSpec into a ResourceProfile* ## Verifying this change This change added tests and can be verified as follows: - *Added tests in ResourceProfileTest* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-7928 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4991.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4991 commit 3e1d61a33f18b351424d4684cbaebc22674f582c Author: shuai.xus Date: 2017-10-25T06:56:35Z [FLINK-7878] [api] make resource type extendible in ResourceSpec Summary: Now, Flink only supports user-defined CPU and MEM, but some users need to specify GPU, FPGA, and other resources, so the resource type needs to be made extendible in the ResourceSpec. Add an extended field for new resources. Test Plan: UnitTest Reviewers: haitao.w Differential Revision: https://aone.alibaba-inc.com/code/D327427 commit d769fe5d0184cd6ac264fd42552d290ae6978fbb Author: shuai.xus Date: 2017-11-08T09:10:01Z make Resource abstract and add GPUResource FPGAResource commit f897d1fa1742c8186c93bb60abfd8719f156c7da Author: shuai.xus Date: 2017-11-08T09:20:22Z enhance test commit b8e882b9f39f5588338297ce227e200c6527b84b Author: shuai.xus Date: 2017-11-10T02:00:08Z make create protected commit 41cf6e4c7e68ef84d9d84e909b417fc6ddc794a6 Author: shuai.xus Date: 2017-11-10T03:02:21Z make constructor public commit 931e279e5a85f38e6cd9e53169fd37b8ce2d87ad Author: shuai.xus Date: 2017-10-26T09:38:04Z [FLINK-7928] [runtime] extend the resources in ResourceProfile for precisely calculating the resource of task manager Summary: ResourceProfile denotes the resource requirements of a task. It should contain: 1. The resources for the operators: the resources in ResourceSpec (please refer to jira-7878) 2. The resources for the task to communicate with its upstreams. 3. The resources for the task to communicate with its downstreams. Now the ResourceProfile only contains the first part. Adding the last two parts. 
Test Plan: UnitTests Reviewers: haitao.w Differential Revision: https://aone.alibaba-inc.com/code/D330364 commit 6665d570882efa49e35251092385efc8fb6adeb8 Author: shuai.xus Date: 2017-10-27T07:43:25Z modify compare commit 739564db031febd5bb029f08df3ced1ef539c7e6 Author: shuai.xus Date: 2017-10-30T04:01:42Z add more denotes commit c39c3597c1094bb258556d8d6dc12e5305903ea8 Author: shuai.xus Date: 2017-11-10T02:55:26Z rebase with 7878 > Extend the field in ResourceProfile for precisely calculating the resource of > a task manager >
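The shape of the change can be illustrated with a small sketch. The field names follow the PR description, but the class below is invented for illustration and is not Flink's actual ResourceProfile: operator resources from the ResourceSpec, plus memory for communicating with upstream and downstream tasks, plus a map of extended resources such as GPUs.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the extended ResourceProfile described in the PR.
// Not Flink's real class; names are illustrative only.
class ResourceProfileSketch {
    final double cpuCores;
    final int heapMemoryInMB;
    final int memoryForInputInMB;   // memory for reading from upstream tasks
    final int memoryForOutputInMB;  // memory for writing to downstream tasks
    final Map<String, Double> extendedResources; // e.g. "GPU" -> 1.0

    ResourceProfileSketch(double cpuCores, int heapMemoryInMB,
                          int memoryForInputInMB, int memoryForOutputInMB,
                          Map<String, Double> extendedResources) {
        this.cpuCores = cpuCores;
        this.heapMemoryInMB = heapMemoryInMB;
        this.memoryForInputInMB = memoryForInputInMB;
        this.memoryForOutputInMB = memoryForOutputInMB;
        this.extendedResources =
            Collections.unmodifiableMap(new HashMap<>(extendedResources));
    }

    /** Total memory the task manager must budget for this task. */
    int totalMemoryInMB() {
        return heapMemoryInMB + memoryForInputInMB + memoryForOutputInMB;
    }
}
```

With all three memory parts in one profile, a task manager's total requirement can be computed by summing `totalMemoryInMB()` over its tasks, which is the "precise calculation" the ticket title refers to.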
[jira] [Commented] (FLINK-3985) A metric with the name * was already registered
[ https://issues.apache.org/jira/browse/FLINK-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246937#comment-16246937 ] Su Ralph commented on FLINK-3985: - Sorry, I'm not running tests; these are formal deployments. > A metric with the name * was already registered > --- > > Key: FLINK-3985 > URL: https://issues.apache.org/jira/browse/FLINK-3985 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Stephan Ewen > Labels: test-stability > > The YARN tests detected the following failure while running WordCount. > {code} > 2016-05-27 21:50:48,230 INFO org.apache.flink.yarn.YarnTaskManager > - Received task CHAIN DataSource (at main(WordCount.java:70) > (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at > main(WordCount.java:80)) -> Combine(SUM(1), at main(WordCount.java:83) (1/2) > 2016-05-27 21:50:48,231 ERROR org.apache.flink.metrics.reporter.JMXReporter > - A metric with the name > org.apache.flink.metrics:key0=testing-worker-linux-docker-6e03e1e8-3385-linux-1,key1=taskmanager,key2=ee7c10183f32c9a96f8e7cfd873863d1,key3=WordCount_Example,key4=CHAIN_DataSource_(at_main(WordCount.java-70)_(org.apache.flink.api.java.io.TextInputFormat))_->_FlatMap_(FlatMap_at_main(WordCount.java-80))_->_Combine(SUM(1)-_at_main(WordCount.java-83),name=numBytesIn > was already registered. 
> javax.management.InstanceAlreadyExistsException: > org.apache.flink.metrics:key0=testing-worker-linux-docker-6e03e1e8-3385-linux-1,key1=taskmanager,key2=ee7c10183f32c9a96f8e7cfd873863d1,key3=WordCount_Example,key4=CHAIN_DataSource_(at_main(WordCount.java-70)_(org.apache.flink.api.java.io.TextInputFormat))_->_FlatMap_(FlatMap_at_main(WordCount.java-80))_->_Combine(SUM(1)-_at_main(WordCount.java-83),name=numBytesIn > at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) > at > org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76) > at > org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177) > at > org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191) > at > org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144) > at > org.apache.flink.metrics.groups.IOMetricGroup.(IOMetricGroup.java:40) > at > org.apache.flink.metrics.groups.TaskMetricGroup.(TaskMetricGroup.java:74) > at > org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74) > at > org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86) > at > org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1093) > at > 
org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:442) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:284) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > at
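The failure above is a name collision at metric registration time. One generic way to tolerate such collisions is to disambiguate the name instead of throwing. This is an illustrative sketch only, not the fix that went into Flink's JMXReporter, and the class name is invented:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a registry that survives duplicate metric names by
// appending "-1", "-2", ... rather than failing task submission, as the
// InstanceAlreadyExistsException in the stack trace above does.
class DedupingMetricRegistry {
    private final Map<String, Object> metrics = new ConcurrentHashMap<>();

    /** Registers the metric under {@code name} or a suffixed variant; returns the name used. */
    String register(String name, Object metric) {
        String candidate = name;
        int suffix = 0;
        // putIfAbsent returns the previous value when the name is taken,
        // so loop until we find a free name.
        while (metrics.putIfAbsent(candidate, metric) != null) {
            suffix++;
            candidate = name + "-" + suffix;
        }
        return candidate;
    }
}
```

The trade-off is that a renamed metric is harder to find in JMX, but the task no longer fails to start; logging a warning on rename would preserve debuggability.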
[jira] [Commented] (FLINK-3985) A metric with the name * was already registered
[ https://issues.apache.org/jira/browse/FLINK-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246935#comment-16246935 ] Su Ralph commented on FLINK-3985: - I can still see this in 1.2.0. It's marked as fixed, but without a fix version?
[jira] [Updated] (FLINK-8020) Deadlock found in Flink Streaming job
[ https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weihua Jiang updated FLINK-8020: Attachment: jstack53009(2).out Attached a new jstack file. After moving our collector.collect() call out of our lock region, the deadlock was gone. However, the system is still NOT able to run, as we find that the Flink collector hangs. See the attached jstack53009(2).out file: the thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (1/10)" holds the lock 0x0007b692ad98 and is still waiting for THIS exact lock again. This makes it hang. Our guess is that this is some issue related to async I/O. After changing our async Redis lookup to a sync Redis lookup (by not using async I/O), the issue is gone. > Deadlock found in Flink Streaming job > - > > Key: FLINK-8020 > URL: https://issues.apache.org/jira/browse/FLINK-8020 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming, Streaming Connectors >Affects Versions: 1.3.2 > Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode >Reporter: Weihua Jiang >Priority: Blocker > Attachments: jstack53009(2).out, jstack67976-2.log > > > Our streaming job ran into trouble in recent days after a long period of smooth > running. One issue we found is > [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this > one. > After analyzing the jstack, we believe we found a DEADLOCK in Flink: > 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" > holds lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940. > 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: > hbase-sink0 (8/8)" holds lock 0x0007b6aa1940 and is waiting for lock > 0x0007b6aa1788. > This DEADLOCK made the job fail to proceed.
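The workaround described above (moving collector.collect() out of the lock region) follows a general pattern worth sketching. This is an illustrative sketch with invented names, not the reporter's actual job code: hold the application lock only long enough to copy pending elements out, and emit them after releasing the lock, so the emit path can never participate in a lock cycle with the caller's lock.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "emit outside the lock" pattern: collecting
// downstream while holding an application lock risks deadlocking against
// whatever lock the emit path takes internally (here, Flink's operator
// machinery). Copy-then-release avoids holding both locks at once.
class EmitOutsideLockSketch {
    private final Object cacheLock = new Object();
    private final List<String> cache = new ArrayList<>();

    /** Adds an element, then drains the cache; the caller emits the result lock-free. */
    List<String> drainForEmit(String newElement) {
        List<String> toEmit;
        synchronized (cacheLock) {
            cache.add(newElement);
            toEmit = new ArrayList<>(cache); // copy while holding the lock
            cache.clear();
        }
        return toEmit; // collector.collect() is invoked on these outside the lock
    }
}
```

Note that the second hang in the comment (a thread waiting on a lock it already holds, via async I/O) is a different failure mode; this sketch only addresses the original two-lock cycle.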
[jira] [Created] (FLINK-8043) increment job restart metric when fine grained recovery reverted to full job restart
Steven Zhen Wu created FLINK-8043: - Summary: increment job restart metric when fine grained recovery reverted to full job restart Key: FLINK-8043 URL: https://issues.apache.org/jira/browse/FLINK-8043 Project: Flink Issue Type: Bug Components: ResourceManager Affects Versions: 1.3.2 Reporter: Steven Zhen Wu When fine-grained recovery fails (e.g. due to not enough taskmanager slots because the replacement taskmanager node didn't come back in time), Flink will revert to a full job restart. In this case, it should also increment the "job restart" metric.
[jira] [Created] (FLINK-8042) retry individual failover-strategy for some time first before reverting to full job restart
Steven Zhen Wu created FLINK-8042: - Summary: retry individual failover-strategy for some time first before reverting to full job restart Key: FLINK-8042 URL: https://issues.apache.org/jira/browse/FLINK-8042 Project: Flink Issue Type: Bug Components: ResourceManager Affects Versions: 1.3.2 Reporter: Steven Zhen Wu Say we lose a taskmanager node. When Flink attempts fine-grained recovery and fails because the replacement taskmanager node didn't come back in time, it reverts to a full job restart. Stephan and Till were suggesting that Flink can/should retry fine-grained recovery for some time before giving up and reverting to a full job restart.
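The suggested policy can be sketched as a retry loop with a deadline. This is a hedged illustration of the idea only (the method and parameter names are invented, not Flink scheduler code): keep attempting fine-grained recovery until a configured window elapses, and only then signal the caller to fall back to a full job restart.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of "retry fine-grained recovery before full restart":
// attemptFineGrained stands in for one fine-grained recovery attempt.
class RecoveryRetrySketch {

    /**
     * @param attemptFineGrained returns true when a fine-grained attempt succeeds
     * @param retryWindowMillis  how long to keep retrying before giving up
     * @param retryDelayMillis   pause between attempts
     * @return true if fine-grained recovery succeeded within the window;
     *         false means the caller should revert to a full job restart
     */
    static boolean retryFineGrained(BooleanSupplier attemptFineGrained,
                                    long retryWindowMillis,
                                    long retryDelayMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + retryWindowMillis;
        while (System.currentTimeMillis() < deadline) {
            if (attemptFineGrained.getAsBoolean()) {
                return true; // replacement slots showed up in time
            }
            Thread.sleep(retryDelayMillis); // wait for replacement taskmanagers
        }
        return false; // window exhausted: revert to full job restart
    }
}
```

In a real scheduler the delay would likely be backoff-based and the window configurable, but the control flow is the essence of the ticket.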
[jira] [Resolved] (FLINK-8036) Consider using gradle to build Flink
[ https://issues.apache.org/jira/browse/FLINK-8036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved FLINK-8036. --- Resolution: Later Resolving since there is not enough interest. > Consider using gradle to build Flink > > > Key: FLINK-8036 > URL: https://issues.apache.org/jira/browse/FLINK-8036 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > > Here is a summary from Lukasz on this thread > (http://search-hadoop.com/m/Beam/gfKHFVh4NM151XIu1?subj=Re+DISCUSS+Move+away+from+Apache+Maven+as+build+tool) > w.r.t. the performance boost from using gradle: > Maven performs parallelization at the module level: an entire module needs > to complete before any dependent modules can start, which means all > the checks like findbugs, checkstyle, and tests need to finish first. Gradle has > task-level parallelism between subprojects, which means that as soon as the > compile and shade steps are done for a project, dependent subprojects > can typically start. This means that we get increased parallelism due to > not needing to wait for findbugs, checkstyle, and tests to run. I typically see > ~20 tasks (at peak) running on my desktop in parallel. > Flink should consider using gradle - on Linux with SSD, a clean build takes > an hour.
[jira] [Commented] (FLINK-8036) Consider using gradle to build Flink
[ https://issues.apache.org/jira/browse/FLINK-8036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246715#comment-16246715 ] Chesnay Schepler commented on FLINK-8036: - I can see the benefit, but as it stands I'm against it. As Fabian said, this would entail changing many integral parts of the system, not to mention how users build custom Flink versions. Furthermore, there are really trivial workarounds for the described use case: Step 1: compile Flink (without checkstyle etc.). Step 2: invoke checkstyle, findbugs, tests etc. for each submodule in a separate process. This is rather trivial to implement and should buy you plenty of time.
[jira] [Updated] (FLINK-8041) Recommended Improvements for Gelly's Connected Components Algorithm
[ https://issues.apache.org/jira/browse/FLINK-8041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christos Hadjinikolis updated FLINK-8041: - Description: At the moment, the ConnectedComponents algorithm that comes with Flink's native Graph API (Gelly) has two issues: 1. It relies on the user to provide correct values in the vertices DataSet. Based on how the algorithm works, these values must be of type Long and be unique for every vertex. If the user provides the same value for every vertex (e.g. 1), the algorithm still works, but as those values are used for the identification of the different connected components, one will end up with a single connected component and will have no clue as to why this happened. This can easily be fixed in two ways: either by checking that the values that appear alongside vertex ids are unique and informing the user if not, or by generating those values for every vertex before the algorithm is run. I have a running implementation of the second way and I really think it is an appropriate solution to this problem. 2. Once the connected components are identified, one has to apply additional transformations and actions to find out which is the biggest, or the order of the connected components in terms of their size. Alternatively, the algorithm can be implemented so that the numerical ids that are given to each component reflect their ranking when ordered by size, e.g. connected component 1 will be the biggest, connected component 2 the second biggest, and so on. I have also solved this and I think it would make a nice addition. was: At the moment, the ConnectedComponents algorithm that comes with Flink's native Graph API (Gelly) has two issues: 1. It relies on the user to provide correct values in the vertices DataSet. Based on how the algorithm works, these values must be of type Long and be unique for every vertex. If the user provides the same value for every vertex (e.g. 1), the algorithm still works, but as those values are used for the identification of the different connected components, one will end up with a single connected component and will have no clue as to why this happened. This can easily be fixed in two ways: either by checking that the values that appear alongside vertex ids are unique and informing the user if not, or by generating those values for every vertex before the algorithm is run. I have a running implementation of the second way and I really think it is an appropriate solution to this problem. 2. Once the connected components are identified, one has to apply additional transformations and actions to find out which is the biggest, or the order of the connected components in terms of their size. Alternatively, the algorithm can be implemented so that the numerical ids that are given to each component reflect their ranking when ordered by size, e.g. connected component 1 will be the biggest, connected component 2 the second biggest, and so on. > Recommended Improvements for Gelly's Connected Components Algorithm > --- > > Key: FLINK-8041 > URL: https://issues.apache.org/jira/browse/FLINK-8041 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.3.2 > Environment: Linux, IntelliJ IDEA >Reporter: Christos Hadjinikolis >Priority: Minor > Fix For: 1.4.0 > > Original Estimate: 336h > Remaining Estimate: 336h >
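The second improvement (size-ranked component ids) can be sketched in plain Java, independent of Gelly. This is an illustrative sketch with invented names, not the ticket author's implementation: given each vertex's raw component id, relabel components so that id 1 is the largest, id 2 the second largest, and so on.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of relabeling connected components by size rank.
// Input: vertex -> raw component id (the arbitrary Long the algorithm emits).
// Output: vertex -> rank, where rank 1 is the biggest component.
class RankComponentsSketch {

    static Map<String, Integer> relabelBySize(Map<String, Long> vertexToComponent) {
        // Count vertices per raw component id.
        Map<Long, Long> sizes = vertexToComponent.values().stream()
            .collect(Collectors.groupingBy(c -> c, Collectors.counting()));

        // Order raw ids by descending size: the biggest component comes first.
        List<Long> ranked = sizes.entrySet().stream()
            .sorted(Map.Entry.<Long, Long>comparingByValue(Comparator.reverseOrder()))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());

        Map<Long, Integer> rawToRank = new HashMap<>();
        for (int i = 0; i < ranked.size(); i++) {
            rawToRank.put(ranked.get(i), i + 1); // rank 1 = largest
        }

        // Rewrite each vertex's component id as its size rank.
        Map<String, Integer> result = new HashMap<>();
        vertexToComponent.forEach((v, c) -> result.put(v, rawToRank.get(c)));
        return result;
    }
}
```

In a Gelly setting the same counting and relabeling would be expressed as DataSet transformations, but the logic is identical.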
[jira] [Created] (FLINK-8041) Recommended Improvements for Gelly's Connected Components Algorithm
Christos Hadjinikolis created FLINK-8041: Summary: Recommended Improvements for Gelly's Connected Components Algorithm Key: FLINK-8041 URL: https://issues.apache.org/jira/browse/FLINK-8041 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.3.2 Environment: Linux, IntelliJ IDEA Reporter: Christos Hadjinikolis Priority: Minor Fix For: 1.4.0 At the moment, the ConnectedComponents algorithm that comes with Flink's native Graph API (Gelly) has two issues: 1. It relies on the user to provide correct values in the vertices DataSet. Based on how the algorithm works, these values must be of type Long and be unique for every vertex. If the user provides the same value for every vertex (e.g. 1), the algorithm still works, but as those values are used for the identification of the different connected components, one will end up with a single connected component and will have no clue as to why this happened. This can easily be fixed in two ways: either by checking that the values that appear alongside vertex ids are unique and informing the user if not, or by generating those values for every vertex before the algorithm is run. I have a running implementation of the second way and I really think it is an appropriate solution to this problem. 2. Once the connected components are identified, one has to apply additional transformations and actions to find out which is the biggest, or the order of the connected components in terms of their size. Alternatively, the algorithm can be implemented so that the numerical ids that are given to each component reflect their ranking when ordered by size, e.g. connected component 1 will be the biggest, connected component 2 the second biggest, and so on.
[jira] [Commented] (FLINK-7919) Join with Solution Set fails with NPE if Solution Set has no entry
[ https://issues.apache.org/jira/browse/FLINK-7919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246487#comment-16246487 ] Greg Hogan commented on FLINK-7919: --- [~fhueske] would an alternative be to use inner join semantics (dropping the unmatched element) and potentially support outer joins in the future?
> Join with Solution Set fails with NPE if Solution Set has no entry
> --
>
> Key: FLINK-7919
> URL: https://issues.apache.org/jira/browse/FLINK-7919
> Project: Flink
> Issue Type: Bug
> Components: DataSet API, Local Runtime
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Fabian Hueske
>
> A job with a delta iteration fails hard with an NPE in the solution set join, if the solution set has no entry for the join key of the probe side.
> The following program reproduces the problem:
> {code}
> DataSet<Tuple2<Long, Integer>> values = env.fromElements(
>     Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));
>
> DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
>     .iterateDelta(values, 5, 0);
>
> DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
>     .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
>         @Override
>         public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
>             // modifying the key to join on a non-existing solution set key
>             return Tuple2.of(value.f0 + 1, 1);
>         }
>     })
>     .join(di.getSolutionSet()).where(0).equalTo(0)
>     .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
>         @Override
>         public Tuple2<Long, Integer> join(
>                 Tuple2<Long, Integer> first,
>                 Tuple2<Long, Integer> second) throws Exception {
>             return Tuple2.of(first.f0, first.f1 + second.f1);
>         }
>     });
>
> DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
> result.print();
> {code}
> It doesn't matter whether the solution set is managed or not.
> The problem is caused because the solution set hash table prober returns a {{null}} value if the solution set does not contain a value for the probe side key.
> The join operator does not check whether the return value is {{null}} but immediately tries to create a copy using a {{TypeSerializer}}. This copy fails with an NPE.
> I propose to check for {{null}} and call the join function with {{null}} on the solution set side. This gives OUTER JOIN semantics for the join.
> Since the code was previously failing with an NPE, it is safe to forward the {{null}} into the {{JoinFunction}}.
> However, users must be aware that the solution set value may be {{null}}, and we need to update the documentation (JavaDocs + website) to describe the behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
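The behavior the proposal describes — forwarding {{null}} on the solution-set side instead of copying it — can be modeled in a few lines of plain Java. The `SolutionSetJoin` class below is a simplified stand-in for the actual hash-table prober and join operator, not Flink code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class SolutionSetJoin {

    // Join each probe-side key against the solution set. A missing key yields
    // null on the solution-set side (outer-join semantics), which is forwarded
    // to the join function instead of being blindly copied (the NPE cause).
    public static List<Integer> join(
            List<Long> probeKeys,
            Map<Long, Integer> solutionSet,
            BiFunction<Long, Integer, Integer> joinFunction) {
        List<Integer> out = new ArrayList<>();
        for (long key : probeKeys) {
            Integer match = solutionSet.get(key);      // may be null
            out.add(joinFunction.apply(key, match));   // forward null as-is
        }
        return out;
    }
}
```

The user-supplied join function then decides what an unmatched probe element means, exactly as the JIRA warns it must.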
[jira] [Commented] (FLINK-8014) Add Kafka010JsonTableSink
[ https://issues.apache.org/jira/browse/FLINK-8014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246472#comment-16246472 ] ASF GitHub Bot commented on FLINK-8014: --- GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/4990 [FLINK-8014] [FLINK-8016] Add Kafka010JsonTableSink and documentation ## What is the purpose of the change - Adds a TableSink to write JSON-encoded rows to Kafka 0.10 topics - Adds documentation for KafkaJsonTableSinks ## Brief change log * Add Kafka010JsonTableSink * Enable flush on checkpoint to ensure at-least-once guarantees * Refactor tests for KafkaJsonTableSinks * Add documentation for KafkaJsonTableSinks ## Verifying this change * Tests have been added. * The sink uses a regular Kafka 0.10 producer * The SerializationSchema is separately tested ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **YES** - If yes, how is the feature documented? 
Documentation has been added to the website You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink tableKafka10Sink Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4990.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4990 commit 937637b340ca02be770f040d94396b18bc9619d7 Author: Fabian Hueske Date: 2017-11-07T16:59:43Z [FLINK-8014] [table] Add Kafka010JsonTableSink. - Refactor KafkaTableSink tests. commit b0fcc04229aac97a938fffdcdcad9b388cc37d72 Author: Fabian Hueske Date: 2017-11-09T14:07:17Z [FLINK-8016] [docs] Add documentation for KafkaJsonTableSinks. > Add Kafka010JsonTableSink > - > > Key: FLINK-8014 > URL: https://issues.apache.org/jira/browse/FLINK-8014 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > Fix For: 1.4.0 > > > Flink offers a TableSource for JSON-encoded Kafka 0.10 topics but no TableSink. > Since the required base classes are already there, a > {{Kafka010JsonTableSink}} can be easily added. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4990: [FLINK-8014] [FLINK-8016] Add Kafka010JsonTableSin...
GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/4990 [FLINK-8014] [FLINK-8016] Add Kafka010JsonTableSink and documentation ## What is the purpose of the change - Adds a TableSink to write JSON-encoded rows to Kafka 0.10 topics - Adds documentation for KafkaJsonTableSinks ## Brief change log * Add Kafka010JsonTableSink * Enable flush on checkpoint to ensure at-least-once guarantees * Refactor tests for KafkaJsonTableSinks * Add documentation for KafkaJsonTableSinks ## Verifying this change * Tests have been added. * The sink uses a regular Kafka 0.10 producer * The SerializationSchema is separately tested ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **YES** - If yes, how is the feature documented? Documentation has been added to the website You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink tableKafka10Sink Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4990.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4990 commit 937637b340ca02be770f040d94396b18bc9619d7 Author: Fabian Hueske Date: 2017-11-07T16:59:43Z [FLINK-8014] [table] Add Kafka010JsonTableSink. - Refactor KafkaTableSink tests. 
commit b0fcc04229aac97a938fffdcdcad9b388cc37d72 Author: Fabian Hueske Date: 2017-11-09T14:07:17Z [FLINK-8016] [docs] Add documentation for KafkaJsonTableSinks. ---
[jira] [Commented] (FLINK-7977) bump version of compatibility check for Flink 1.4
[ https://issues.apache.org/jira/browse/FLINK-7977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246436#comment-16246436 ] ASF GitHub Bot commented on FLINK-7977: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4945 @greghogan That's a great point. > bump version of compatibility check for Flink 1.4 > - > > Key: FLINK-7977 > URL: https://issues.apache.org/jira/browse/FLINK-7977 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0 > > > Since Flink maintains backward compatibility checks for two versions, Flink 1.4 > should check compatibility with 1.2 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4945: [FLINK-7977][build] bump version of compatibility check f...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4945 @greghogan That's a great point. ---
[jira] [Commented] (FLINK-2973) Add flink-benchmark with compliant licenses again
[ https://issues.apache.org/jira/browse/FLINK-2973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246432#comment-16246432 ] Greg Hogan commented on FLINK-2973: --- [~fhueske] it looks to be GPLv2 (with classpath exception): http://hg.openjdk.java.net/jdk9/jdk9/file/a08cbfc0e4ec/LICENSE > Add flink-benchmark with compliant licenses again > - > > Key: FLINK-2973 > URL: https://issues.apache.org/jira/browse/FLINK-2973 > Project: Flink > Issue Type: Task > Components: Build System >Affects Versions: 1.0.0 >Reporter: Fabian Hueske >Assignee: Suneel Marthi >Priority: Minor > Fix For: 1.0.0 > > > We recently created the Maven module {{flink-benchmark}} for micro-benchmarks > and ported most of the existing micro-benchmarks to the Java benchmarking > framework JMH. However, JMH is part of OpenJDK and under GPL license which is > not compatible with the AL2. > Consequently, we need to remove this dependency and either revert the porting > commits or port the benchmarks to another benchmarking framework. An > alternative could be [Google's Caliper|https://github.com/google/caliper] > library which is under AL2. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4976: [FLINK-8017] Fix High availability cluster-id key ...
Github user dkelley-accretive closed the pull request at: https://github.com/apache/flink/pull/4976 ---
[jira] [Commented] (FLINK-8017) High availability cluster-id option incorrect in documentation
[ https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246245#comment-16246245 ] ASF GitHub Bot commented on FLINK-8017: --- Github user dkelley-accretive closed the pull request at: https://github.com/apache/flink/pull/4976 > High availability cluster-id option incorrect in documentation > -- > > Key: FLINK-8017 > URL: https://issues.apache.org/jira/browse/FLINK-8017 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.3.2 >Reporter: Dan Kelley >Priority: Minor > Fix For: 1.4.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > The property key in HighAvailabilityOptions.java is > high-availability.cluster-id however the documentation states that the key is > high-availability.zookeeper.path.cluster-id -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8020) Deadlock found in Flink Streaming job
[ https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246081#comment-16246081 ] Kostas Kloudas commented on FLINK-8020: --- [~greghogan] I suppose you can close it and [~whjiang] can re-open it in case it is a valid JIRA. > Deadlock found in Flink Streaming job > - > > Key: FLINK-8020 > URL: https://issues.apache.org/jira/browse/FLINK-8020 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming, Streaming Connectors >Affects Versions: 1.3.2 > Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode >Reporter: Weihua Jiang >Priority: Blocker > Attachments: jstack67976-2.log > > > Our streaming job ran into trouble in recent days after a long stretch of smooth > running. One issue we found is > [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this > one. > After analyzing the jstack output, we believe we found a deadlock in Flink: > 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" > holds lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940. > 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: > hbase-sink0 (8/8)" holds lock 0x0007b6aa1940 and is waiting for lock > 0x0007b6aa1788. > This deadlock made the job unable to proceed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
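The jstack excerpt describes a classic lock-ordering cycle: each thread holds one monitor while waiting for the other. The standard remedy is to acquire both monitors in one global order everywhere, sketched below (the lock names and structure are hypothetical illustrations, not the actual Flink internals):

```java
public class LockOrdering {
    private static final Object CHECKPOINT_LOCK = new Object();
    private static final Object TIMER_LOCK = new Object();

    static int counter = 0;

    // Both code paths take CHECKPOINT_LOCK strictly before TIMER_LOCK,
    // so no cycle can form in the wait-for graph.
    static void processElement() {
        synchronized (CHECKPOINT_LOCK) {
            synchronized (TIMER_LOCK) {
                counter++;
            }
        }
    }

    static void onTimer() {
        synchronized (CHECKPOINT_LOCK) {   // same order as processElement()
            synchronized (TIMER_LOCK) {
                counter++;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread a = new Thread(() -> { for (int i = 0; i < 1000; i++) processElement(); });
        Thread b = new Thread(() -> { for (int i = 0; i < 1000; i++) onTimer(); });
        a.start(); b.start();
        a.join(); b.join();
        System.out.println(counter);   // both threads complete: no deadlock
    }
}
```

Had onTimer() taken TIMER_LOCK first, the two loops could block each other forever, which is exactly the hold-and-wait pattern visible in the attached jstack.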
[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/4943 ---
[jira] [Commented] (FLINK-6022) Don't serialise Schema when serialising Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246015#comment-16246015 ] ASF GitHub Bot commented on FLINK-6022: --- Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/4943 > Don't serialise Schema when serialising Avro GenericRecord > -- > > Key: FLINK-6022 > URL: https://issues.apache.org/jira/browse/FLINK-6022 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Robert Metzger >Assignee: Stephan Ewen > Fix For: 1.5.0 > > > Currently, Flink is serializing the schema for each Avro GenericRecord in the > stream. > This leads to a lot of overhead over the wire/disk + high serialization costs. > Therefore, I'm proposing to improve the support for GenericRecord in Flink by > shipping the schema to each serializer through the AvroTypeInformation. > Then, we can only support GenericRecords with the same type per stream, but > the performance will be much better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
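The overhead being removed is straightforward to estimate: with schema-per-record serialization every record carries the schema bytes as well as its payload. A toy plain-Java calculation (the byte sizes are illustrative assumptions, not measured Avro numbers):

```java
public class SchemaOverhead {

    // Bytes on the wire for n records when the schema ships with every record.
    public static long perRecordSchema(int n, int schemaBytes, int payloadBytes) {
        return (long) n * (schemaBytes + payloadBytes);
    }

    // Bytes on the wire when the schema ships once per stream, as proposed.
    public static long schemaOnce(int n, int schemaBytes, int payloadBytes) {
        return schemaBytes + (long) n * payloadBytes;
    }
}
```

With a 500-byte schema and 50-byte payloads over 1000 records, shipping the schema once instead of per record shrinks the wire size by roughly an order of magnitude — which is the performance argument made in the issue.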
[jira] [Commented] (FLINK-7419) Shade jackson dependency in flink-avro
[ https://issues.apache.org/jira/browse/FLINK-7419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16246007#comment-16246007 ] ASF GitHub Bot commented on FLINK-7419: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4981#discussion_r150020494 --- Diff: flink-formats/flink-avro/pom.xml --- @@ -185,17 +185,6 @@ under the License. ${project.basedir}/target/dependency-reduced-pom.xml true --- End diff -- We can probably remove the complete shading entry in that pom. It seems not to do anything anyway. Promoting transitive dependencies when nothing except `force-shading` is shaded into the jar should be a no-op. (theoretically, one never knows with the shade plugin ;-) ) > Shade jackson dependency in flink-avro > -- > > Key: FLINK-7419 > URL: https://issues.apache.org/jira/browse/FLINK-7419 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Stephan Ewen >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.4.0 > > > Avro uses {{org.codehaus.jackson}}, which also exists in multiple > incompatible versions. We should shade it to > {{org.apache.flink.shaded.avro.org.codehaus.jackson}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4981: [FLINK-7419][build][avro] Relocate jackson in flin...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4981#discussion_r150020494 --- Diff: flink-formats/flink-avro/pom.xml --- @@ -185,17 +185,6 @@ under the License. ${project.basedir}/target/dependency-reduced-pom.xml true --- End diff -- We can probably remove the complete shading entry in that pom. It seems not to do anything anyway. Promoting transitive dependencies when nothing except `force-shading` is shaded into the jar should be a no-op. (theoretically, one never knows with the shade plugin ;-) ) ---
[jira] [Commented] (FLINK-8011) Set dist flink-python dependency to provided
[ https://issues.apache.org/jira/browse/FLINK-8011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245997#comment-16245997 ] ASF GitHub Bot commented on FLINK-8011: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4973 Yes, that is how it should be. +1 > Set dist flink-python dependency to provided > > > Key: FLINK-8011 > URL: https://issues.apache.org/jira/browse/FLINK-8011 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.5.0 > > > We can simplify the flink-dist pom by setting the flink-python dependency to > provided, which allows us to remove an exclusion from the shade plugin > configuration. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4973: [FLINK-8011][dist] Set flink-python to provided
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4973 Yes, that is how it should be. +1 ---
[jira] [Commented] (FLINK-8006) flink-daemon.sh: line 103: binary operator expected
[ https://issues.apache.org/jira/browse/FLINK-8006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245993#comment-16245993 ] ASF GitHub Bot commented on FLINK-8006: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4968 LGTM +1 to merge > flink-daemon.sh: line 103: binary operator expected > --- > > Key: FLINK-8006 > URL: https://issues.apache.org/jira/browse/FLINK-8006 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.3.2 > Environment: Linux 4.12.12-gentoo #2 SMP x86_64 Intel(R) Core(TM) > i3-3110M CPU @ 2.40GHz GenuineIntel GNU/Linux >Reporter: Alejandro > Labels: easyfix, newbie > Original Estimate: 1m > Remaining Estimate: 1m > > When executing `./bin/start-local.sh` I get > flink-1.3.2/bin/flink-daemon.sh: line 79: $pid: ambiguous redirect > flink-1.3.2/bin/flink-daemon.sh: line 103: [: /tmp/flink-Alejandro: binary > operator expected > I solved the problem by replacing $pid with "$pid" in lines 79 and 103. > Should I make a PR to the repo? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
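Both reported errors — the ambiguous redirect and the "binary operator expected" — are symptoms of word splitting on the unquoted `$pid` when the expanded path contains whitespace. A small demonstration (the path with a space is hypothetical; only the quoting behavior matters):

```shell
#!/bin/sh
# A path containing a space shows why the unquoted $pid breaks
# [ ... ] tests and redirects in flink-daemon.sh.
pid="/tmp/flink-Alejandro Garcia/flink.pid"

set -- $pid                  # unquoted: the value word-splits into two arguments
echo "unquoted words: $#"    # prints 2

set -- "$pid"                # quoted: stays a single argument, as intended
echo "quoted words: $#"      # prints 1

# The fix proposed in the issue: quote the variable wherever it is expanded,
# e.g.  [ -f "$pid" ]  instead of  [ -f $pid ]
```

With two words, `[ -f $pid ]` sees three operands and fails with "binary operator expected", and `> $pid` becomes an ambiguous redirect — matching the two error lines in the report.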
[GitHub] flink issue #4968: [FLINK-8006] [Startup Shell Scripts] Enclosing $pid in qu...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4968 LGTM +1 to merge ---
[jira] [Commented] (FLINK-7886) Enable dependency convergence for flink-core
[ https://issues.apache.org/jira/browse/FLINK-7886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245925#comment-16245925 ] Aljoscha Krettek commented on FLINK-7886: - Implemented on release-1.4 in 005a871771ce73bef9c78ee04a61817fa9a31e99 > Enable dependency convergence for flink-core > > > Key: FLINK-7886 > URL: https://issues.apache.org/jira/browse/FLINK-7886 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245879#comment-16245879 ] ASF GitHub Bot commented on FLINK-8005: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/4980 I agree! +1 to merge as soon as Travis gives us the green light. > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. > Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). 
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
> ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
> at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
> at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
> at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
> at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
> at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
> at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
> at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
> at
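The fix named in the PR title — installing the user-code class loader as the snapshot thread's context class loader — typically follows a swap-and-restore pattern like the sketch below (a generic illustration, not the actual Flink change):

```java
public class ContextClassLoaderSwap {

    // Run an action with the given class loader installed as the thread's
    // context class loader, restoring the previous one afterwards. Libraries
    // such as Kafka resolve class names via the context class loader (see the
    // ConfigDef extract above), so user-jar classes are only visible to them
    // while the swap is in place.
    public static void runWithContextClassLoader(ClassLoader userCodeClassLoader, Runnable action) {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(userCodeClassLoader);
        try {
            action.run();
        } finally {
            thread.setContextClassLoader(previous);   // always restore
        }
    }
}
```

Without the swap, `Class.forName(name, true, Utils.getContextOrKafkaClassLoader())` runs against the `AppClassLoader`, which cannot see the user jar — producing the `ConfigException` in the trace.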
[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/4980 I agree! +1 to merge as soon as Travis gives us the green light. ---
[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4980 thanks, I think this is excellent now. I'll merge as soon as travis is green. ---
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245867#comment-16245867 ] ASF GitHub Bot commented on FLINK-8005: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4980 thanks, I think this is excellent now. I'll merge as soon as travis is green. > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. > Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). 
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
> ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
> at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
> at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
> at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
> at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
> at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
> at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
> at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
> at
[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4777 Thanks! ---
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245825#comment-16245825 ] ASF GitHub Bot commented on FLINK-8005: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149993559 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -277,8 +287,12 @@ public void invoke() throws Exception { } } - triggerLatch.trigger(); if (error != null) { + // exit method prematurely due to error but make sure that the tests can finish + triggerLatch.trigger(); --- End diff -- yes, a latch that was already triggered will simply return immediately, no need for an additional check > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. > Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) > ... 7 more > Caused by: org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360) > at > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) > at > 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359) > at >
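The fix direction implied by the problem description — making user-code classes resolvable from the snapshot thread, so that Kafka's `Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader())` succeeds — can be sketched as wrapping the snapshotting code so it runs with the user-code class loader installed as the thread's context class loader. The class and method names below are illustrative, not the actual Flink patch.

```java
// Sketch: run an action with a given class loader as the thread's
// context class loader, restoring the previous loader afterwards.
public final class ContextClassLoaderUtil {

    private ContextClassLoaderUtil() {}

    public static void runWithContextClassLoader(ClassLoader userCodeClassLoader, Runnable action) {
        final Thread current = Thread.currentThread();
        final ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(userCodeClassLoader);
        try {
            action.run();
        } finally {
            // always restore the previous loader, even if the action throws
            current.setContextClassLoader(previous);
        }
    }
}
```

Any library that resolves class names via `Thread.currentThread().getContextClassLoader()` — as Kafka's `ConfigDef` does — then sees the user-code jar while the wrapped action runs.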
[jira] [Commented] (FLINK-7765) Enable dependency convergence
[ https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245808#comment-16245808 ] ASF GitHub Bot commented on FLINK-7765: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4777 Thanks! > Enable dependency convergence > - > > Key: FLINK-7765 > URL: https://issues.apache.org/jira/browse/FLINK-7765 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > For motivation check https://issues.apache.org/jira/browse/FLINK-7739 > SubTasks of this task depends on one another - to enable convergence in > `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
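Dependency convergence is typically enabled through the Maven enforcer plugin's `dependencyConvergence` rule; a minimal sketch follows (not necessarily the exact configuration used in the PR — module-specific exclusions and plugin version management are omitted).

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <executions>
    <execution>
      <id>enforce-dependency-convergence</id>
      <goals>
        <goal>enforce</goal>
      </goals>
      <configuration>
        <rules>
          <!-- fail the build if two paths pull different versions
               of the same artifact -->
          <dependencyConvergence/>
        </rules>
      </configuration>
    </execution>
  </executions>
</plugin>
```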
[jira] [Commented] (FLINK-7765) Enable dependency convergence
[ https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245810#comment-16245810 ] ASF GitHub Bot commented on FLINK-7765: --- Github user pnowojski closed the pull request at: https://github.com/apache/flink/pull/4777 > Enable dependency convergence > - > > Key: FLINK-7765 > URL: https://issues.apache.org/jira/browse/FLINK-7765 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > For motivation check https://issues.apache.org/jira/browse/FLINK-7739 > SubTasks of this task depends on one another - to enable convergence in > `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245781#comment-16245781 ] ASF GitHub Bot commented on FLINK-8005: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149985097 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -277,8 +287,12 @@ public void invoke() throws Exception { } } - triggerLatch.trigger(); if (error != null) { + // exit method prematurely due to error but make sure that the tests can finish + triggerLatch.trigger(); --- End diff -- I think it doesn't matter because the latch already checks for the flag. ``` public void await() throws InterruptedException { synchronized (lock) { while (!triggered) { lock.wait(); } } } ``` > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. > Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) > ... 7 more > Caused by: org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360) > at > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) > at > 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94) > at >
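For context, the `await()` excerpt quoted in the review comes from a one-shot latch; a minimal self-contained version of such a latch (a simplified sketch, not Flink's actual `OneShotLatch`) looks like this. It shows why an already-triggered latch makes `await()` return immediately and why re-triggering is harmless.

```java
// Minimal one-shot latch: once triggered, it stays triggered,
// so any later await() falls through the while loop immediately.
final class OneShotLatch {

    private final Object lock = new Object();
    private boolean triggered;

    void trigger() {
        synchronized (lock) {
            triggered = true;  // idempotent: a second trigger() is a no-op
            lock.notifyAll();
        }
    }

    void await() throws InterruptedException {
        synchronized (lock) {
            while (!triggered) {
                lock.wait();
            }
        }
    }

    boolean isTriggered() {
        synchronized (lock) {
            return triggered;
        }
    }
}
```

This is why the reviewers conclude that no `isTriggered()` guard is needed before `await()`: the triggered flag itself already short-circuits the wait.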
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245767#comment-16245767 ] ASF GitHub Bot commented on FLINK-4228: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149980302 --- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
--- End diff -- Can't we enforce jackson 2.6 via dependency management? I think this would be cleaner than excluding the dependencies here and assume that `aws-java-sdk-s3` pulls in the missing dependencies. > YARN artifact upload does not work with S3AFileSystem > - > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.4.0 > > > The issue now is exclusive to running on YARN with s3a:// as your configured > FileSystem. If so, the Flink session will fail on staging itself because it > tries to copy the flink/lib directory to S3 and the S3aFileSystem does not > support recursive copy. > h2. Old Issue > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) > leads to an Exception when uploading the snapshot to S3 when using the > {{S3AFileSystem}}. 
> {code} > AsynchronousException{com.amazonaws.AmazonClientException: Unable to > calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory)} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) > Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at
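The "dependency management" alternative the reviewer suggests would pin the Jackson version once in the parent POM rather than excluding it from each dependency; a hedged sketch follows (the 2.6.x version number is illustrative, not taken from the PR).

```xml
<!-- Sketch: Maven picks these versions wherever jackson-* appears
     transitively, so no per-dependency <exclusions> are needed. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.6.7</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.6.7</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.6.7</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```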
[jira] [Assigned] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired
[ https://issues.apache.org/jira/browse/FLINK-8040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao reassigned FLINK-8040: --- Assignee: Stefan Richter > Test instability ResourceGuard#testCloseBlockIfAcquired > --- > > Key: FLINK-8040 > URL: https://issues.apache.org/jira/browse/FLINK-8040 > Project: Flink > Issue Type: Bug > Components: Core, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Gary Yao >Assignee: Stefan Richter > Labels: test-stability > Fix For: 1.4.0, 1.5.0 > > > Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with > {noformat} > java.io.IOException: Resource guard was already closed. > at > org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) > at > org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > {noformat} > *How to reproduce* > Run the test with a high number of iterations. 
> To further provoke a failure, add {{Thread.sleep(100)}} before the following > line > {code} > ResourceGuard.Lease lease_2 = resourceGuard.acquireResource(); > {code} > This will more likely result in {{closed}} being {{true}} at the time the 2nd > lease is acquired and an exception is thrown: > {code} > if (closed) { > throw new IOException("Resource guard was already closed."); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
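The race described above — a second lease acquired after `close()` has already set the flag — can be reproduced with a stripped-down guard. This is a simplified sketch, not Flink's actual `ResourceGuard`; only the closed flag and lease counting relevant to the test failure are modeled.

```java
import java.io.IOException;

// Minimal guard: acquire fails once closed; close() blocks until all
// outstanding leases are released.
final class SimpleResourceGuard {

    private final Object lock = new Object();
    private int leaseCount;
    private boolean closed;

    void acquireResource() throws IOException {
        synchronized (lock) {
            if (closed) {
                throw new IOException("Resource guard was already closed.");
            }
            ++leaseCount;
        }
    }

    void releaseResource() {
        synchronized (lock) {
            --leaseCount;
            lock.notifyAll();  // wake a close() waiting for leases to drain
        }
    }

    void close() throws InterruptedException {
        synchronized (lock) {
            closed = true;     // from here on, acquireResource() throws
            while (leaseCount > 0) {
                lock.wait();
            }
        }
    }
}
```

A `Thread.sleep()` inserted between `close()` starting and the second acquire, as the issue suggests, makes it far more likely that `closed` is observed as true and the `IOException` is thrown.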
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245776#comment-16245776 ] ASF GitHub Bot commented on FLINK-8005: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149984491 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -277,8 +287,12 @@ public void invoke() throws Exception { } } - triggerLatch.trigger(); if (error != null) { + // exit method prematurely due to error but make sure that the tests can finish + triggerLatch.trigger(); --- End diff -- Sorry, I was just looking on the IDE and missed the lines. This line should be before every time you call `await` on the `latch`. > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. > Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) > ... 7 more > Caused by: org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360) > at > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) > at > 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359) > at >
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245773#comment-16245773 ] ASF GitHub Bot commented on FLINK-8005: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149983878 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -277,8 +287,12 @@ public void invoke() throws Exception { } } - triggerLatch.trigger(); if (error != null) { + // exit method prematurely due to error but make sure that the tests can finish + triggerLatch.trigger(); --- End diff -- Why is that? I think at this point the latch might not get triggered at all. > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. > Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) > ... 7 more > Caused by: org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360) > at > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) > at > 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294) > at >
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245769#comment-16245769 ] ASF GitHub Bot commented on FLINK-4228: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149983025 --- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
--- End diff -- It might be necessary to add these dependencies then explicitly here. > YARN artifact upload does not work with S3AFileSystem > - > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.4.0 > > > The issue now is exclusive to running on YARN with s3a:// as your configured > FileSystem. If so, the Flink session will fail on staging itself because it > tries to copy the flink/lib directory to S3 and the S3aFileSystem does not > support recursive copy. > h2. Old Issue > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) > leads to an Exception when uploading the snapshot to S3 when using the > {{S3AFileSystem}}. 
> {code} > AsynchronousException{com.amazonaws.AmazonClientException: Unable to > calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory)} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) > Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: >
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245768#comment-16245768 ] ASF GitHub Bot commented on FLINK-4228: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149983029 --- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
--- End diff -- Shouldn't we override the avro version instead of excluding it? > YARN artifact upload does not work with S3AFileSystem > - > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.4.0 > > > The issue now is exclusive to running on YARN with s3a:// as your configured > FileSystem. If so, the Flink session will fail on staging itself because it > tries to copy the flink/lib directory to S3 and the S3aFileSystem does not > support recursive copy. > h2. Old Issue > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) > leads to an Exception when uploading the snapshot to S3 when using the > {{S3AFileSystem}}. 
> {code} > AsynchronousException{com.amazonaws.AmazonClientException: Unable to > calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory)} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) > Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at 
java.io.FileInputStream.(FileInputStream.java:138) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294) > ... 9 more > {code} > Running with S3NFileSystem, the error does not occur. The problem might be > due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created > automatically. We might need to manually create folders and copy only actual > files for {{S3AFileSystem}}. More investigation is required. -- This message was sent by Atlassian JIRA
[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149983025 --- Diff: flink-yarn/pom.xml --- @@ -153,6 +159,63 @@ under the License. + + + + include_hadoop_aws + + + include_hadoop_aws + + + + + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + test + + + org.apache.avro + avro + + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + --- End diff -- It might be necessary to add these dependencies then explicitly here. ---
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245765#comment-16245765 ] ASF GitHub Bot commented on FLINK-8005: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149982812 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -277,8 +287,12 @@ public void invoke() throws Exception { } } - triggerLatch.trigger(); if (error != null) { + // exit method prematurely due to error but make sure that the tests can finish + triggerLatch.trigger(); --- End diff -- for all latches, it should also have:`if (!latch.isTriggered()) { latch.await() }` > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. > Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) > ... 7 more > Caused by: org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360) > at > org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) > at >
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294) > at >
[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149982812 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -277,8 +287,12 @@ public void invoke() throws Exception { } } - triggerLatch.trigger(); if (error != null) { + // exit method prematurely due to error but make sure that the tests can finish + triggerLatch.trigger(); --- End diff -- for all latches, it should also have: `if (!latch.isTriggered()) { latch.await() }` ---
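The guard suggested in the review can be illustrated with a minimal stand-in for Flink's latch. The inner class below is a simplified sketch, not the real `org.apache.flink.core.testutils.OneShotLatch`:

```java
import java.util.concurrent.CountDownLatch;

public class LatchGuard {

    // Simplified stand-in for a one-shot latch with trigger/isTriggered/await.
    static class OneShotLatch {
        private final CountDownLatch latch = new CountDownLatch(1);
        void trigger() { latch.countDown(); }
        boolean isTriggered() { return latch.getCount() == 0; }
        void await() throws InterruptedException { latch.await(); }
    }

    public static void main(String[] args) throws InterruptedException {
        OneShotLatch triggerLatch = new OneShotLatch();
        new Thread(triggerLatch::trigger).start();

        // The guard from the review comment: skip the blocking await when the
        // latch has already fired, so an early trigger cannot hang the test.
        if (!triggerLatch.isTriggered()) {
            triggerLatch.await();
        }
        System.out.println("done");
    }
}
```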
[jira] [Commented] (FLINK-7765) Enable dependency convergence
[ https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245753#comment-16245753 ] ASF GitHub Bot commented on FLINK-7765: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4777 Thanks! Could you please close if GH doesn't auto-close? > Enable dependency convergence > - > > Key: FLINK-7765 > URL: https://issues.apache.org/jira/browse/FLINK-7765 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > For motivation check https://issues.apache.org/jira/browse/FLINK-7739 > Subtasks of this task depend on one another - to enable convergence in > `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
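For reference, enabling convergence in a module usually means activating the enforcer's `dependencyConvergence` rule. A sketch, not Flink's exact configuration:

```xml
<!-- Minimal maven-enforcer-plugin setup with the dependencyConvergence rule;
     execution id and placement are illustrative. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <executions>
    <execution>
      <id>dependency-convergence</id>
      <goals>
        <goal>enforce</goal>
      </goals>
      <configuration>
        <rules>
          <dependencyConvergence/>
        </rules>
      </configuration>
    </execution>
  </executions>
</plugin>
```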
[jira] [Closed] (FLINK-7886) Enable dependency convergence for flink-core
[ https://issues.apache.org/jira/browse/FLINK-7886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7886. --- Resolution: Fixed Implemented on master in bdbca37b01254aef9ddd6943d34dab1838c2a9f1 > Enable dependency convergence for flink-core > > > Key: FLINK-7886 > URL: https://issues.apache.org/jira/browse/FLINK-7886 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4777 Thanks! Could you please close if GH doesn't auto-close? ---
[GitHub] flink issue #4989: [Flink 7003] [Table API & SQL] use PEEK_FIELDS_NO_EXPAND ...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4989 Thanks for the PR @suez1224. I will merge it. Thanks, Fabian ---
[jira] [Commented] (FLINK-8006) flink-daemon.sh: line 103: binary operator expected
[ https://issues.apache.org/jira/browse/FLINK-8006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245714#comment-16245714 ] ASF GitHub Bot commented on FLINK-8006: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4968 +1 > flink-daemon.sh: line 103: binary operator expected > --- > > Key: FLINK-8006 > URL: https://issues.apache.org/jira/browse/FLINK-8006 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.3.2 > Environment: Linux 4.12.12-gentoo #2 SMP x86_64 Intel(R) Core(TM) > i3-3110M CPU @ 2.40GHz GenuineIntel GNU/Linux >Reporter: Alejandro > Labels: easyfix, newbie > Original Estimate: 1m > Remaining Estimate: 1m > > When executing `./bin/start-local.sh` I get > flink-1.3.2/bin/flink-daemon.sh: line 79: $pid: ambiguous redirect > flink-1.3.2/bin/flink-daemon.sh: line 103: [: /tmp/flink-Alejandro: binary > operator expected > I solved the problem by replacing $pid with "$pid" in lines 79 and 103. > Should I make a PR to the repo? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4968: [FLINK-8006] [Startup Shell Scripts] Enclosing $pid in qu...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4968 +1 ---
[jira] [Commented] (FLINK-7679) Upgrade maven enforcer plugin to 3.0.0-M1
[ https://issues.apache.org/jira/browse/FLINK-7679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245711#comment-16245711 ] Greg Hogan commented on FLINK-7679: --- So it looks like Scala 2.12+ is required for Java 9. > Upgrade maven enforcer plugin to 3.0.0-M1 > - > > Key: FLINK-7679 > URL: https://issues.apache.org/jira/browse/FLINK-7679 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Ted Yu >Assignee: Hai Zhou UTC+8 > > I got the following build error against Java 9: > {code} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce (enforce-maven) > on project flink-parent: Execution enforce-maven of goal > org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce failed: An API > incompatibility was encountered while executing > org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce: > java.lang.ExceptionInInitializerError: null > [ERROR] - > [ERROR] realm =plugin>org.apache.maven.plugins:maven-enforcer-plugin:1.4.1 > [ERROR] strategy = org.codehaus.plexus.classworlds.strategy.SelfFirstStrategy > [ERROR] urls[0] = > file:/home/hbase/.m2/repository/org/apache/maven/plugins/maven-enforcer-plugin/1.4.1/maven-enforcer-plugin-1.4.1.jar > {code} > Upgrading maven enforcer plugin to 3.0.0-M1 would get over the above error. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245681#comment-16245681 ] ASF GitHub Bot commented on FLINK-8005: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4980 Yes, but I think this is making an assumption about the internal implementation. If someone changes that the test could break/not test the right thing anymore. > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. > Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) > ... 7 more > Caused by: org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360) > at > org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) > at >
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at >
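The PR title indicates the fix is to set the user code class loader before snapshotting. The usual pattern for that — swap the thread's context class loader around the call and restore the previous one afterwards — can be sketched as follows (method and variable names are illustrative, not Flink's actual code):

```java
import java.util.concurrent.Callable;

public class ContextClassLoaderDemo {

    // Run an action with the given class loader installed as the thread's
    // context class loader, restoring the previous loader in a finally block.
    static <T> T runWithClassLoader(ClassLoader userCodeLoader, Callable<T> action)
            throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(userCodeLoader);
        try {
            // Anything in here that calls Class.forName(name, true,
            // Thread.currentThread().getContextClassLoader()) — as Kafka's
            // ConfigDef does — now resolves against userCodeLoader.
            return action.call();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // A dummy loader standing in for the user code class loader.
        ClassLoader fake = new ClassLoader(ContextClassLoaderDemo.class.getClassLoader()) {};

        ClassLoader seen = runWithClassLoader(fake,
                () -> Thread.currentThread().getContextClassLoader());

        System.out.println(seen == fake);  // inside the action: swapped loader
        System.out.println(Thread.currentThread().getContextClassLoader() == fake); // restored
    }
}
```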
[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4980 Yes, but I think this is making an assumption about the internal implementation. If someone changes that, the test could break or no longer test the right thing. ---
[jira] [Updated] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired
[ https://issues.apache.org/jira/browse/FLINK-8040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8040: Description: Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with {noformat} java.io.IOException: Resource guard was already closed. at org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) at org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) {noformat} *How to reproduce* Run the test with a high number of iterations. To further provoke a failure, add {{Thread.sleep(100)}} before the following line {code} ResourceGuard.Lease lease_2 = resourceGuard.acquireResource(); {code} This will more likely result in {{closed}} being {{true}} at the time the 2nd lease is acquired and an exception is thrown: {code} if (closed) { throw new IOException("Resource guard was already closed."); } {code} was: Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with {noformat} java.io.IOException: Resource guard was already closed. 
at org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) at org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) {noformat} *How to reproduce* Run the test with a high number of iterations. To further provoke a failure, add {{Thread.sleep(100)}} before the following line {code} ResourceGuard.Lease lease_2 = resourceGuard.acquireResource(); {code} This will more likely result in {{closed}} being {{true}} at the time the 2nd lease is acquired, and an exception is thrown: {code} if (closed) { throw new IOException("Resource guard was already closed."); } {code} > Test instability ResourceGuard#testCloseBlockIfAcquired > --- > > Key: FLINK-8040 > URL: https://issues.apache.org/jira/browse/FLINK-8040 > Project: Flink > Issue Type: Bug > Components: Core, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Gary Yao > Labels: test-stability > Fix For: 1.4.0, 1.5.0 > > > Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with > {noformat} > java.io.IOException: Resource guard was already closed. 
> at > org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) > at > org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at >
[jira] [Updated] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired
[ https://issues.apache.org/jira/browse/FLINK-8040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8040: Description: Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with {noformat} java.io.IOException: Resource guard was already closed. at org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) at org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) {noformat} *How to reproduce* Run the test with a high number of iterations. To further provoke a failure, add {{Thread.sleep(100)}} before the following line {code} ResourceGuard.Lease lease_2 = resourceGuard.acquireResource(); {code} This will more likely result in {{closed}} being {{true}} at the time the 2nd lease is acquired, and an exception is thrown: {code} if (closed) { throw new IOException("Resource guard was already closed."); } {code} was: Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with {noformat} java.io.IOException: Resource guard was already closed. 
at org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) at org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) {noformat} *How to reproduce* Run the test with a high number of iterations. To further provoke a failure, add {{Thread.sleep(100)}} before the following line {code} ResourceGuard.Lease lease_2 = resourceGuard.acquireResource(); {code} This will more likely result in {{closed}} being {{true}} at the time the 2nd lease is acquired, and and an exception is thrown: {code} if (closed) { throw new IOException("Resource guard was already closed."); } {code} > Test instability ResourceGuard#testCloseBlockIfAcquired > --- > > Key: FLINK-8040 > URL: https://issues.apache.org/jira/browse/FLINK-8040 > Project: Flink > Issue Type: Bug > Components: Core, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Gary Yao > Labels: test-stability > Fix For: 1.4.0, 1.5.0 > > > Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with > {noformat} > java.io.IOException: Resource guard was already closed. 
> at > org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) > at > org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at >
[jira] [Updated] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired
[ https://issues.apache.org/jira/browse/FLINK-8040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8040: Description: Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with {noformat} java.io.IOException: Resource guard was already closed. at org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) at org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) {noformat} *How to reproduce* Run the test with a high number of iterations. To further provoke a failure, add {{Thread.sleep(100)}} before the following line {code} ResourceGuard.Lease lease_2 = resourceGuard.acquireResource(); {code} This will more likely result in {{closed}} being {{true}} at the time the 2nd lease is acquired, and and an exception is thrown: {code} if (closed) { throw new IOException("Resource guard was already closed."); } {code} was: Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with {noformat} java.io.IOException: Resource guard was already closed. 
at org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) at org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) {noformat} *How to reproduce* Run the test with a high number of iterations. To further provoke a failure, add {{Thread.sleep(100)}} before the following line {code} ResourceGuard.Lease lease_2 = resourceGuard.acquireResource(); {code} This will more likely result in {{closed}} being {{true}} at the time the 2nd lease is acquired, and and exception is thrown: {code} if (closed) { throw new IOException("Resource guard was already closed."); } {code} > Test instability ResourceGuard#testCloseBlockIfAcquired > --- > > Key: FLINK-8040 > URL: https://issues.apache.org/jira/browse/FLINK-8040 > Project: Flink > Issue Type: Bug > Components: Core, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Gary Yao > Labels: test-stability > Fix For: 1.4.0, 1.5.0 > > > Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with > {noformat} > java.io.IOException: Resource guard was already closed. 
> at > org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) > at > org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at >
[jira] [Created] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired
Gary Yao created FLINK-8040: --- Summary: Test instability ResourceGuard#testCloseBlockIfAcquired Key: FLINK-8040 URL: https://issues.apache.org/jira/browse/FLINK-8040 Project: Flink Issue Type: Bug Components: Core, Tests Affects Versions: 1.4.0, 1.5.0 Reporter: Gary Yao Fix For: 1.4.0, 1.5.0 Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with {noformat} java.io.IOException: Resource guard was already closed. at org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) at org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) {noformat} *How to reproduce* Run the test with a high number of iterations. To further provoke the failure, add {{Thread.sleep(100)}} before the following line {code} ResourceGuard.Lease lease_2 = resourceGuard.acquireResource(); {code} This will more likely result in {{closed}} being {{true}} at the time the 2nd lease is acquired, and an exception is thrown: {code} if (closed) { throw new IOException("Resource guard was already closed."); } {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
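The reported race can be sketched with a stripped-down stand-in for the guard. The class below is a simplified model built from the snippets in the report, not the real `org.apache.flink.util.ResourceGuard`:

```java
import java.io.IOException;

public class ResourceGuardRace {

    // Minimal model: acquire() fails once closed, close() blocks on open leases.
    static class Guard {
        private boolean closed;
        private int leaseCount;

        synchronized void acquire() throws IOException {
            if (closed) {
                throw new IOException("Resource guard was already closed.");
            }
            leaseCount++;
        }

        synchronized void release() {
            leaseCount--;
            notifyAll();
        }

        synchronized void close() throws InterruptedException {
            closed = true;            // new acquisitions now fail ...
            while (leaseCount > 0) {  // ... while close() blocks on open leases
                wait();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Guard guard = new Guard();
        guard.acquire();              // first lease held

        Thread closer = new Thread(() -> {
            try {
                guard.close();
            } catch (InterruptedException ignored) {
            }
        });
        closer.start();

        Thread.sleep(100);            // the sleep from the report: close() sets the flag first
        try {
            guard.acquire();          // second lease now loses the race
            System.out.println("acquired");
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }

        guard.release();              // release the first lease so close() can return
        closer.join();
    }
}
```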
[jira] [Updated] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired
[ https://issues.apache.org/jira/browse/FLINK-8040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8040: Description: Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with {noformat} java.io.IOException: Resource guard was already closed. at org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) at org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) {noformat} *How to reproduce* Run the test with a high number of iterations. To further provoke a failure, add {{Thread.sleep(100)}} before the following line {code} ResourceGuard.Lease lease_2 = resourceGuard.acquireResource(); {code} This will more likely result in {{closed}} being {{true}} at the time the 2nd lease is acquired, and and exception is thrown: {code} if (closed) { throw new IOException("Resource guard was already closed."); } {code} was: Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with {noformat} java.io.IOException: Resource guard was already closed. 
at org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) at org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) {noformat} *How to reproduce* Run the test with a high number of iterations. To further provoke the failure, add {{Thread.sleep(100)}} before the following line {code} ResourceGuard.Lease lease_2 = resourceGuard.acquireResource(); {code} This will more likely result in {{closed}} being {{true}} at the time the 2nd lease is acquired, and an exception is thrown: {code} if (closed) { throw new IOException("Resource guard was already closed."); } {code} > Test instability ResourceGuard#testCloseBlockIfAcquired > --- > > Key: FLINK-8040 > URL: https://issues.apache.org/jira/browse/FLINK-8040 > Project: Flink > Issue Type: Bug > Components: Core, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Gary Yao > Labels: test-stability > Fix For: 1.4.0, 1.5.0 > > > Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with > {noformat} > java.io.IOException: Resource guard was already closed. 
> at > org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72) > at > org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at >
[jira] [Commented] (FLINK-7765) Enable dependency convergence
[ https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245638#comment-16245638 ] ASF GitHub Bot commented on FLINK-7765: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4777 https://issues.apache.org/jira/browse/FLINK-7765 - this is a parent issue that encloses fixing the other modules. The split between the subtasks is very arbitrary, but I guess the next milestone would be to enable convergence in `flink-runtime`, while tackling the issue of `flink-shaded-hadoop`. > Enable dependency convergence > - > > Key: FLINK-7765 > URL: https://issues.apache.org/jira/browse/FLINK-7765 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > For motivation check https://issues.apache.org/jira/browse/FLINK-7739 > Subtasks of this task depend on one another - to enable convergence in > `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6022) Don't serialise Schema when serialising Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245640#comment-16245640 ] Robert Metzger commented on FLINK-6022: --- This whole JIRA is only about the case when people are using a DataStream. I agree it's not a good idea, but people are doing it. As stated in the JIRA description, the schema of all records in a stream needs to be the same for this to work. I believe we should keep this JIRA open, because the problem has not been addressed. > Don't serialise Schema when serialising Avro GenericRecord > -- > > Key: FLINK-6022 > URL: https://issues.apache.org/jira/browse/FLINK-6022 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Robert Metzger >Assignee: Stephan Ewen > Fix For: 1.5.0 > > > Currently, Flink is serializing the schema for each Avro GenericRecord in the > stream. > This leads to a lot of overhead over the wire/disk + high serialization costs. > Therefore, I'm proposing to improve the support for GenericRecord in Flink by > shipping the schema to each serializer through the AvroTypeInformation. > Then, we can only support GenericRecords with the same type per stream, but > the performance will be much better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
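The wire/disk overhead mentioned in the issue can be illustrated with a back-of-the-envelope sketch. The byte sizes below are invented for illustration and the sketch does not use the Avro API; it only contrasts shipping the schema with every record against shipping it once per stream, as the AvroTypeInformation proposal would allow.

```java
public class SchemaOverhead {

    // Total bytes if the schema text is serialized with every record
    // (the current behavior described in the issue).
    static long bytesPerRecordSchema(int records, int schemaBytes, int recordBytes) {
        return (long) records * (schemaBytes + recordBytes);
    }

    // Total bytes if the schema is shipped once and only record payloads
    // travel per record (the proposed behavior).
    static long bytesSchemaOnce(int records, int schemaBytes, int recordBytes) {
        return schemaBytes + (long) records * recordBytes;
    }

    public static void main(String[] args) {
        // Assumed sizes: 1M records, a 500-byte schema, 50-byte payloads.
        int records = 1_000_000, schemaBytes = 500, recordBytes = 50;
        System.out.println("schema per record: "
                + bytesPerRecordSchema(records, schemaBytes, recordBytes) + " bytes");
        System.out.println("schema shipped once: "
                + bytesSchemaOnce(records, schemaBytes, recordBytes) + " bytes");
    }
}
```

Under these assumed sizes the per-record scheme ships roughly 11x the bytes, before even counting the CPU cost of re-serializing the schema, which is why the issue calls the overhead significant.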
[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4777 https://issues.apache.org/jira/browse/FLINK-7765 - this is a parent issue that encloses fixing the other modules. The split between the subtasks is very arbitrary, but I guess the next milestone would be to enable convergence in `flink-runtime`, while tackling the issue of `flink-shaded-hadoop`. ---
[jira] [Commented] (FLINK-7765) Enable dependency convergence
[ https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245631#comment-16245631 ] ASF GitHub Bot commented on FLINK-7765: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4777 I'll merge, yes, what are the follow-up issues you created? > Enable dependency convergence > - > > Key: FLINK-7765 > URL: https://issues.apache.org/jira/browse/FLINK-7765 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > For motivation check https://issues.apache.org/jira/browse/FLINK-7739 > Subtasks of this task depend on one another - to enable convergence in > `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245630#comment-16245630 ] ASF GitHub Bot commented on FLINK-8005: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/4980 There is only one thread dispatching the calls: ``` executor = Executors.newSingleThreadExecutor( new DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + taskNameWithSubtask)); this.asyncCallDispatcher = executor; ``` The tasks cannot overtake each other. I could make the test more strict and wait additionally on `triggerLatch` in case somebody decides to have multiple threads. > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. > Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) > ... 7 more > Caused by: org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360) > at > org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) > at > 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756) > at >
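The usual remedy for this class of problem, and the direction the PR title "[runtime] Set user code class loader before ..." points to, is to install the user-code class loader as the snapshot thread's context class loader around the call that constructs the producer, since Kafka resolves class-name strings via `Utils.getContextOrKafkaClassLoader()`. The helper below is a hypothetical sketch of that set/restore pattern, not Flink's actual fix.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: runs an action with a given context class loader
// installed on the current thread, restoring the previous loader afterwards.
public final class ContextClassLoaderUtil {

    static <T> T runWithContextClassLoader(ClassLoader userCodeClassLoader,
                                           Callable<T> action) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(userCodeClassLoader);
        try {
            return action.call();
        } finally {
            current.setContextClassLoader(previous); // restore even on failure
        }
    }

    // Verifies the pattern: the loader is visible inside the action and the
    // previous loader is restored afterwards.
    static boolean installsAndRestores() throws Exception {
        ClassLoader fake = new ClassLoader(ContextClassLoaderUtil.class.getClassLoader()) {};
        ClassLoader seen = runWithContextClassLoader(fake,
                () -> Thread.currentThread().getContextClassLoader());
        return seen == fake && Thread.currentThread().getContextClassLoader() != fake;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("installs and restores: " + installsAndRestores());
    }
}
```

With the user-code class loader installed this way, `Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader())` in Kafka's `ConfigDef` can see classes from the user jar instead of only the `AppClassLoader`.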
[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/4980 There is only one thread dispatching the calls: ``` executor = Executors.newSingleThreadExecutor( new DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + taskNameWithSubtask)); this.asyncCallDispatcher = executor; ``` The tasks cannot overtake each other. I could make the test more strict and wait additionally on `triggerLatch` in case somebody decides to have multiple threads. ---
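The ordering guarantee GJL relies on, that tasks submitted to a `newSingleThreadExecutor` cannot overtake each other, can be checked with a small sketch (the task setup below is invented for illustration, not taken from `TaskAsyncCallTest`):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrdering {

    // Submits n tasks to a single-threaded executor and records the order
    // in which they complete.
    static List<Integer> runTasks(int n) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> order = new CopyOnWriteArrayList<>();
        for (int i = 0; i < n; i++) {
            final int id = i;
            executor.submit(() -> {
                try {
                    // Even if an early task is slow, later tasks cannot overtake
                    // it: one worker thread drains the queue in submission order.
                    if (id == 0) Thread.sleep(50);
                } catch (InterruptedException ignored) {
                }
                order.add(id);
            });
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(5)); // tasks complete in FIFO order
    }
}
```

This is why the test only needs extra latches if the dispatcher is ever changed to use more than one thread, as the comment anticipates.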
[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4777 I'll merge, yes, what are the follow-up issues you created? ---
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245623#comment-16245623 ] ASF GitHub Bot commented on FLINK-8005: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4980 I think waiting on the stop latch might not be enough (in 100 % of cases) because the other two calls are also asynchronous. > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. > Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) > ... 7 more > Caused by: org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360) > at > org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) > at > 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at >
[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4980 I think waiting on the stop latch might not be enough (in 100 % of cases) because the other two calls are also asynchronous. ---
[jira] [Commented] (FLINK-8017) High availability cluster-id option incorrect in documentation
[ https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245603#comment-16245603 ] ASF GitHub Bot commented on FLINK-8017: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4976 Could you please close this if it doesn't auto-close? > High availability cluster-id option incorrect in documentation > -- > > Key: FLINK-8017 > URL: https://issues.apache.org/jira/browse/FLINK-8017 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.3.2 >Reporter: Dan Kelley >Priority: Minor > Fix For: 1.4.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > The property key in HighAvailabilityOptions.java is > high-availability.cluster-id however the documentation states that the key is > high-availability.zookeeper.path.cluster-id -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8017) High availability cluster-id option incorrect in documentation
[ https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-8017. --- Resolution: Fixed Fixed on release-1.4 in 02a19a14fad1ef928038f4971bdcacf4d0642d88 Fixed on master in 624df0120a962fe93d31544d7b13637121196197 > High availability cluster-id option incorrect in documentation > -- > > Key: FLINK-8017 > URL: https://issues.apache.org/jira/browse/FLINK-8017 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.3.2 >Reporter: Dan Kelley >Priority: Minor > Fix For: 1.4.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > The property key in HighAvailabilityOptions.java is > high-availability.cluster-id however the documentation states that the key is > high-availability.zookeeper.path.cluster-id -- This message was sent by Atlassian JIRA (v6.4.14#64029)
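For reference, a minimal flink-conf.yaml fragment using the corrected key; the cluster-id value is an example, not taken from the issue:

```yaml
# Correct key, as defined in HighAvailabilityOptions.java:
high-availability.cluster-id: /my-flink-cluster   # example value
# The documentation previously showed the incorrect key
# high-availability.zookeeper.path.cluster-id for this option.
```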
[GitHub] flink issue #4976: [FLINK-8017] Fix High availability cluster-id key in docu...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4976 Could you please close this if it doesn't auto-close? ---
[jira] [Updated] (FLINK-8017) High availability cluster-id option incorrect in documentation
[ https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8017: Fix Version/s: 1.4.0 > High availability cluster-id option incorrect in documentation > -- > > Key: FLINK-8017 > URL: https://issues.apache.org/jira/browse/FLINK-8017 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.3.2 >Reporter: Dan Kelley >Priority: Minor > Fix For: 1.4.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > The property key in HighAvailabilityOptions.java is > high-availability.cluster-id however the documentation states that the key is > high-availability.zookeeper.path.cluster-id -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245600#comment-16245600 ] ASF GitHub Bot commented on FLINK-8005: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/4980 I addressed the comments. Let's wait for Travis and let me know if something else needs to be changed. @aljoscha @kl0u > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. > Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) > ... 7 more > Caused by: org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360) > at > org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) > at > 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at
[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/4980 I addressed the comments. Let's wait for Travis and let me know if something else needs to be changed. @aljoscha @kl0u ---
[jira] [Commented] (FLINK-8017) High availability cluster-id option incorrect in documentation
[ https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245599#comment-16245599 ] ASF GitHub Bot commented on FLINK-8017: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4976 Thanks for spotting and fixing this! I'm merging. > High availability cluster-id option incorrect in documentation > -- > > Key: FLINK-8017 > URL: https://issues.apache.org/jira/browse/FLINK-8017 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.3.2 >Reporter: Dan Kelley >Priority: Minor > Original Estimate: 1h > Remaining Estimate: 1h > > The property key in HighAvailabilityOptions.java is > high-availability.cluster-id however the documentation states that the key is > high-availability.zookeeper.path.cluster-id -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4976: [FLINK-8017] Fix High availability cluster-id key in docu...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4976 Thanks for spotting and fixing this! I'm merging. ---
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245597#comment-16245597 ] ASF GitHub Bot commented on FLINK-8005: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149955008 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -254,12 +300,10 @@ else if (this.error == null) { @Override public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { - throw new UnsupportedOperationException("Should not be called"); } @Override public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) { - throw new UnsupportedOperationException("Should not be called"); --- End diff -- Not sure anymore but I decided to add it again. > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. 
> Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) > ... 7 more > Caused by: org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. 
> at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360) > at > org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94) > at >
[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149955008 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -254,12 +300,10 @@ else if (this.error == null) { @Override public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { - throw new UnsupportedOperationException("Should not be called"); } @Override public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) { - throw new UnsupportedOperationException("Should not be called"); --- End diff -- Not sure anymore but I decided to add it again. ---
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245596#comment-16245596 ] ASF GitHub Bot commented on FLINK-8005: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149954849 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -254,12 +300,10 @@ else if (this.error == null) { @Override public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { - throw new UnsupportedOperationException("Should not be called"); --- End diff -- Not sure but I decided to add it again. > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. > Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) > ... 7 more > Caused by: org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360) > at > org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) > at >
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294) > at >
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245592#comment-16245592 ] ASF GitHub Bot commented on FLINK-8005: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149954576 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -58,99 +59,144 @@ import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.Executor; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.isOneOf; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TaskAsyncCallTest { - private static final int NUM_CALLS = 1000; - + private static int numCalls; + + /** Triggered at the beginning of {@link CheckpointsInOrderInvokable#invoke()}. */ private static OneShotLatch awaitLatch; + + /** +* Triggered when {@link CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, CheckpointOptions)} +* was called {@link #numCalls} times. 
+*/ private static OneShotLatch triggerLatch; + private static final List classLoaders = new ArrayList<>(); + @Before public void createQueuesAndActors() { + numCalls = 1000; + awaitLatch = new OneShotLatch(); triggerLatch = new OneShotLatch(); + + classLoaders.clear(); } // // Tests // - + @Test - public void testCheckpointCallsInOrder() { - try { - Task task = createTask(); + public void testCheckpointCallsInOrder() throws Exception { + Task task = createTask(CheckpointsInOrderInvokable.class); + try (TaskCleaner ignored = new TaskCleaner(task)) { task.startTaskThread(); - + awaitLatch.await(); - - for (int i = 1; i <= NUM_CALLS; i++) { + + for (int i = 1; i <= numCalls; i++) { task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint()); } - + triggerLatch.await(); - + assertFalse(task.isCanceledOrFailed()); ExecutionState currentState = task.getExecutionState(); - if (currentState != ExecutionState.RUNNING && currentState != ExecutionState.FINISHED) { - fail("Task should be RUNNING or FINISHED, but is " + currentState); - } - - task.cancelExecution(); - task.getExecutingThread().join(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + assertThat(currentState, isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED)); } } @Test - public void testMixedAsyncCallsInOrder() { - try { - Task task = createTask(); + public void testMixedAsyncCallsInOrder() throws Exception { + Task task = createTask(CheckpointsInOrderInvokable.class); + try (TaskCleaner ignored = new TaskCleaner(task)) { task.startTaskThread(); awaitLatch.await(); - for (int i = 1; i <= NUM_CALLS; i++) { + for (int i = 1; i <= numCalls; i++) { task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint()); task.notifyCheckpointComplete(i); } triggerLatch.await(); assertFalse(task.isCanceledOrFailed());
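The test refactoring in the diff above wraps each `Task` in a try-with-resources guard (`TaskCleaner`) so the task is cancelled and its thread joined even when an assertion fails, replacing the old manual `cancelExecution()`/`join()` at the end of the happy path. A simplified sketch of that cleanup pattern in plain JDK Java (the `ThreadCleaner` name and the use of `interrupt()` are illustrative stand-ins, not Flink's actual `TaskCleaner`/`Task` API):

```java
/**
 * Sketch of the try-with-resources cleanup pattern from the test diff:
 * an AutoCloseable that stops and joins a worker thread even if the
 * test body throws. interrupt()/join() stand in for the real
 * task.cancelExecution() and task.getExecutingThread().join().
 */
public class ThreadCleaner implements AutoCloseable {

    private final Thread thread;

    public ThreadCleaner(Thread thread) {
        this.thread = thread;
    }

    @Override
    public void close() throws InterruptedException {
        thread.interrupt(); // request shutdown of the worker
        thread.join();      // wait until it has actually terminated
    }

    public static void main(String[] args) throws Exception {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000); // simulated long-running task
            } catch (InterruptedException ignored) {
                // interrupted by the cleaner; exit
            }
        });
        worker.start();

        try (ThreadCleaner ignored = new ThreadCleaner(worker)) {
            // test body goes here; cleanup runs even on failure
        }

        if (worker.isAlive()) {
            throw new AssertionError("worker was not joined");
        }
        System.out.println("worker joined");
    }
}
```

The point of the pattern is that cleanup no longer depends on the test reaching its last line, which is what made the original `catch`/`fail` version leak running task threads between tests.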
[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245593#comment-16245593 ] ASF GitHub Bot commented on FLINK-8005: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4980#discussion_r149954606 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java --- @@ -29,21 +31,41 @@ private final ThreadGroup group; private final String threadName; + + private final ClassLoader classLoader; /** * Creates a new thread factory. -* +* * @param group The group that the threads will be associated with. * @param threadName The name for the threads. */ public DispatcherThreadFactory(ThreadGroup group, String threadName) { + this(group, threadName, null); + } + + /** +* Creates a new thread factory. +* +* @param group The group that the threads will be associated with. +* @param threadName The name for the threads. +* @param classLoader The {@link ClassLoader} to be set as context class loader. +*/ + public DispatcherThreadFactory( + ThreadGroup group, --- End diff -- Indented. > Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues > -- > > Key: FLINK-8005 > URL: https://issues.apache.org/jira/browse/FLINK-8005 > Project: Flink > Issue Type: Bug > Components: Core, Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > *Problem Description* > Classes in the user code jar cannot be loaded by the snapshot thread’s > context class loader ({{AppClassLoader}}). > For example, when creating instances of {{KafkaProducer}}, Strings are > resolved to class objects by Kafka. 
> Find below an extract from {{ConfigDef.java}}: > {code} > case CLASS: > if (value instanceof Class) > return value; > else if (value instanceof String) > return Class.forName(trimmed, true, > Utils.getContextOrKafkaClassLoader()); > else > throw new ConfigException(name, value, "Expected a Class instance or > class name."); > {code} > *Exception/Stacktrace* > {noformat} > Caused by: java.lang.Exception: Could not complete snapshot 1 for operator > Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1). > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526) > ... 7 more > Caused by: org.apache.kafka.common.config.ConfigException: Invalid value > org.apache.kafka.common.serialization.ByteArraySerializer for configuration > key.serializer: Class > org.apache.kafka.common.serialization.ByteArraySerializer could not be found. 
> at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) > at > org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) > at > org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360) > at > org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637) > at >
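The fix discussed in this FLINK-8005 thread extends `DispatcherThreadFactory` with a `ClassLoader` parameter so checkpoint dispatcher threads carry the user-code class loader, letting Kafka's `Utils.getContextOrKafkaClassLoader()` resolve classes from the user jar. A simplified plain-JDK sketch of the idea (class and parameter names here are illustrative; the real Flink factory in the diff differs in details):

```java
import java.util.concurrent.ThreadFactory;

/**
 * Sketch: a ThreadFactory that pins a context ClassLoader on every
 * thread it creates. Without this, a new thread inherits the creating
 * thread's context class loader, which for internal dispatcher threads
 * is typically the AppClassLoader and cannot see user-code classes.
 */
public class ContextClassLoaderThreadFactory implements ThreadFactory {

    private final String threadName;
    private final ClassLoader classLoader; // may be null: keep the inherited loader

    public ContextClassLoaderThreadFactory(String threadName, ClassLoader classLoader) {
        this.threadName = threadName;
        this.classLoader = classLoader;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, threadName);
        if (classLoader != null) {
            // This is the crux of the fix: libraries that resolve class
            // names via Thread.currentThread().getContextClassLoader()
            // (like Kafka's ConfigDef) now see the user-code classes.
            t.setContextClassLoader(classLoader);
        }
        return t;
    }

    public static void main(String[] args) {
        // Stand-in for the user-code class loader of a submitted job.
        ClassLoader userLoader = new java.net.URLClassLoader(new java.net.URL[0]);

        ThreadFactory factory =
            new ContextClassLoaderThreadFactory("checkpoint-dispatcher", userLoader);
        Thread t = factory.newThread(() -> { });

        if (t.getContextClassLoader() != userLoader) {
            throw new AssertionError("context class loader was not set");
        }
        System.out.println("context class loader pinned");
    }
}
```

This explains the `ByteArraySerializer could not be found` failure above: the class is on the user-code classpath, but the snapshot thread's inherited context class loader could not see it.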
[jira] [Assigned] (FLINK-8021) End-to-end tests may not shutdown cluster on failure
[ https://issues.apache.org/jira/browse/FLINK-8021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-8021: --- Assignee: Aljoscha Krettek > End-to-end tests may not shutdown cluster on failure > > > Key: FLINK-8021 > URL: https://issues.apache.org/jira/browse/FLINK-8021 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Assignee: Aljoscha Krettek > > In this job https://travis-ci.org/zentol/flink/jobs/298656917 the kafka E2E > test failed straight away due to a missing class. The subsequent tests failed > since they cannot allocate the JM port. > It is thus likely that the E2E tests do not shutdown the cluster in all > failure cases. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7765) Enable dependency convergence
[ https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245581#comment-16245581 ] ASF GitHub Bot commented on FLINK-7765: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4777 All is green, could we merge this? This PR has a quite large silent conflict chance during rebasing :( (whenever someone changes any dependency on master) > Enable dependency convergence > - > > Key: FLINK-7765 > URL: https://issues.apache.org/jira/browse/FLINK-7765 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > For motivation check https://issues.apache.org/jira/browse/FLINK-7739 > SubTasks of this task depends on one another - to enable convergence in > `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-6022) Don't serialise Schema when serialising Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245577#comment-16245577 ] Stephan Ewen edited comment on FLINK-6022 at 11/9/17 12:48 PM: --- We are not serializing the schema in the Avro Serializer. If the Avro Serializer is chosen, this is fixed. I am wondering if the case is if one uses explicitly a "generic record" from Avro as the exchange data type. That is not a good idea in the first place in my opinion. In that case, isn't it possible that each generic record is different and thus you always need a schema anyways? I would honestly close this, because I assume the intention was around using Avro's specific record mechanism and the "generic" mechanism (where we use the ReflectDatumReader/Writer). Both should work well now. was (Author: stephanewen): We are not serializing the schema in the Avro Serializer. If the Avro Serializer is chosen, this is fixed. I am wondering if the case is if one uses explicitly a "generic record" from Avro as the exchange data type. That is not a good idea in the first place in my opinion. In that case, isn't it possible that each generic record is different and thus you always need a schema anyways. > Don't serialise Schema when serialising Avro GenericRecord > -- > > Key: FLINK-6022 > URL: https://issues.apache.org/jira/browse/FLINK-6022 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Robert Metzger >Assignee: Stephan Ewen > Fix For: 1.5.0 > > > Currently, Flink is serializing the schema for each Avro GenericRecord in the > stream. > This leads to a lot of overhead over the wire/disk + high serialization costs. > Therefore, I'm proposing to improve the support for GenericRecord in Flink by > shipping the schema to each serializer through the AvroTypeInformation. > Then, we can only support GenericRecords with the same type per stream, but > the performance will be much better. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
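The cost that FLINK-6022 eliminates can be made concrete: if the schema travels with every serialized record instead of once per stream (e.g. via the type information), the wire/disk footprint grows by the full schema size per record. A conceptual plain-JDK sketch, not the actual Avro or Flink API; the schema string and 8-byte payload are illustrative assumptions:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Conceptual sketch: bytes written when a schema is embedded per record
 * versus shipped once per stream. Real Avro schemas are usually much
 * larger than this toy one, making the per-record overhead worse.
 */
public class SchemaOverheadSketch {

    static final String SCHEMA =
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}";

    private static final int RECORD_BYTES = 8; // payload of one long field

    /** Bytes written when the schema precedes every record. */
    static int schemaPerRecord(int numRecords) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] schema = SCHEMA.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < numRecords; i++) {
            out.write(schema);                 // schema repeated per record
            out.write(new byte[RECORD_BYTES]); // record payload
        }
        return out.size();
    }

    /** Bytes written when the schema is shipped once, up front. */
    static int schemaOncePerStream(int numRecords) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(SCHEMA.getBytes(StandardCharsets.UTF_8)); // schema once
        for (int i = 0; i < numRecords; i++) {
            out.write(new byte[RECORD_BYTES]);
        }
        return out.size();
    }

    public static void main(String[] args) throws IOException {
        int n = 1000;
        System.out.println("schema per record:   " + schemaPerRecord(n) + " bytes");
        System.out.println("schema once per stream: " + schemaOncePerStream(n) + " bytes");
    }
}
```

This also illustrates the restriction Stephan mentions: shipping the schema once is only sound if every record in the stream conforms to that one schema, which is exactly the "same type per stream" trade-off in the issue description.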