[jira] [Updated] (FLINK-7697) Add metrics for Elasticsearch Sink
[ https://issues.apache.org/jira/browse/FLINK-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hai Zhou UTC+8 updated FLINK-7697:
----------------------------------
    Fix Version/s: 1.5.0

> Add metrics for Elasticsearch Sink
> ----------------------------------
>
>                 Key: FLINK-7697
>                 URL: https://issues.apache.org/jira/browse/FLINK-7697
>             Project: Flink
>          Issue Type: Wish
>          Components: ElasticSearch Connector
>            Reporter: Hai Zhou UTC+8
>            Assignee: Hai Zhou UTC+8
>            Priority: Critical
>             Fix For: 1.5.0
>
> We should add metrics to track events written to the ElasticsearchSink, e.g.:
> * number of successful bulk sends
> * number of documents inserted
> * number of documents updated
> * number of document version conflicts

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
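The counting logic the wish asks for can be sketched without any Flink or Elasticsearch dependencies. This is a minimal sketch only: in a real ElasticsearchSink these counts would be registered as Flink Counters via the runtime context's metric group, and the `BulkItemStatus` enum and `tally()` helper below are hypothetical stand-ins for the per-item results of a bulk response.

```java
// Sketch of the proposed metrics, kept free of Flink/Elasticsearch
// dependencies. BulkItemStatus and tally() are illustrative names, not
// connector API.
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class ElasticsearchSinkMetricsSketch {

    enum BulkItemStatus { INSERTED, UPDATED, VERSION_CONFLICT }

    private long successfulBulkSends;
    private final Map<BulkItemStatus, Long> itemCounts = new EnumMap<>(BulkItemStatus.class);

    /** Tally one completed bulk send and its per-document outcomes. */
    public void tally(List<BulkItemStatus> bulkResponseItems) {
        successfulBulkSends++; // one increment per successful bulk request
        for (BulkItemStatus status : bulkResponseItems) {
            itemCounts.merge(status, 1L, Long::sum);
        }
    }

    public long getSuccessfulBulkSends() { return successfulBulkSends; }

    public long getCount(BulkItemStatus status) {
        return itemCounts.getOrDefault(status, 0L);
    }
}
```

The same tally would be driven from the bulk-response callback of the sink, with each counter exposed through the metric system instead of a plain field.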
[jira] [Commented] (FLINK-7881) flink can't be deployed on yarn with ha
[ https://issues.apache.org/jira/browse/FLINK-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16226328#comment-16226328 ]

deng commented on FLINK-7881:
-----------------------------

I have dug out why the issue happens. As the following code shows, the failoverProxyProvider is null, so the client treats it as a non-HA case. The YarnConfiguration does not read hdfs-site.xml to get the "dfs.client.failover.proxy.provider.startdt" value; only yarn-site.xml and core-site.xml are read.

!screenshot-1.png!
!screenshot-2.png!

I have found a solution, as below.

In org.apache.flink.yarn.YarnApplicationMasterRunner.java, replace

{code:java}
final YarnConfiguration yarnConfig = new YarnConfiguration();
{code}

with

{code:java}
final YarnConfiguration yarnConfig = new YarnConfiguration(HadoopUtils.getHadoopConfiguration());
{code}

In org.apache.flink.runtime.util.HadoopUtils.java, add the code below in getHadoopConfiguration():

{code:java}
if (new File(possibleHadoopConfPath + "/yarn-site.xml").exists()) {
    retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/yarn-site.xml"));
    if (LOG.isDebugEnabled()) {
        LOG.debug("Adding " + possibleHadoopConfPath + "/yarn-site.xml to hadoop configuration");
    }
}
{code}

> flink can't be deployed on yarn with ha
> ---------------------------------------
>
>                 Key: FLINK-7881
>                 URL: https://issues.apache.org/jira/browse/FLINK-7881
>             Project: Flink
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 1.3.2
>            Reporter: deng
>            Priority: Blocker
>         Attachments: screenshot-1.png, screenshot-2.png
>
> I start yarn-session.sh on yarn, but it can't resolve the HDFS logical URL. It
> always connects to hdfs://master:8020, but it should be 9000; my hdfs defaultfs
> is hdfs://master.
> I have configured YARN_CONF_DIR and HADOOP_CONF_DIR, but it didn't work.
> Is it a bug? I use flink-1.3.0-bin-hadoop27-scala_2.10.
>
> 2017-10-20 11:00:05,395 DEBUG org.apache.hadoop.ipc.Client - IPC Client (1035144464) connection to startdt/173.16.5.215:8020 from admin: closed
> 2017-10-20 11:00:05,398 ERROR org.apache.flink.yarn.YarnApplicationMasterRunner - YARN Application Master initialization failed
> java.net.ConnectException: Call From spark3/173.16.5.216 to master:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>     at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
>     at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1479)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>     at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>     at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
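The failure mode described above can be illustrated with a tiny stand-in for Hadoop's Configuration lookup. This is only a sketch with hypothetical names (real HDFS clients resolve this inside their proxy-creation code): when hdfs-site.xml was never added as a resource, the "dfs.client.failover.proxy.provider.<nameservice>" key is missing, so the logical nameservice "master" is treated as a plain hostname with the default NameNode RPC port 8020.

```java
// Hypothetical illustration of HA vs. non-HA resolution; not Hadoop API.
import java.util.Map;

public class HaResolutionSketch {

    static final int DEFAULT_NAMENODE_RPC_PORT = 8020;

    /** Returns the endpoint a client would dial for hdfs://<authority>. */
    static String resolve(String authority, Map<String, String> conf) {
        String providerKey = "dfs.client.failover.proxy.provider." + authority;
        if (conf.containsKey(providerKey)) {
            // HA case: a failover proxy provider handles addressing.
            return "HA:" + authority;
        }
        // Non-HA fallback: interpret the authority as host[:port].
        return authority.contains(":")
                ? authority
                : authority + ":" + DEFAULT_NAMENODE_RPC_PORT;
    }
}
```

With only yarn-site.xml and core-site.xml loaded, the provider key is absent and "master" resolves to master:8020, which matches the ConnectException in the stack trace.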
[jira] [Updated] (FLINK-7881) flink can't be deployed on yarn with ha
[ https://issues.apache.org/jira/browse/FLINK-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

deng updated FLINK-7881:
------------------------
    Attachment: screenshot-2.png
[jira] [Updated] (FLINK-7881) flink can't be deployed on yarn with ha
[ https://issues.apache.org/jira/browse/FLINK-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

deng updated FLINK-7881:
------------------------
    Attachment: screenshot-1.png
[jira] [Commented] (FLINK-7936) Lack of synchronization w.r.t. taskManagers in MetricStore#add()
[ https://issues.apache.org/jira/browse/FLINK-7936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16226280#comment-16226280 ]

Bowen Li commented on FLINK-7936:
---------------------------------

Hi [~pnowojski], does this still hold after your refactoring of {{MetricStore}}?

> Lack of synchronization w.r.t. taskManagers in MetricStore#add()
> ----------------------------------------------------------------
>
>                 Key: FLINK-7936
>                 URL: https://issues.apache.org/jira/browse/FLINK-7936
>             Project: Flink
>          Issue Type: Bug
>          Components: Metrics
>            Reporter: Ted Yu
>            Priority: Minor
>
> {code}
>     String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
>     tm = taskManagers.computeIfAbsent(tmID, k -> new TaskManagerMetricStore());
> {code}
> In other places, access to taskManagers is protected by a lock on MetricStore.this.
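The concern in the issue can be sketched in a few lines. This is not Flink's actual MetricStore code, only a minimal stand-in: if most accesses to the map are guarded by `synchronized (this)` but `add()` calls `computeIfAbsent()` unguarded, two threads can mutate the underlying HashMap concurrently. The sketch takes the same lock in `add()`; switching the field to a ConcurrentHashMap would be an alternative fix.

```java
// Minimal sketch of consistent locking around a plain HashMap; class and
// value types are illustrative, not Flink's.
import java.util.HashMap;
import java.util.Map;

public class MetricStoreSketch {

    // A plain HashMap is safe only if every access holds the same monitor.
    private final Map<String, String> taskManagers = new HashMap<>();

    public String add(String tmID) {
        synchronized (this) { // same lock as the other accessors
            return taskManagers.computeIfAbsent(tmID, k -> "TaskManagerMetricStore@" + k);
        }
    }

    public synchronized int size() {
        return taskManagers.size();
    }
}
```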
[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly
[ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16226215#comment-16226215 ]

ASF GitHub Bot commented on FLINK-7153:
---------------------------------------

Github user summerleafs commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r147891494

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
        // Miscellaneous
        //

    +   /**
    +    * Calculates the preferred locations based on the location preference constraint.
    +    *
    +    * @param locationPreferenceConstraint constraint for the location preference
    +    * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +    *         have been assigned a resource.
    +    */
    +   @VisibleForTesting
    +   public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +       final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +       final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
    --- End diff --

    Hi Till, is `getPreferredLocations()` not invoked here because Flink doesn't yet support reading checkpoint data locally? I have created an issue for reading checkpoints locally [here](https://issues.apache.org/jira/browse/FLINK-7873?filter=-1); when it is complete, I wonder if we can invoke `getPreferredLocations()` instead of `getPreferredLocationsBasedOnInputs()`.
> Eager Scheduling can't allocate source for ExecutionGraph correctly
> -------------------------------------------------------------------
>
>                 Key: FLINK-7153
>                 URL: https://issues.apache.org/jira/browse/FLINK-7153
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.3.1
>            Reporter: Sihua Zhou
>            Assignee: Till Rohrmann
>            Priority: Critical
>             Fix For: 1.4.0, 1.3.3
>
> The ExecutionGraph.scheduleEager() function allocates slots for each
> ExecutionJobVertex one by one by calling ExecutionJobVertex.allocateResourcesForAll().
> There are two problems with this:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs() will always return
> empty, because `sourceSlot` is always null until the `ExecutionVertex` has been
> deployed via Execution.deployToSlot(). So allocating resources based on
> preferred locations can't work correctly; we need to set the slot info for the
> `Execution` as soon as Execution.allocateSlotForExecution() is called
> successfully.
> 2. The current allocation strategy doesn't allocate slots optimally. Here is a
> test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and three
> remote partitions. But actually, it should be 2 local partitions and 2 remote
> partitions.
> The cause of the above problems is that the current strategy allocates
> resources for executions one by one (if the execution can allocate from the
> SlotSharingGroup then take it, otherwise ask for a new slot).
> Changing the allocation strategy to two steps would solve this problem; below
> is the pseudo code:
> {code}
> for (ExecutionJobVertex ejv : getVerticesTopologically()) {
>     // step 1: try to allocate from the SlotSharingGroup based on inputs one by one
>     // (which only allocates resources based on location).
>     // step 2: allocate for the remaining executions.
> }
> {code}
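The two-step idea from the pseudo code can be sketched with plain strings instead of Flink's Execution/SlotSharingGroup types. This is only an illustration under that assumption: step 1 places every execution that has a preferred host onto that host if a slot is free there, and step 2 places the remaining executions on any host with capacity.

```java
// Illustrative two-phase placement; names are hypothetical, not Flink API.
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TwoStepAllocationSketch {

    /**
     * @param preferredHost execution -> preferred host ("" if none)
     * @param freeSlots     host -> number of free slots (mutated)
     * @return execution -> assigned host
     */
    static Map<String, String> allocate(Map<String, String> preferredHost, Map<String, Integer> freeSlots) {
        Map<String, String> assignment = new LinkedHashMap<>();
        List<String> remaining = new ArrayList<>();

        // Step 1: honor location preferences where a slot is available.
        for (Map.Entry<String, String> e : preferredHost.entrySet()) {
            String host = e.getValue();
            if (!host.isEmpty() && freeSlots.getOrDefault(host, 0) > 0) {
                freeSlots.merge(host, -1, Integer::sum);
                assignment.put(e.getKey(), host);
            } else {
                remaining.add(e.getKey());
            }
        }
        // Step 2: place the remaining executions on any host with capacity.
        for (String exec : remaining) {
            for (Map.Entry<String, Integer> slot : freeSlots.entrySet()) {
                if (slot.getValue() > 0) {
                    slot.setValue(slot.getValue() - 1);
                    assignment.put(exec, slot.getKey());
                    break;
                }
            }
        }
        return assignment;
    }
}
```

Because preferences are satisfied before any fallback placement, an execution with a preferred input location is never displaced by one that has no preference, which is the point of the two-step strategy.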
[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly
[ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16226198#comment-16226198 ]

ASF GitHub Bot commented on FLINK-7153:
---------------------------------------

Github user summerleafs commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r147889602

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
        // that way we do not have any operation that can fail between allocating the slots
        // and adding them to the list. If we had a failure in between there, that would
        // cause the slots to get lost
    -   final ArrayList resources = new ArrayList<>(getNumberOfExecutionJobVertices());
        final boolean queued = allowQueuedScheduling;
    -   // we use this flag to handle failures in a 'finally' clause
    -   // that allows us to not go through clumsy cast-and-rethrow logic
    -   boolean successful = false;
    +   // collecting all the slots may resize and fail in that operation without slots getting lost
    +   final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    -   try {
    -       // collecting all the slots may resize and fail in that operation without slots getting lost
    -       final ArrayList<CompletableFuture<Execution>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +   // allocate the slots (obtain all their futures)
    +   for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +       // these calls are not blocking, they only return futures
    --- End diff --

    Hi, is allocating resources in topological order just meant to facilitate the 'allocate resources based on preferred inputs' optimization? It may produce a bad result if we allocate based on state.
[jira] [Updated] (FLINK-7588) Document RocksDB tuning for spinning disks
[ https://issues.apache.org/jira/browse/FLINK-7588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-7588:
--------------------------
    Description:
In docs/ops/state/large_state_tuning.md, it is mentioned that:

bq. the default configuration is tailored towards SSDs and performs suboptimal on spinning disks

We should add a recommendation targeting spinning disks:
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk

  was:
In docs/ops/state/large_state_tuning.md, it is mentioned that:

bq. the default configuration is tailored towards SSDs and performs suboptimal on spinning disks

We should add a recommendation targeting spinning disks:
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk

> Document RocksDB tuning for spinning disks
> ------------------------------------------
>
>                 Key: FLINK-7588
>                 URL: https://issues.apache.org/jira/browse/FLINK-7588
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>            Reporter: Ted Yu
>
> In docs/ops/state/large_state_tuning.md, it is mentioned that:
> bq. the default configuration is tailored towards SSDs and performs suboptimal on spinning disks
> We should add a recommendation targeting spinning disks:
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk
[jira] [Updated] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou updated FLINK-7873:
------------------------------
    Description:
Why I introduce this:

The current recovery strategy always reads checkpoint data from a remote FileStream (HDFS). This costs a lot of bandwidth when the state is big (e.g. 1 TB). What's worse, if the job recovers again and again, it can eat up all network bandwidth and badly hurt the cluster. So I propose that we cache the checkpoint data locally and read checkpoint data from the local cache whenever we can, reading from the remote store only if we fail locally. The advantage is that if an execution is assigned to the same TaskManager as before, it saves a lot of bandwidth and recovers faster.

Solution:

The TaskManager does the caching and manages the cached data itself. It simply uses a TTL-like method to dispose of cache entries: we dispose of an entry if it hasn't been touched for some time X, and once we touch an entry we reset its TTL. This way, all the work is done by the TaskManager and is transparent to the JobManager. The only problem is that we may dispose of an entry that is still useful; in that case we fall back to reading the remote data, and users can avoid this by setting a proper TTL value according to the checkpoint interval and other factors.

Can someone give me some advice? I would appreciate it very much~

  was:
The current recovery strategy always reads checkpoint data from a remote FileStream (HDFS). This costs a lot of bandwidth when the state is big (e.g. 1 TB). What's worse, if the job recovers again and again, it can eat up all network bandwidth and badly hurt the cluster. So I propose that we cache the checkpoint data locally and read checkpoint data from the local cache whenever we can, reading from the remote store only if we fail locally. The advantage is that if an execution is assigned to the same TaskManager as before, it saves a lot of bandwidth and recovers faster.

Key problems:
1. Cache the checkpoint data on local disk and manage its creation and deletion.
2. Introduce a HybridStreamStateHandle which tries to create a local input stream first and, if that fails, creates a remote input stream. Its prototype looks like below:

{code:java}
class HybridStreamHandle {
    private StreamStateHandle localHandle;
    private StreamStateHandle remoteHandle;
    ...
    public FSDataInputStream openInputStream() throws IOException {
        FSDataInputStream inputStream = localHandle.openInputStream();
        return inputStream != null ? inputStream : remoteHandle.openInputStream();
    }
    ...
}
{code}

Solution:

There are two kinds of solutions I can think of.

Solution 1: The backend does the caching, and the HybridStreamHandle points to both the local and the remote data. The HybridStreamHandle is managed by the CheckpointCoordinator like any other StreamHandle, so the CheckpointCoordinator will dispose of it; when the HybridStreamHandle is disposed, it calls localHandle.dispose() and remoteHandle.dispose(). This way, we have to record the TaskManager's info (like its location) in the localHandle and add an entry point in the TaskManager to handle localHandle dispose messages, and we also have to consider the HA situation.

Solution 2: The TaskManager does the caching and manages the cached data itself. It simply uses a TTL-like method to dispose of handles: we dispose of a handle if it hasn't been touched for some time X. We touch the handles when we recover from a checkpoint or when we perform a checkpoint, and once we touch a handle we reset its TTL. This way, all the work is done by the backend and is transparent to the JobManager. The only problem is that we may dispose of a handle that is still useful, but even in this case we can fall back to reading the remote data, and users can avoid it by setting a proper TTL value according to the checkpoint interval and other factors.

Trying not to complicate the problem, I prefer solution 2. Can someone give me some advice? I would appreciate it very much~

> Introduce CheckpointCacheManager for reading checkpoint data locally when
> performing failover
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-7873
>                 URL: https://issues.apache.org/jira/browse/FLINK-7873
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>
> Why I introduce this:
> The current recovery strategy always reads checkpoint data from a remote
> FileStream (HDFS). This costs a lot of bandwidth when the state is big
> (e.g. 1 TB). What's worse, if the job recovers again and again, it can
> eat up all network bandwidth and do a huge hurt to clu
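The TTL mechanism from the proposal can be sketched deterministically. This is only a sketch: the names (CheckpointCacheSketch, touch, sweep) are illustrative, not the actual CheckpointCacheManager API, and time is injected so the behavior is testable. Entries not touched within the TTL are disposed, and each touch resets the clock.

```java
// Illustrative TTL-managed cache of checkpoint handles; names are hypothetical.
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class CheckpointCacheSketch {

    private final long ttlMillis;
    private final Map<String, Long> lastTouched = new HashMap<>(); // handle id -> last touch time

    public CheckpointCacheSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Register or refresh an entry; called on checkpoint and on recovery. */
    public void touch(String handleId, long nowMillis) {
        lastTouched.put(handleId, nowMillis);
    }

    /** Dispose every entry not touched within the TTL; returns how many were dropped. */
    public int sweep(long nowMillis) {
        int dropped = 0;
        for (Iterator<Map.Entry<String, Long>> it = lastTouched.entrySet().iterator(); it.hasNext(); ) {
            if (nowMillis - it.next().getValue() > ttlMillis) {
                it.remove();
                dropped++;
            }
        }
        return dropped;
    }

    public boolean contains(String handleId) {
        return lastTouched.containsKey(handleId);
    }
}
```

A disposed entry is not fatal, matching the proposal: recovery simply falls back to the remote handle, and a TTL longer than the checkpoint interval keeps live entries warm.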
[jira] [Updated] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou updated FLINK-7873:
------------------------------
    Summary: Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover  (was: Introduce HybridStreamHandle to optimize the recovery mechanism and try to read the checkpoint data locally)
[jira] [Commented] (FLINK-7947) Let ParameterTool return a dedicated GlobalJobParameters object
[ https://issues.apache.org/jira/browse/FLINK-7947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16226117#comment-16226117 ] Hai Zhou UTC+8 commented on FLINK-7947: --- +1 > Let ParameterTool return a dedicated GlobalJobParameters object > --- > > Key: FLINK-7947 > URL: https://issues.apache.org/jira/browse/FLINK-7947 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Bowen Li > > The {{ParameterTool}} directly implements the {{GlobalJobParameters}} > interface. Additionally it has grown over time to not only store the > configuration parameters but also to record which parameters have been > requested and what default value was set. This information is irrelevant on > the server side when setting a {{GlobalJobParameters}} object via > {{ExecutionConfig#setGlobalJobParameters}}. > Since we don't separate the {{ParameterTool}} logic and the actual data view, > users ran into problems when reusing the same {{ParameterTool}} to start > multiple jobs concurrently (see FLINK-7943). I think it would be a much > clearer separation of concerns if we would actually split the > {{GlobalJobParameters}} from the {{ParameterTool}}. > Furthermore, we should think about whether {{ParameterTool#get}} should have > side effects or not as it does right now. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
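The separation of concerns FLINK-7947 asks for can be illustrated with a plain-Java sketch: the mutable tool keeps its bookkeeping (which keys were requested), while the object handed to the execution config is an immutable snapshot of the parameters only. The names below are simplified stand-ins, not the real Flink classes:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of splitting the parameter tool from the data view it exports.
// get() records the access, which is exactly the side effect the issue
// wants to keep out of the server-side GlobalJobParameters object.
public class SimpleParameterTool {
    private final Map<String, String> data;
    private final Map<String, String> requested = new HashMap<>();

    public SimpleParameterTool(Map<String, String> data) {
        this.data = new HashMap<>(data);
    }

    // Side-effecting accessor: remembers which keys were asked for.
    public String get(String key, String defaultValue) {
        String value = data.getOrDefault(key, defaultValue);
        requested.put(key, value);
        return value;
    }

    // The dedicated, immutable view to hand to the execution config.
    public Map<String, String> toJobParameters() {
        return Collections.unmodifiableMap(new HashMap<>(data));
    }

    public Map<String, String> requestedParameters() {
        return Collections.unmodifiableMap(requested);
    }
}
```

Because `toJobParameters()` returns a detached snapshot, reusing the same tool for several concurrently submitted jobs no longer shares mutable state, which is the failure mode FLINK-7943 describes.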
[jira] [Assigned] (FLINK-7947) Let ParameterTool return a dedicated GlobalJobParameters object
[ https://issues.apache.org/jira/browse/FLINK-7947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-7947: --- Assignee: Bowen Li > Let ParameterTool return a dedicated GlobalJobParameters object > --- > > Key: FLINK-7947 > URL: https://issues.apache.org/jira/browse/FLINK-7947 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Bowen Li > > The {{ParameterTool}} directly implements the {{GlobalJobParameters}} > interface. Additionally it has grown over time to not only store the > configuration parameters but also to record which parameters have been > requested and what default value was set. This information is irrelevant on > the server side when setting a {{GlobalJobParameters}} object via > {{ExecutionConfig#setGlobalJobParameters}}. > Since we don't separate the {{ParameterTool}} logic and the actual data view, > users ran into problems when reusing the same {{ParameterTool}} to start > multiple jobs concurrently (see FLINK-7943). I think it would be a much > clearer separation of concerns if we would actually split the > {{GlobalJobParameters}} from the {{ParameterTool}}. > Furthermore, we should think about whether {{ParameterTool#get}} should have > side effects or not as it does right now. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7940) Add timeout for futures
[ https://issues.apache.org/jira/browse/FLINK-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225946#comment-16225946 ] ASF GitHub Bot commented on FLINK-7940: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4918 > Add timeout for futures > --- > > Key: FLINK-7940 > URL: https://issues.apache.org/jira/browse/FLINK-7940 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Fix For: 1.4.0 > > > In order to conveniently timeout futures, we should add tooling to > {{FutureUtils}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4918 ---
[jira] [Closed] (FLINK-7940) Add timeout for futures
[ https://issues.apache.org/jira/browse/FLINK-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7940. Resolution: Done Fix Version/s: 1.4.0 Added via c568aed1ecb7675a2776e15f120e45ad576eeb37 > Add timeout for futures > --- > > Key: FLINK-7940 > URL: https://issues.apache.org/jira/browse/FLINK-7940 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Fix For: 1.4.0 > > > In order to conveniently timeout futures, we should add tooling to > {{FutureUtils}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
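The kind of tooling FLINK-7940 describes can be sketched with a plain `CompletableFuture` and a shared daemon scheduler. This is an illustrative helper under an assumed class name, not the code merged in c568aed:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of a FutureUtils-style timeout helper: if the future is still
// pending when the timer fires, it is completed exceptionally with a
// TimeoutException; if it already completed, completeExceptionally is a no-op.
public final class FutureTimeoutSketch {
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "future-timeout-timer");
            t.setDaemon(true);
            return t;
        });

    public static <T> CompletableFuture<T> orTimeout(
            CompletableFuture<T> future, long timeout, TimeUnit unit) {
        if (!future.isDone()) {
            TIMER.schedule(() -> {
                future.completeExceptionally(new TimeoutException());
            }, timeout, unit);
        }
        return future;
    }
}
```

Flink 1.4 targeted Java 8; since Java 9 the JDK ships an equivalent `CompletableFuture.orTimeout` out of the box.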
[jira] [Resolved] (FLINK-6495) Migrate Akka configuration options
[ https://issues.apache.org/jira/browse/FLINK-6495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-6495. -- Resolution: Fixed Fixed via ae50c30ac3a7ce62df62120eda85b273a6aea7f7 > Migrate Akka configuration options > -- > > Key: FLINK-6495 > URL: https://issues.apache.org/jira/browse/FLINK-6495 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Reporter: Chesnay Schepler >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability
[ https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225933#comment-16225933 ] ASF GitHub Bot commented on FLINK-7739: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4751 > Improve Kafka*ITCase tests stability > > > Key: FLINK-7739 > URL: https://issues.apache.org/jira/browse/FLINK-7739 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability
[ https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225934#comment-16225934 ] ASF GitHub Bot commented on FLINK-7739: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4749 > Improve Kafka*ITCase tests stability > > > Key: FLINK-7739 > URL: https://issues.apache.org/jira/browse/FLINK-7739 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6495) Migrate Akka configuration options
[ https://issues.apache.org/jira/browse/FLINK-6495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225932#comment-16225932 ] ASF GitHub Bot commented on FLINK-6495: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4774 > Migrate Akka configuration options > -- > > Key: FLINK-6495 > URL: https://issues.apache.org/jira/browse/FLINK-6495 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Reporter: Chesnay Schepler >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4749: [FLINK-7739][tests] Properly shutdown resources in...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4749 ---
[GitHub] flink pull request #4774: [FLINK-6495] Fix Akka's default value for heartbea...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4774 ---
[jira] [Comment Edited] (FLINK-7739) Improve Kafka*ITCase tests stability
[ https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225927#comment-16225927 ] Till Rohrmann edited comment on FLINK-7739 at 10/30/17 11:06 PM: - Fixed via 152f6c9aff44c62744d2294b220664efc14acec9 and 9eb878e99021815bec6c033c6d78e16058e7b6a6 was (Author: till.rohrmann): Fixed via 152f6c9aff44c62744d2294b220664efc14acec9 > Improve Kafka*ITCase tests stability > > > Key: FLINK-7739 > URL: https://issues.apache.org/jira/browse/FLINK-7739 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4751: [FLINK-7739][kafka-tests] Throttle down data produ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4751 ---
[jira] [Resolved] (FLINK-7739) Improve Kafka*ITCase tests stability
[ https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7739. -- Resolution: Fixed Fix Version/s: 1.4.0 Fixed via 152f6c9aff44c62744d2294b220664efc14acec9 > Improve Kafka*ITCase tests stability > > > Key: FLINK-7739 > URL: https://issues.apache.org/jira/browse/FLINK-7739 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration
[ https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225922#comment-16225922 ] ASF GitHub Bot commented on FLINK-7951: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4926 [FLINK-7951] Load YarnConfiguration with default Hadoop configuration ## What is the purpose of the change This PR loads the default Hadoop configuration via HadoopUtils.getHadoopConfiguration and initializes the YarnConfiguration with it. R @aljoscha ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink loadHdfsConfiguration Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4926.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4926 commit fd69067783cee840ae0f95af6addba451ca3449b Author: Till Rohrmann Date: 2017-10-30T23:01:05Z [FLINK-7951] Load YarnConfiguration with default Hadoop configuration This PR loads the default Hadoop configuration via HadoopUtils.getHadoopConfiguration and initializes the YarnConfiguration with it. 
> YarnApplicationMaster does not load HDFSConfiguration > - > > Key: FLINK-7951 > URL: https://issues.apache.org/jira/browse/FLINK-7951 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0 > > > When instantiating the {{YarnConfiguration}} we do not load the corresponding > {{HDFSConfiguration}}. As a result, the {{hdfs-site.xml}} is not read. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4926: [FLINK-7951] Load YarnConfiguration with default H...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4926 [FLINK-7951] Load YarnConfiguration with default Hadoop configuration ## What is the purpose of the change This PR loads the default Hadoop configuration via HadoopUtils.getHadoopConfiguration and initializes the YarnConfiguration with it. R @aljoscha ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink loadHdfsConfiguration Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4926.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4926 commit fd69067783cee840ae0f95af6addba451ca3449b Author: Till Rohrmann Date: 2017-10-30T23:01:05Z [FLINK-7951] Load YarnConfiguration with default Hadoop configuration This PR loads the default Hadoop configuration via HadoopUtils.getHadoopConfiguration and initializes the YarnConfiguration with it. ---
[jira] [Created] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration
Till Rohrmann created FLINK-7951: Summary: YarnApplicationMaster does not load HDFSConfiguration Key: FLINK-7951 URL: https://issues.apache.org/jira/browse/FLINK-7951 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.4.0 Reporter: Till Rohrmann Priority: Critical Fix For: 1.4.0 When instantiating the {{YarnConfiguration}} we do not load the corresponding {{HDFSConfiguration}}. As a result, the {{hdfs-site.xml}} is not read. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration
[ https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-7951: Assignee: Till Rohrmann > YarnApplicationMaster does not load HDFSConfiguration > - > > Key: FLINK-7951 > URL: https://issues.apache.org/jira/browse/FLINK-7951 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0 > > > When instantiating the {{YarnConfiguration}} we do not load the corresponding > {{HDFSConfiguration}}. As a result, the {{hdfs-site.xml}} is not read. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7936) Lack of synchronization w.r.t. taskManagers in MetricStore#add()
[ https://issues.apache.org/jira/browse/FLINK-7936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7936: -- Component/s: Metrics > Lack of synchronization w.r.t. taskManagers in MetricStore#add() > > > Key: FLINK-7936 > URL: https://issues.apache.org/jira/browse/FLINK-7936 > Project: Flink > Issue Type: Bug > Components: Metrics >Reporter: Ted Yu >Priority: Minor > > {code} > String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) > info).taskManagerID; > tm = taskManagers.computeIfAbsent(tmID, k -> new > TaskManagerMetricStore()); > {code} > In other places, access to taskManagers is protected by lock on > MetricStore.this -- This message was sent by Atlassian JIRA (v6.4.14#64029)
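The inconsistency FLINK-7936 points at is that `add()` calls `computeIfAbsent` on the bare map while other accessors synchronize on the store, so the two locking disciplines don't compose. A minimal illustration of the consistent version, using hypothetical names rather than the actual MetricStore:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the fix: every access to the shared taskManagers map goes
// through the same monitor, so add() no longer races with the
// synchronized readers. Illustrative only, not Flink's MetricStore.
public class StoreSketch {
    private final Map<String, Map<String, String>> taskManagers = new HashMap<>();

    // Consistent with readers that synchronize on the store instance.
    public void add(String tmId, String metric, String value) {
        synchronized (this) {
            taskManagers
                .computeIfAbsent(tmId, k -> new HashMap<>())
                .put(metric, value);
        }
    }

    public synchronized String get(String tmId, String metric) {
        Map<String, String> tm = taskManagers.get(tmId);
        return tm == null ? null : tm.get(metric);
    }
}
```

An alternative is a `ConcurrentHashMap`, whose `computeIfAbsent` is atomic on its own, but that only helps if every other access path also stops relying on the external lock.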
[jira] [Updated] (FLINK-7777) Bump japicmp to 0.10.0
[ https://issues.apache.org/jira/browse/FLINK-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7777: -- Component/s: Build System > Bump japicmp to 0.10.0 > -- > > Key: FLINK-7777 > URL: https://issues.apache.org/jira/browse/FLINK-7777 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.3.2 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Minor > Fix For: 1.5.0 > > > Currently Flink uses japicmp-maven-plugin version 0.7.0, and I'm getting > these warnings from the plugin during a *mvn clean verify*: > {code:java} > [INFO] Written file '.../target/japicmp/japicmp.diff'. > [INFO] Written file '.../target/japicmp/japicmp.xml'. > [INFO] Written file '.../target/japicmp/japicmp.html'. > Warning: org.apache.xerces.jaxp.SAXParserImpl$JAXPSAXParser: Property > 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not > recognized. > Compiler warnings: > WARNING: 'org.apache.xerces.jaxp.SAXParserImpl: Property > 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized.' > Warning: org.apache.xerces.parsers.SAXParser: Feature > 'http://javax.xml.XMLConstants/feature/secure-processing' is not recognized. > Warning: org.apache.xerces.parsers.SAXParser: Property > 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized. > Warning: org.apache.xerces.parsers.SAXParser: Property > 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not > recognized. > {code} > japicmp fixed this in version 0.7.1: _Excluded xerces from maven-reporting > dependency in order to prevent warnings from SAXParserImpl._ > The current stable version is 0.10.0; we can consider upgrading to it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7871) SlotPool should release its unused slot to RM
[ https://issues.apache.org/jira/browse/FLINK-7871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7871: -- Component/s: Distributed Coordination > SlotPool should release its unused slot to RM > - > > Key: FLINK-7871 > URL: https://issues.apache.org/jira/browse/FLINK-7871 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Reporter: shuai.xu >Assignee: shuai.xu > > As described in the design wiki > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077, > _*The SlotPool releases slots that are unused to the ResourceManager. Slots > count as unused if they are not used when the job is fully running (fully > recovered).*_ > But currently the slot pool keeps the slots, once offered to it, until the > job finishes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7795) Utilize error-prone to discover common coding mistakes
[ https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7795: -- Component/s: Build System > Utilize error-prone to discover common coding mistakes > -- > > Key: FLINK-7795 > URL: https://issues.apache.org/jira/browse/FLINK-7795 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu > > http://errorprone.info/ is a tool which detects common coding mistakes. > We should incorporate it into the Flink build process. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7782) Flink CEP not recognizing pattern
[ https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7782: -- Component/s: CEP > Flink CEP not recognizing pattern > - > > Key: FLINK-7782 > URL: https://issues.apache.org/jira/browse/FLINK-7782 > Project: Flink > Issue Type: Bug > Components: CEP >Reporter: Ajay > > I am using Flink version 1.3.2. Flink has a Kafka source; I am using > KafkaSource9. I am running Flink on a 3-node AWS cluster with 8G of RAM, > running Ubuntu 16.04. From the Flink dashboard I see that I have 2 > TaskManagers and 4 task slots. > What I observe is the following. The input to Kafka is a JSON string, and when > parsed on the Flink side it looks like this > {code:java} > (101,Sun Sep 24 23:18:53 UTC 2017,complex > event,High,37.75142,-122.39458,12.0,20.0) > {code} > I use a Tuple8 to capture the parsed data. The first field is home_id. The > time characteristic is set to EventTime and I have an > AscendingTimestampExtractor using the timestamp field. Parallelism for the > execution environment is set to 4. I have a rather simple event that I am > trying to capture > {code:java} > DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> > cepMapByHomeId = cepMap.keyBy(0); > //cepMapByHomeId.print(); > > Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 = > > Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start") > .where(new OverLowThreshold()) > .followedBy("end") > .where(new OverHighThreshold()); > PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream = CEP.pattern(cepMapByHomeId, cep1); > DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts = patternStream.select(new PackageCapturedEvents()); > {code} > The pattern checks if the 7th field in the Tuple8 goes over 12 and then over > 16. The output of the pattern is like this > {code:java} > (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex > event,Non-event,37.75837,-122.41467) > {code} > On the Kafka producer side, I am trying to send simulated data for around 100 > homes, so the home_id goes from 0-100, and the input is keyed by home_id. 
> I have about 10 partitions in Kafka. The producer just loops through a > CSV file with a delay of about 100 ms between two rows of the file. The > data is exactly the same for all 100 CSV files except for the home_id and > the lat/long information. The timestamp is incremented in steps of 1 sec. > I start multiple processes to simulate data from different homes. > THE PROBLEM: > Flink completely misses capturing events for a large subset of the input > data. I barely see events for about 4-5 of the home_id values. I print the > stream before applying the pattern and after, and I see all home_ids before > but only a tiny subset after. Since the data is exactly the same, I expect all > home_ids to be captured and written to my sink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7913: -- Component/s: Kafka Connector > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan > Fix For: 1.5.0 > > > Currently Apache Flink offers only the *FlinkKafkaPartitioner* and a single > implementation, *FlinkFixedPartitioner*. > In order to use Kafka's default partitioner you have to create a new > *FlinkKafkaPartitioner* implementation and fork the code from Kafka. > It would be really good to be able to define the partitioner without > implementing a new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7935) Metrics with user supplied scope variables
[ https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7935: -- Component/s: Metrics > Metrics with user supplied scope variables > -- > > Key: FLINK-7935 > URL: https://issues.apache.org/jira/browse/FLINK-7935 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.2 >Reporter: Elias Levy > > We use DataDog for metrics. DD and Flink differ somewhat in how they track > metrics. > Flink names and scopes metrics together, at least by default. E.g. by default > the system scope for operator metrics is > {{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}. > The scope variables become part of the metric's full name. > In DD the metric would be named something generic, e.g. > {{taskmanager.job.operator}}, and they would be distinguished by their tag > values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}. > Flink allows you to configure the format string for system scopes, so it is > possible to set the operator scope format to {{taskmanager.job.operator}}. > We do this for all scopes: > {code} > metrics.scope.jm: jobmanager > metrics.scope.jm.job: jobmanager.job > metrics.scope.tm: taskmanager > metrics.scope.tm.job: taskmanager.job > metrics.scope.task: taskmanager.job.task > metrics.scope.operator: taskmanager.job.operator > {code} > This seems to work. The DataDog Flink metrics plugin submits all scope > variables as tags, even if they are not used within the scope format. And it > appears internally this does not lead to metrics conflicting with each other. > We would like to extend this to user defined metrics: we'd like to be able to > define variables/scopes when adding a metric group or metric via the user API, > so that in DD we have a single metric with a tag taking many different values, > rather than hundreds of metrics for the one value we want to measure > across different event types. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7927) Different Netty Versions in dependencies of flink-runtime make it impossible to use 3rd party libraries using netty
[ https://issues.apache.org/jira/browse/FLINK-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7927: -- Component/s: Build System > Different Netty Versions in dependencies of flink-runtime make it impossible > to use 3rd party libraries using netty > --- > > Key: FLINK-7927 > URL: https://issues.apache.org/jira/browse/FLINK-7927 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.3.2 > Environment: * Windows 10 x64 > * Java 1.8 >Reporter: Claudius Eisele > > I am trying to use Google PubSub (google-cloud-pubsub 0.26.0-beta) in a Flink > streaming job but I am receiving the following error when executing it so > unfortunately it's not possible to use PubSub in a Flink Streaming Job: > {code:java} > ... > 10/25/2017 22:38:02 Source: Custom Source -> Map(1/1) switched to RUNNING > 10/25/2017 22:38:03 Source: Custom Source -> Map(1/1) switched to FAILED > java.lang.IllegalStateException: Expected the service InnerService [FAILED] > to be RUNNING, but the service has FAILED > at > com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:328) > at > com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:266) > at > com.google.api.core.AbstractApiService.awaitRunning(AbstractApiService.java:97) > > Caused by: java.lang.IllegalArgumentException: Jetty ALPN/NPN has not been > properly configured. 
> at > io.grpc.netty.GrpcSslContexts.selectApplicationProtocolConfig(GrpcSslContexts.java:159) > at io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:136) > at io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:124) > at io.grpc.netty.GrpcSslContexts.forClient(GrpcSslContexts.java:94) > at > io.grpc.netty.NettyChannelBuilder$NettyTransportFactory$DefaultNettyTransportCreationParamsFilterFactory.(NettyChannelBuilder.java:525) > at > io.grpc.netty.NettyChannelBuilder$NettyTransportFactory$DefaultNettyTransportCreationParamsFilterFactory.(NettyChannelBuilder.java:518) > at > io.grpc.netty.NettyChannelBuilder$NettyTransportFactory.(NettyChannelBuilder.java:457) > at > io.grpc.netty.NettyChannelBuilder.buildTransportFactory(NettyChannelBuilder.java:326) > at > io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:315) > at > com.google.api.gax.grpc.InstantiatingChannelProvider.createChannel(InstantiatingChannelProvider.java:131) > at > com.google.api.gax.grpc.InstantiatingChannelProvider.getChannel(InstantiatingChannelProvider.java:116) > at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:246) > at > com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:149) > at > com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:211) > at > com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:121) > at > com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:235) > ... 
7 more > {code} > I reported this problem to the Google Cloud Java library, but the problem > seems to lie in Flink or its dependencies (like Akka), because they pull in a > lot of Netty dependencies with different versions: > * Apache Zookeeper (flink-runtime dependency) has \--- > io.netty:netty:3.7.0.Final -> 3.8.0.Final > * Flakka (flink-runtime dependency) has io.netty:netty:3.8.0.Final > * Flink-Runtime has io.netty:netty-all:4.0.27.Final > In my case, Google Cloud PubSub has io.grpc:grpc-netty:1.6.1 > Additional information on the issue in combination with Google Cloud PubSub > can be found here: > https://github.com/GoogleCloudPlatform/google-cloud-java/issues/2398 > https://github.com/grpc/grpc-java/issues/3025 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7877) Fix compilation against the Hadoop 3 beta1 release
[ https://issues.apache.org/jira/browse/FLINK-7877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7877: -- Component/s: Build System > Fix compilation against the Hadoop 3 beta1 release > -- > > Key: FLINK-7877 > URL: https://issues.apache.org/jira/browse/FLINK-7877 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Ted Yu > Labels: build > > When compiling against Hadoop 3.0.0-beta1, I got: > {code} > [ERROR] > /mnt/disk2/a/flink/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java:[224,16] > org.apache.flink.yarn.UtilsTest.TestingContainer is not abstract and does > not override abstract method > setExecutionType(org.apache.hadoop.yarn.api.records.ExecutionType) in > org.apache.hadoop.yarn.api.records.Container > {code} > There may be other Hadoop APIs that need adjustment. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7787) Remove guava dependency in the cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-7787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7787: -- Component/s: Build System > Remove guava dependency in the cassandra connector > -- > > Key: FLINK-7787 > URL: https://issues.apache.org/jira/browse/FLINK-7787 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Haohui Mai >Assignee: Haohui Mai > > As discovered in FLINK-6225, the cassandra connector uses the future classes > in the guava library. We can get rid of the dependency by using the > equivalent classes provided by Java 8. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7881) flink can't deployed on yarn with ha
[ https://issues.apache.org/jira/browse/FLINK-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7881: -- Component/s: YARN > flink can't deployed on yarn with ha > > > Key: FLINK-7881 > URL: https://issues.apache.org/jira/browse/FLINK-7881 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.2 >Reporter: deng >Priority: Blocker > > I start yarn-session.sh on YARN, but it can't read the HDFS logical URL. It > always connects to hdfs://master:8020, while it should be 9000; my HDFS > defaultFS is hdfs://master. > I have configured YARN_CONF_DIR and HADOOP_CONF_DIR, but it didn't work. > Is it a bug? I use flink-1.3.0-bin-hadoop27-scala_2.10 > 2017-10-20 11:00:05,395 DEBUG org.apache.hadoop.ipc.Client > - IPC Client (1035144464) connection to > startdt/173.16.5.215:8020 from admin: closed > 2017-10-20 11:00:05,398 ERROR > org.apache.flink.yarn.YarnApplicationMasterRunner - YARN > Application Master initialization failed > java.net.ConnectException: Call From spark3/173.16.5.216 to master:8020 > failed on connection exception: java.net.ConnectException: Connection > refused; For more details see: > http://wiki.apache.org/hadoop/ConnectionRefused > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:526) > at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792) > at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732) > at org.apache.hadoop.ipc.Client.call(Client.java:1479) > at org.apache.hadoop.ipc.Client.call(Client.java:1412) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229) > at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source) > at > 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108) > at > org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305) > at > org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
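The root cause discussed in this thread is that `new YarnConfiguration()` only picks up core-site.xml and yarn-site.xml, so the HA failover proxy provider configured in hdfs-site.xml is never seen and the client falls back to a plain `master:8020` connection. A minimal stand-alone sketch of the file-discovery step behind the proposed `HadoopUtils.getHadoopConfiguration()` fix — class and method names here are illustrative, not Flink's actual code:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class HadoopConfFiles {
    /**
     * Returns the Hadoop configuration files that exist under the given
     * directory, in the order they would be added as resources. The HA
     * logical URI only resolves if hdfs-site.xml is among them.
     */
    static List<File> collectSiteFiles(String confDir) {
        List<File> found = new ArrayList<>();
        for (String name : new String[] {"core-site.xml", "hdfs-site.xml", "yarn-site.xml"}) {
            File f = new File(confDir, name);
            if (f.exists()) {
                found.add(f);
            }
        }
        return found;
    }

    public static void main(String[] args) throws Exception {
        String dir = System.getenv().getOrDefault("HADOOP_CONF_DIR", "/etc/hadoop/conf");
        for (File f : collectSiteFiles(dir)) {
            // in the real fix, each file would be added via Configuration#addResource
            System.out.println("Would add resource: " + f.getPath());
        }
    }
}
```

If hdfs-site.xml is missing from the returned list, the failover proxy provider stays null and the client treats the setup as a non-HA case, matching the behavior described above.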
[jira] [Updated] (FLINK-7934) Upgrade Calcite dependency to 1.15
[ https://issues.apache.org/jira/browse/FLINK-7934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7934: -- Component/s: Table API & SQL > Upgrade Calcite dependency to 1.15 > -- > > Key: FLINK-7934 > URL: https://issues.apache.org/jira/browse/FLINK-7934 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong > > Umbrella issue for all related issues for Apache Calcite 1.15 release. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7894) Improve metrics around fine-grained recovery and associated checkpointing behaviors
[ https://issues.apache.org/jira/browse/FLINK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-7894: -- Component/s: Metrics > Improve metrics around fine-grained recovery and associated checkpointing > behaviors > --- > > Key: FLINK-7894 > URL: https://issues.apache.org/jira/browse/FLINK-7894 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0, 1.3.2 >Reporter: Zhenzhong Xu > > Currently, the only metric around fine-grained recovery is "task_failures". > It's a very high-level metric; it would be nice to have the following > improvements: > * Allow slicing and dicing into which tasks were restarted. > * Recovery duration. > * Recovery-associated checkpoint behaviors: cancels, failures, etc. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
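As a back-of-the-envelope sketch, the per-task breakdown and recovery duration requested above could be tracked with counters along the following lines. This uses plain Java atomics rather than Flink's metric API, and all names are illustrative:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class RecoveryMetrics {
    // restart count keyed by task name: allows slicing/dicing which tasks restarted
    private final ConcurrentHashMap<String, AtomicLong> restartsPerTask = new ConcurrentHashMap<>();
    private final AtomicLong lastRecoveryDurationMillis = new AtomicLong();

    void recordTaskRestart(String taskName) {
        restartsPerTask.computeIfAbsent(taskName, k -> new AtomicLong()).incrementAndGet();
    }

    void recordRecoveryDuration(long millis) {
        lastRecoveryDurationMillis.set(millis);
    }

    long restartsOf(String taskName) {
        AtomicLong count = restartsPerTask.get(taskName);
        return count == null ? 0 : count.get();
    }

    long getLastRecoveryDurationMillis() {
        return lastRecoveryDurationMillis.get();
    }
}
```

A real implementation would register these as gauges/counters in the job's metric group; the checkpoint-related counts (cancels, failures) would follow the same per-event pattern.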
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225408#comment-16225408 ] Nico Kruber commented on FLINK-4228: [~aljoscha]: I successfully reproduced the error with a current snapshot of Flink 1.4 on EMR Steps to reproduce: # setup the S3A filesystem as described in https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html, i.e. #* adapt {{/etc/hadoop/conf/core-site.xml}} #* copy S3A filesystem dependency jars to {{flink/lib}} # adapt default filesystem for hadoop in {{/etc/hadoop/conf/core-site.xml}}, e.g.: {code} fs.defaultFS s3a://nico-test/ {code} # distribute new configuration to all nodes & restart to apply # try to run an example (here with two nodes) so that Flink tries to deploy the artefacts to YARN's (default) filesystem {code} > cd flink > HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 2 -ys 1 > -yjm 768 -ytm 1024 ./examples/batch/WordCount.jar Using the result of 'hadoop classpath' to augment the Hadoop classpath: /etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/.//*::/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar:/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar:/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/* SLF4J: Class path contains multiple SLF4J bindings. 
SLF4J: Found binding in [jar:file:/home/hadoop/flink/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 2017-10-30 17:34:51,137 WARN org.apache.hadoop.conf.Configuration - /etc/hadoop/conf/core-site.xml:an attempt to override final parameter: fs.s3.buffer.dir; Ignoring. 2017-10-30 17:34:51,224 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2017-10-30 17:34:51,224 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2017-10-30 17:34:51,279 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at ip-172-31-22-149.eu-west-1.compute.internal/172.31.22.149:8032 2017-10-30 17:34:51,572 INFO org.apache.flink.yarn.YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=1} 2017-10-30 17:34:53,253 WARN org.apache.flink.yarn.YarnClusterDescriptor - The configuration directory ('/home/hadoop/flink/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them. 
2017-10-30 17:34:53,268 INFO org.apache.flink.yarn.Utils - Copying from file:/home/hadoop/flink/conf/log4j.properties to s3a://nico-test/user/hadoop/.flink/application_1509384765476_0001/log4j.properties 2017-10-30 17:34:53,824 INFO org.apache.flink.yarn.Utils - Copying from file:/home/hadoop/flink/lib to s3a://nico-test/user/hadoop/.flink/application_1509384765476_0001/lib The program finished with the following exception: java.lang.RuntimeException: Error deploying the YARN cluster at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:594) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:81) at org.apache.flink.client.CliFrontend.createClient(CliFrontend.java:925) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:264) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054) at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101) at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroup
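The flattened `{code}` fragment in the repro steps above ("fs.defaultFS s3a://nico-test/") is the default-filesystem override in {{/etc/hadoop/conf/core-site.xml}}. In Hadoop's property syntax it would look roughly like the following (s3a://nico-test/ is the reporter's test bucket):

```xml
<property>
  <!-- make S3A the default filesystem so YARN stages Flink's artifacts there -->
  <name>fs.defaultFS</name>
  <value>s3a://nico-test/</value>
</property>
```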
[jira] [Commented] (FLINK-7950) flink-queryable-state-runtime not a dependency in flink-dist
[ https://issues.apache.org/jira/browse/FLINK-7950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225364#comment-16225364 ] ASF GitHub Bot commented on FLINK-7950: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4925 [FLINK-7950][build] add flink-queryable-state-runtime as a dependency to flink-dist ## What is the purpose of the change Since #4906, `flink-queryable-state-runtime`'s jar file was put into the `opt/` folder of `flink-dist` and is thus required to build as well. ## Brief change log - add `flink-queryable-state-runtime` as a (provided) dependency to `flink-dist` ## Verifying this change This change can be verified by building the `flink-dist` sub-project: `mvn install -pl flink-dist -am` and verifying that it builds as well as that `org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler` is not part of the `flink-dist` uber jar. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (not to Flink itself) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-7950 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4925.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4925 > flink-queryable-state-runtime not a dependency in flink-dist > > > Key: FLINK-7950 > URL: https://issues.apache.org/jira/browse/FLINK-7950 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > > {{flink-dist}} requires the {{flink-queryable-state-runtime}} module to be > build since FLINK-7824 but this is not set as a dependency in the {{pom.xml}} > and thus fails building the sub-project alone, e.g. via {{mvn install -pl > flink-dist -am}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4925: [FLINK-7950][build] add flink-queryable-state-runt...
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4925 [FLINK-7950][build] add flink-queryable-state-runtime as a dependency to flink-dist ## What is the purpose of the change Since #4906, `flink-queryable-state-runtime`'s jar file was put into the `opt/` folder of `flink-dist` and is thus required to build as well. ## Brief change log - add `flink-queryable-state-runtime` as a (provided) dependency to `flink-dist` ## Verifying this change This change can be verified by building the `flink-dist` sub-project: `mvn install -pl flink-dist -am` and verifying that it builds as well as that `org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler` is not part of the `flink-dist` uber jar. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (not to Flink itself) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-7950 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4925.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4925 ---
[jira] [Created] (FLINK-7950) flink-queryable-state-runtime not a dependency in flink-dist
Nico Kruber created FLINK-7950: -- Summary: flink-queryable-state-runtime not a dependency in flink-dist Key: FLINK-7950 URL: https://issues.apache.org/jira/browse/FLINK-7950 Project: Flink Issue Type: Bug Components: Build System Affects Versions: 1.4.0 Reporter: Nico Kruber Assignee: Nico Kruber Priority: Blocker {{flink-dist}} requires the {{flink-queryable-state-runtime}} module to be built since FLINK-7824, but it is not set as a dependency in the {{pom.xml}}, so building the sub-project alone fails, e.g. via {{mvn install -pl flink-dist -am}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
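The fix described in PR #4925 amounts to declaring the module as a provided-scope dependency in {{flink-dist/pom.xml}}, so that `-am` builds it first while keeping it out of the uber jar. A sketch of the added entry — the exact artifactId suffix and property names are assumptions based on common Flink pom conventions:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
  <version>${project.version}</version>
  <!-- provided: forces build ordering without shading the jar into flink-dist -->
  <scope>provided</scope>
</dependency>
```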
[jira] [Updated] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-4228: Description: The issue now is exclusive to running on YARN with s3a:// as your configured FileSystem. If so, the Flink session will fail on staging itself because it tries to copy the flink/lib directory to S3 and the S3aFileSystem does not support recursive copy. h2. Old Issue Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) leads to an Exception when uploading the snapshot to S3 when using the {{S3AFileSystem}}. {code} AsynchronousException{com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)} at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory) at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.FileNotFoundException: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.(FileInputStream.java:138) at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294) ... 9 more {code} Running with S3NFileSystem, the error does not occur. The problem might be due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created automatically. We might need to manually create folders and copy only actual files for {{S3AFileSystem}}. More investigation is required. was: Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) leads to an Exception when uploading the snapshot to S3 when using the {{S3AFileSystem}}. 
{code} AsynchronousException{com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)} at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory) at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) C
[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish
[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225241#comment-16225241 ] ASF GitHub Bot commented on FLINK-7838: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4915#discussion_r147754468 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java --- @@ -61,7 +61,7 @@ * IT cases for the {@link FlinkKafkaProducer011}. */ @SuppressWarnings("serial") -public class FlinkKafkaProducer011Tests extends KafkaTestBase { +public class FlinkKafkaProducer011Test extends KafkaTestBase { --- End diff -- Shouldn't this be named `*ITCase` according to the coding conventions ? > Kafka011ProducerExactlyOnceITCase do not finish > --- > > Key: FLINK-7838 > URL: https://issues.apache.org/jira/browse/FLINK-7838 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > Attachments: initTransactions_deadlock.txt, log.txt > > > See attached log -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4915#discussion_r147754468 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java --- @@ -61,7 +61,7 @@ * IT cases for the {@link FlinkKafkaProducer011}. */ @SuppressWarnings("serial") -public class FlinkKafkaProducer011Tests extends KafkaTestBase { +public class FlinkKafkaProducer011Test extends KafkaTestBase { --- End diff -- Shouldn't this be named `*ITCase` according to the coding conventions ? ---
[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish
[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225238#comment-16225238 ] ASF GitHub Bot commented on FLINK-7838: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4915#discussion_r147754127 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java --- @@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() { private void flushNewPartitions() { LOG.info("Flushing new partitions"); + enqueueNewPartitions().await(); + } + + private TransactionalRequestResult enqueueNewPartitions() { Object transactionManager = getValue(kafkaProducer, "transactionManager"); - Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler"); - invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler}); - TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result"); - Object sender = getValue(kafkaProducer, "sender"); - invoke(sender, "wakeup"); - result.await(); + synchronized (transactionManager) { + Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler"); --- End diff -- How are you testing now that `enqueueRequest` is called? > Kafka011ProducerExactlyOnceITCase do not finish > --- > > Key: FLINK-7838 > URL: https://issues.apache.org/jira/browse/FLINK-7838 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > Attachments: initTransactions_deadlock.txt, log.txt > > > See attached log -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4915#discussion_r147754127 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java --- @@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() { private void flushNewPartitions() { LOG.info("Flushing new partitions"); + enqueueNewPartitions().await(); + } + + private TransactionalRequestResult enqueueNewPartitions() { Object transactionManager = getValue(kafkaProducer, "transactionManager"); - Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler"); - invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler}); - TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result"); - Object sender = getValue(kafkaProducer, "sender"); - invoke(sender, "wakeup"); - result.await(); + synchronized (transactionManager) { + Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler"); --- End diff -- How are you testing now that `enqueueRequest` is called? ---
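The diff above manipulates KafkaProducer internals through `getValue`/`invoke` reflection helpers and wraps the sequence in `synchronized (transactionManager)` so that creating the request handler and enqueueing it happen atomically. A self-contained sketch of what such helpers typically look like — the names mirror the diff, but Flink's actual helpers also walk superclasses and accept argument types:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

public class ReflectionHelpers {
    /** Reads a (possibly private) field, as the getValue helper in the diff does. */
    static Object getValue(Object target, String fieldName) throws Exception {
        Field field = target.getClass().getDeclaredField(fieldName);
        field.setAccessible(true); // bypass private access
        return field.get(target);
    }

    /** Invokes a no-argument (possibly private) method, as the invoke helper does. */
    static Object invoke(Object target, String methodName) throws Exception {
        Method method = target.getClass().getDeclaredMethod(methodName);
        method.setAccessible(true);
        return method.invoke(target);
    }
}
```

This kind of reflective access is fragile by design — it couples the code to Kafka's private internals, which is why the PR review focuses on how the behavior can still be tested.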
[jira] [Commented] (FLINK-6495) Migrate Akka configuration options
[ https://issues.apache.org/jira/browse/FLINK-6495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225173#comment-16225173 ] ASF GitHub Bot commented on FLINK-6495: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4774#discussion_r147743853 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala --- @@ -257,6 +257,20 @@ object AkkaUtils { ConfigFactory.parseString(config) } + private def validateHeartbeat(pauseParamName: String, +pauseValue: String, +intervalParamName: String, +intervalValue: String) = { +if (Duration.apply(pauseValue).lteq(Duration.apply(intervalValue))) { + throw new IllegalConfigurationException( +"%s [%s] must greater then %s [%s]", --- End diff -- this should actually be "than", not "then" > Migrate Akka configuration options > -- > > Key: FLINK-6495 > URL: https://issues.apache.org/jira/browse/FLINK-6495 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Reporter: Chesnay Schepler >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4774: [FLINK-6495] Fix Akka's default value for heartbea...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4774#discussion_r147743853 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala --- @@ -257,6 +257,20 @@ object AkkaUtils { ConfigFactory.parseString(config) } + private def validateHeartbeat(pauseParamName: String, +pauseValue: String, +intervalParamName: String, +intervalValue: String) = { +if (Duration.apply(pauseValue).lteq(Duration.apply(intervalValue))) { + throw new IllegalConfigurationException( +"%s [%s] must greater then %s [%s]", --- End diff -- this should actually be "than", not "then" ---
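The review comment above is about the error message ("than", not "then"); the validation itself enforces that the acceptable heartbeat pause is strictly greater than the heartbeat interval, since otherwise a single missed heartbeat would immediately trip failure detection. A stand-alone Java rendering of the same check — Flink's version is Scala and parses duration strings, so plain milliseconds are used here for brevity:

```java
public class HeartbeatValidation {
    /**
     * Rejects configurations where the pause does not exceed the interval,
     * mirroring the check in AkkaUtils.validateHeartbeat.
     */
    static void validateHeartbeat(String pauseParam, long pauseMillis,
                                  String intervalParam, long intervalMillis) {
        if (pauseMillis <= intervalMillis) {
            throw new IllegalArgumentException(String.format(
                "%s [%d ms] must be greater than %s [%d ms]",
                pauseParam, pauseMillis, intervalParam, intervalMillis));
        }
    }
}
```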
[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit
[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225160#comment-16225160 ] ASF GitHub Bot commented on FLINK-7784: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4910#discussion_r147741136 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java --- @@ -35,60 +42,101 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for {@link TwoPhaseCommitSinkFunction}. */ public class TwoPhaseCommitSinkFunctionTest { - TestContext context; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private FileBasedSinkFunction sinkFunction; + + private OneInputStreamOperatorTestHarness harness; + + private AtomicBoolean throwException = new AtomicBoolean(); + + private File targetDirectory; + + private File tmpDirectory; + + @Mock + private Clock mockClock; + + @Mock + private Logger mockLogger; --- End diff -- The test does not rely on any mocks anymore. 
I am not 100% happy with it because we use log4j `1.x` which is not maintained anymore and in log4j `2.x`, the APIs have changed a lot: http://logging.apache.org/log4j/2.x/manual/customconfig.html#AddingToCurrent > Don't fail TwoPhaseCommitSinkFunction when failing to commit > > > Key: FLINK-7784 > URL: https://issues.apache.org/jira/browse/FLINK-7784 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails > (either when doing it via the completed checkpoint notification or when > trying to commit after restoring after failure). This means that the job will > go into an infinite recovery loop because we will always keep failing. > In some cases it might be better to ignore those failures and keep on > processing and this should be the default. We can provide an option that > allows failing the sink on failing commits. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4910#discussion_r147741136 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java --- @@ -35,60 +42,101 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for {@link TwoPhaseCommitSinkFunction}. */ public class TwoPhaseCommitSinkFunctionTest { - TestContext context; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private FileBasedSinkFunction sinkFunction; + + private OneInputStreamOperatorTestHarness harness; + + private AtomicBoolean throwException = new AtomicBoolean(); + + private File targetDirectory; + + private File tmpDirectory; + + @Mock + private Clock mockClock; + + @Mock + private Logger mockLogger; --- End diff -- The test does not rely on any mocks anymore. I am not 100% happy with it because we use log4j `1.x` which is not maintained anymore and in log4j `2.x`, the APIs have changed a lot: http://logging.apache.org/log4j/2.x/manual/customconfig.html#AddingToCurrent ---
[jira] [Commented] (FLINK-7418) Replace all uses of jackson with flink-shaded-jackson
[ https://issues.apache.org/jira/browse/FLINK-7418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225151#comment-16225151 ] ASF GitHub Bot commented on FLINK-7418: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4923 Ok for me. > Replace all uses of jackson with flink-shaded-jackson > - > > Key: FLINK-7418 > URL: https://issues.apache.org/jira/browse/FLINK-7418 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Stephan Ewen >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.4.0 > > > Jackson is currently used to create JSON responses in the web UI, in the > future possibly for the client REST communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit
[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225146#comment-16225146 ] ASF GitHub Bot commented on FLINK-7784: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4910#discussion_r147739898 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -58,18 +61,37 @@ extends RichSinkFunction implements CheckpointedFunction, CheckpointListener { - private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class); + private final Logger log; - protected final ListStateDescriptor> stateDescriptor; + private final Clock clock; - protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>(); + protected final LinkedHashMap> pendingCommitTransactions = new LinkedHashMap<>(); - @Nullable - protected TXN currentTransaction; protected Optional userContext; protected ListState> state; + private final ListStateDescriptor> stateDescriptor; + + private TransactionHolder currentTransaction; + + /** +* Specifies the maximum time a transaction should remain open. +*/ + private long transactionTimeout = Long.MAX_VALUE; + + /** +* If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of +* propagated. +*/ + private boolean failureOnCommitAfterTransactionTimeoutDisabled; --- End diff -- Ok, I like `ignoreFailuresAfterTransactionTimeout`. The method name is now the same, though. 
> Don't fail TwoPhaseCommitSinkFunction when failing to commit > > > Key: FLINK-7784 > URL: https://issues.apache.org/jira/browse/FLINK-7784 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails > (either when doing it via the completed checkpoint notification or when > trying to commit after restoring after failure). This means that the job will > go into an infinite recovery loop because we will always keep failing. > In some cases it might be better to ignore those failures and keep on > processing and this should be the default. We can provide an option that > allows failing the sink on failing commits. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4923: [FLINK-7418][build] Integrate flink-shaded-jackson2
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4923 Ok for me. ---
[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4910#discussion_r147739898

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
    @@ -58,18 +61,37 @@
     	extends RichSinkFunction<IN>
     	implements CheckpointedFunction, CheckpointListener {

    -	private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
    +	private final Logger log;

    -	protected final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +	private final Clock clock;

    -	protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
    +	protected final LinkedHashMap<Long, TransactionHolder<TXN>> pendingCommitTransactions = new LinkedHashMap<>();

    -	@Nullable
    -	protected TXN currentTransaction;
     	protected Optional<CONTEXT> userContext;

     	protected ListState<State<TXN, CONTEXT>> state;

    +	private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +
    +	private TransactionHolder<TXN> currentTransaction;
    +
    +	/**
    +	 * Specifies the maximum time a transaction should remain open.
    +	 */
    +	private long transactionTimeout = Long.MAX_VALUE;
    +
    +	/**
    +	 * If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
    +	 * propagated.
    +	 */
    +	private boolean failureOnCommitAfterTransactionTimeoutDisabled;
    --- End diff --

    Ok, I like `ignoreFailuresAfterTransactionTimeout`. The method name is now the same, though.

---
[jira] [Commented] (FLINK-7418) Replace all uses of jackson with flink-shaded-jackson
[ https://issues.apache.org/jira/browse/FLINK-7418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225139#comment-16225139 ] ASF GitHub Bot commented on FLINK-7418: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4923 It's more of an assumption tbh, I haven't tried it out. I've looked through the fenzo source code to search for APIs that expose jackson but couldn't find any. They annotate a number of POJOs but don't even map them to/from JSON outside of tests.
> Replace all uses of jackson with flink-shaded-jackson
>
> Key: FLINK-7418
> URL: https://issues.apache.org/jira/browse/FLINK-7418
> Project: Flink
> Issue Type: Sub-task
> Components: Build System
> Reporter: Stephan Ewen
> Assignee: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.4.0
>
> Jackson is currently used to create JSON responses in the web UI, in the future possibly for the client REST communication.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4923: [FLINK-7418][build] Integrate flink-shaded-jackson2
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4923 It's more of an assumption tbh, I haven't tried it out. I've looked through the fenzo source code to search for APIs that expose jackson but couldn't find any. They annotate a number of POJOs but don't even map them to/from JSON outside of tests. ---
[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4910#discussion_r147736898

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
    @@ -58,18 +61,37 @@
     	extends RichSinkFunction<IN>
     	implements CheckpointedFunction, CheckpointListener {

    -	private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
    +	private final Logger log;

    -	protected final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +	private final Clock clock;

    -	protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
    +	protected final LinkedHashMap<Long, TransactionHolder<TXN>> pendingCommitTransactions = new LinkedHashMap<>();

    -	@Nullable
    -	protected TXN currentTransaction;
     	protected Optional<CONTEXT> userContext;

     	protected ListState<State<TXN, CONTEXT>> state;

    +	private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +
    +	private TransactionHolder<TXN> currentTransaction;
    +
    +	/**
    +	 * Specifies the maximum time a transaction should remain open.
    +	 */
    +	private long transactionTimeout = Long.MAX_VALUE;
    +
    +	/**
    +	 * If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
    +	 * propagated.
    +	 */
    +	private boolean failureOnCommitAfterTransactionTimeoutDisabled;
    --- End diff --

    Ok, I like `ignoreFailuresAfterTransactionTimeout`

---
[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit
[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225119#comment-16225119 ] ASF GitHub Bot commented on FLINK-7784: --- Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4910#discussion_r147736898

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
    @@ -58,18 +61,37 @@
     	extends RichSinkFunction<IN>
     	implements CheckpointedFunction, CheckpointListener {

    -	private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
    +	private final Logger log;

    -	protected final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +	private final Clock clock;

    -	protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
    +	protected final LinkedHashMap<Long, TransactionHolder<TXN>> pendingCommitTransactions = new LinkedHashMap<>();

    -	@Nullable
    -	protected TXN currentTransaction;
     	protected Optional<CONTEXT> userContext;

     	protected ListState<State<TXN, CONTEXT>> state;

    +	private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +
    +	private TransactionHolder<TXN> currentTransaction;
    +
    +	/**
    +	 * Specifies the maximum time a transaction should remain open.
    +	 */
    +	private long transactionTimeout = Long.MAX_VALUE;
    +
    +	/**
    +	 * If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
    +	 * propagated.
    +	 */
    +	private boolean failureOnCommitAfterTransactionTimeoutDisabled;
    --- End diff --

    Ok, I like `ignoreFailuresAfterTransactionTimeout`

> Don't fail TwoPhaseCommitSinkFunction when failing to commit
>
> Key: FLINK-7784
> URL: https://issues.apache.org/jira/browse/FLINK-7784
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.4.0
> Reporter: Aljoscha Krettek
> Assignee: Gary Yao
> Priority: Blocker
> Fix For: 1.4.0
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails (either when doing it via the completed checkpoint notification or when trying to commit after restoring after failure). This means that the job will go into an infinite recovery loop because we will always keep failing.
> In some cases it might be better to ignore those failures and keep on processing and this should be the default. We can provide an option that allows failing the sink on failing commits.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit
[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225114#comment-16225114 ] ASF GitHub Bot commented on FLINK-7784: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4910#discussion_r147735664 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java --- @@ -83,49 +79,6 @@ public void before() { extraProperties.put("isolation.level", "read_committed"); } - @Test(timeout = 3L) --- End diff -- The tests that I removed were already in `FlinkKafkaProducerTests`. Probably some copy paste error. > Don't fail TwoPhaseCommitSinkFunction when failing to commit > > > Key: FLINK-7784 > URL: https://issues.apache.org/jira/browse/FLINK-7784 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails > (either when doing it via the completed checkpoint notification or when > trying to commit after restoring after failure). This means that the job will > go into an infinite recovery loop because we will always keep failing. > In some cases it might be better to ignore those failures and keep on > processing and this should be the default. We can provide an option that > allows failing the sink on failing commits. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4910#discussion_r147735664 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java --- @@ -83,49 +79,6 @@ public void before() { extraProperties.put("isolation.level", "read_committed"); } - @Test(timeout = 3L) --- End diff -- The tests that I removed were already in `FlinkKafkaProducerTests`. Probably some copy paste error. ---
[jira] [Closed] (FLINK-6916) FLIP-19: Improved BLOB storage architecture
[ https://issues.apache.org/jira/browse/FLINK-6916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber closed FLINK-6916. -- Resolution: Done Fix Version/s: 1.4.0 > FLIP-19: Improved BLOB storage architecture > --- > > Key: FLINK-6916 > URL: https://issues.apache.org/jira/browse/FLINK-6916 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > Fix For: 1.4.0 > > > The current architecture around the BLOB server and cache components seems > rather patched up and has some issues regarding concurrency ([FLINK-6380]), > cleanup, API inconsistencies / currently unused API ([FLINK-6329], > [FLINK-6008]). These make future integration with FLIP-6 or extensions like > offloading oversized RPC messages ([FLINK-6046]) difficult. We therefore > propose an improvement on the current architecture as described below which > tackles these issues, provides some cleanup, and enables further BLOB server > use cases. > Please refer to > https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture > for a full overview on the proposed changes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
[ https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225085#comment-16225085 ] ASF GitHub Bot commented on FLINK-7949: --- GitHub user bartektartanus opened a pull request:

    https://github.com/apache/flink/pull/4924

    [FLINK-7949] AsyncWaitOperator is not restarting when queue is full

    Change: Emitter thread is started BEFORE filling up the queue of recovered elements.

    Issue description: During process restart, if the queue was full (with N elements) and there was a pending element waiting to be added to the queue, then the queue couldn't fit N+1 elements and the thread was blocked forever. As Till Rohrmann suggested here:
    http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
    I've changed the order of this code to start the emitter thread earlier.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bartektartanus/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4924.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4924

commit 97620649ddfcf8f0320b20bdfdb69d9b44dd8f0c
Author: Bartłomiej Tartanus
Date: 2017-10-30T14:39:43Z

    start emmiter thread BEFORE filling up the queue of recovered elements

> AsyncWaitOperator is not restarting when queue is full
>
> Key: FLINK-7949
> URL: https://issues.apache.org/jira/browse/FLINK-7949
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.3.2
> Reporter: Bartłomiej Tartanus
> Original Estimate: 0.25h
> Remaining Estimate: 0.25h
>
> Issue was described here:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
> Issue - AsyncWaitOperator can't restart properly after failure (thread is waiting forever)
> Scenario to reproduce this issue:
> 1. The queue is full (let's assume that its capacity is N elements)
> 2. There is some pending element waiting, so the pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and the while-loop in the addAsyncBufferEntry method is trying to add this element to the queue (but the element is not added because the queue is full)
> 3. Now the snapshot is taken - the whole queue of N elements is being written into the ListState in the snapshotState method and also (what is more important) this pendingStreamElementQueueEntry is written to this list too.
> 4. The process is being restarted, so it tries to recover all the elements and put them again into the queue, but the list of recovered elements holds N+1 elements and our queue capacity is only N. The process is not started yet, so it can not process any element and this one element is waiting endlessly. But it's never added and the process will never process anything. Deadlock.
> 5. The trigger is fired and indeed discarded because the process is not running yet.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4924: [FLINK-7949] AsyncWaitOperator is not restarting w...
GitHub user bartektartanus opened a pull request: https://github.com/apache/flink/pull/4924 [FLINK-7949] AsyncWaitOperator is not restarting when queue is full Change: Emitter thread is started BEFORE filling up the queue of recovered elements Issue description: During process restart, if the queue was full (with N elements) and there was pending element waiting to be added to the queue, then the queue couldn't fit N+1 elements and thread was blocked forever. As Till Rohrmann suggested here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html I've changed the order of this code to start emitter thread earlier. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bartektartanus/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4924.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4924 commit 97620649ddfcf8f0320b20bdfdb69d9b44dd8f0c Author: Bartłomiej Tartanus Date: 2017-10-30T14:39:43Z start emmiter thread BEFORE filling up the queue of recovered elements ---
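The ordering fix in this PR can be illustrated with a minimal sketch: a plain `BlockingQueue` stands in for Flink's stream-element queue, and all names are illustrative rather than Flink's actual `AsyncWaitOperator` code. Because a snapshot can hold capacity + 1 recovered elements, the consumer ("emitter") thread must already be running while the bounded queue is refilled; otherwise `put()` blocks forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch of the restore-order fix, NOT Flink's AsyncWaitOperator.
class RestoreOrderSketch {

    static List<Integer> restore(List<Integer> recovered, int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> emitted = new ArrayList<>();

        // The fix: start the emitter BEFORE refilling the queue, so it drains
        // elements while the recovered snapshot is being put back.
        Thread emitter = new Thread(() -> {
            try {
                for (int i = 0; i < recovered.size(); i++) {
                    emitted.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        emitter.start();

        for (int element : recovered) {
            // With the buggy order (emitter started only after this loop),
            // this put() would deadlock on element number capacity + 1.
            queue.put(element);
        }
        emitter.join();
        return emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        // Queue capacity N = 2, snapshot holds N + 1 = 3 recovered elements.
        System.out.println(restore(List.of(1, 2, 3), 2)); // prints [1, 2, 3]
    }
}
```

Swapping `emitter.start()` below the refill loop reproduces the deadlock the issue describes.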
[jira] [Commented] (FLINK-7418) Replace all uses of jackson with flink-shaded-jackson
[ https://issues.apache.org/jira/browse/FLINK-7418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225082#comment-16225082 ] ASF GitHub Bot commented on FLINK-7418: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4923 Changes look good. :+1: How do we know that our Mesos support still works when Jackson is shaded there? > Replace all uses of jackson with flink-shaded-jackson > - > > Key: FLINK-7418 > URL: https://issues.apache.org/jira/browse/FLINK-7418 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Stephan Ewen >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.4.0 > > > Jackson is currently used to create JSON responses in the web UI, in the > future possibly for the client REST communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4923: [FLINK-7418][build] Integrate flink-shaded-jackson2
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4923 Changes look good. :+1: How do we know that our Mesos support still works when Jackson is shaded there? ---
[jira] [Commented] (FLINK-7400) off-heap limits set too conservatively in cluster environments
[ https://issues.apache.org/jira/browse/FLINK-7400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225080#comment-16225080 ] ASF GitHub Bot commented on FLINK-7400: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4506 I just tested this on a real yarn cluster with `./bin/flink run -m yarn-cluster -yn 1 -ys 2 -yjm 768 -ytm 1024 -yD taskmanager.memory.off-heap=true -yD taskmanager.memory.size=260 -yD taskmanager.memory.preallocate=true ./examples/batch/WordCount.jar` (the equivalent of the test) and verified that it was failing on Flink 1.3.2 but working with the proposed fix. I also tested the counterpart without the three memory options, i.e. `./bin/flink run -m yarn-cluster -yn 1 -ys 2 -yjm 768 -ytm 1024 ./examples/batch/WordCount.jar` and verified it is working with the PR changes
> off-heap limits set too conservatively in cluster environments
>
> Key: FLINK-7400
> URL: https://issues.apache.org/jira/browse/FLINK-7400
> Project: Flink
> Issue Type: Bug
> Components: Cluster Management, Mesos, YARN
> Affects Versions: 1.3.0, 1.3.1, 1.4.0, 1.3.2
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Blocker
>
> Inside {{ContaineredTaskManagerParameters}}, since FLINK-6217, the {{offHeapSize}} is set to the amount of memory Flink will use off-heap which will be set as the value for {{-XX:MaxDirectMemorySize}} in various cases. This does not account for any off-heap use by other components than Flink, e.g. RocksDB, other libraries, or the JVM itself.
> We should add the {{cutoff}} from the {{CONTAINERIZED_HEAP_CUTOFF_RATIO}} configuration parameter to {{offHeapSize}} as implied by the description on what this parameter is there for.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4506: [FLINK-7400][cluster] fix off-heap limits set to conserva...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4506 I just tested this on a real yarn cluster with `./bin/flink run -m yarn-cluster -yn 1 -ys 2 -yjm 768 -ytm 1024 -yD taskmanager.memory.off-heap=true -yD taskmanager.memory.size=260 -yD taskmanager.memory.preallocate=true ./examples/batch/WordCount.jar` (the equivalent of the test) and verified that it was failing on Flink 1.3.2 but working with the proposed fix. I also tested the counterpart without the three memory options, i.e. `./bin/flink run -m yarn-cluster -yn 1 -ys 2 -yjm 768 -ytm 1024 ./examples/batch/WordCount.jar` and verified it is working with the PR changes ---
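The sizing change being tested above can be sketched as simple arithmetic. This is an illustration of the issue description only, not Flink's actual `ContaineredTaskManagerParameters` code: the `-XX:MaxDirectMemorySize` budget becomes Flink's own off-heap size plus the containerized heap cutoff, leaving headroom for RocksDB, other libraries, and the JVM itself.

```java
// Illustrative sketch of the sizing arithmetic described in FLINK-7400;
// method and class names are hypothetical, not Flink's code.
class OffHeapLimitSketch {

    static long maxDirectMemoryMB(long containerMemoryMB, double cutoffRatio, long flinkOffHeapMB) {
        long cutoffMB = (long) (containerMemoryMB * cutoffRatio);
        // Before the fix the limit was just flinkOffHeapMB, which starved
        // non-Flink off-heap users; the fix adds the cutoff on top.
        return flinkOffHeapMB + cutoffMB;
    }

    public static void main(String[] args) {
        // 1024 MB container, 0.25 cutoff ratio (the documented default),
        // 260 MB of Flink-managed off-heap memory as in the test command:
        System.out.println(maxDirectMemoryMB(1024, 0.25, 260)); // prints 516
    }
}
```

With the old formula the same setup would cap direct memory at 260 MB, matching the failure NicoK observed on 1.3.2.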
[jira] [Created] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
Bartłomiej Tartanus created FLINK-7949: -- Summary: AsyncWaitOperator is not restarting when queue is full Key: FLINK-7949 URL: https://issues.apache.org/jira/browse/FLINK-7949 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.3.2 Reporter: Bartłomiej Tartanus

Issue was described here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
Issue - AsyncWaitOperator can't restart properly after failure (thread is waiting forever)
Scenario to reproduce this issue:
1. The queue is full (let's assume that its capacity is N elements)
2. There is some pending element waiting, so the pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and the while-loop in the addAsyncBufferEntry method is trying to add this element to the queue (but the element is not added because the queue is full)
3. Now the snapshot is taken - the whole queue of N elements is being written into the ListState in the snapshotState method and also (what is more important) this pendingStreamElementQueueEntry is written to this list too.
4. The process is being restarted, so it tries to recover all the elements and put them again into the queue, but the list of recovered elements holds N+1 elements and our queue capacity is only N. The process is not started yet, so it can not process any element and this one element is waiting endlessly. But it's never added and the process will never process anything. Deadlock.
5. The trigger is fired and indeed discarded because the process is not running yet.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit
[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225068#comment-16225068 ] ASF GitHub Bot commented on FLINK-7784: --- Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4910#discussion_r147727170

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
    @@ -58,18 +61,37 @@
     	extends RichSinkFunction<IN>
     	implements CheckpointedFunction, CheckpointListener {

    -	private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
    +	private final Logger log;

    -	protected final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +	private final Clock clock;

    -	protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
    +	protected final LinkedHashMap<Long, TransactionHolder<TXN>> pendingCommitTransactions = new LinkedHashMap<>();

    -	@Nullable
    -	protected TXN currentTransaction;
     	protected Optional<CONTEXT> userContext;

     	protected ListState<State<TXN, CONTEXT>> state;

    +	private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +
    +	private TransactionHolder<TXN> currentTransaction;
    +
    +	/**
    +	 * Specifies the maximum time a transaction should remain open.
    +	 */
    +	private long transactionTimeout = Long.MAX_VALUE;
    +
    +	/**
    +	 * If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
    +	 * propagated.
    +	 */
    +	private boolean failureOnCommitAfterTransactionTimeoutDisabled;
    +
    +	/**
    +	 * If a transaction's elapsed time reaches this percentage of the transactionTimeout, a warning
    +	 * message will be logged. Value must be in range [0,1]. Negative value disables warnings.
    +	 */
    +	private double transactionTimeoutWarningRatio = -1;
    --- End diff --

    I tend to agree with @pnowojski about the API surface but I think in this case this is a valid safety net for possible future transaction sinks.
> Don't fail TwoPhaseCommitSinkFunction when failing to commit > > > Key: FLINK-7784 > URL: https://issues.apache.org/jira/browse/FLINK-7784 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails > (either when doing it via the completed checkpoint notification or when > trying to commit after restoring after failure). This means that the job will > go into an infinite recovery loop because we will always keep failing. > In some cases it might be better to ignore those failures and keep on > processing and this should be the default. We can provide an option that > allows failing the sink on failing commits. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4910#discussion_r147727170

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
    @@ -58,18 +61,37 @@
     	extends RichSinkFunction<IN>
     	implements CheckpointedFunction, CheckpointListener {

    -	private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
    +	private final Logger log;

    -	protected final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +	private final Clock clock;

    -	protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
    +	protected final LinkedHashMap<Long, TransactionHolder<TXN>> pendingCommitTransactions = new LinkedHashMap<>();

    -	@Nullable
    -	protected TXN currentTransaction;
     	protected Optional<CONTEXT> userContext;

     	protected ListState<State<TXN, CONTEXT>> state;

    +	private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +
    +	private TransactionHolder<TXN> currentTransaction;
    +
    +	/**
    +	 * Specifies the maximum time a transaction should remain open.
    +	 */
    +	private long transactionTimeout = Long.MAX_VALUE;
    +
    +	/**
    +	 * If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
    +	 * propagated.
    +	 */
    +	private boolean failureOnCommitAfterTransactionTimeoutDisabled;
    +
    +	/**
    +	 * If a transaction's elapsed time reaches this percentage of the transactionTimeout, a warning
    +	 * message will be logged. Value must be in range [0,1]. Negative value disables warnings.
    +	 */
    +	private double transactionTimeoutWarningRatio = -1;
    --- End diff --

    I tend to agree with @pnowojski about the API surface but I think in this case this is a valid safety net for possible future transaction sinks.

---
[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit
[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225064#comment-16225064 ] ASF GitHub Bot commented on FLINK-7784: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4910#discussion_r147654876 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java --- @@ -83,49 +79,6 @@ public void before() { extraProperties.put("isolation.level", "read_committed"); } - @Test(timeout = 3L) --- End diff -- Why are these removed? Did they never actually test anything? > Don't fail TwoPhaseCommitSinkFunction when failing to commit > > > Key: FLINK-7784 > URL: https://issues.apache.org/jira/browse/FLINK-7784 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails > (either when doing it via the completed checkpoint notification or when > trying to commit after restoring after failure). This means that the job will > go into an infinite recovery loop because we will always keep failing. > In some cases it might be better to ignore those failures and keep on > processing and this should be the default. We can provide an option that > allows failing the sink on failing commits. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4910#discussion_r147654876 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java --- @@ -83,49 +79,6 @@ public void before() { extraProperties.put("isolation.level", "read_committed"); } - @Test(timeout = 3L) --- End diff -- Why are these removed? Did they never actually test anything? ---
[jira] [Commented] (FLINK-7418) Replace all uses of jackson with flink-shaded-jackson
[ https://issues.apache.org/jira/browse/FLINK-7418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225054#comment-16225054 ] ASF GitHub Bot commented on FLINK-7418: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/4923 [FLINK-7418][build] Integrate flink-shaded-jackson2 ## What is the purpose of the change This PR replaces all usages of com.fasterxml.jackson with flink-shaded-jackson. One change of note is that flink-mesos now shades com.netflix.fenzo:fenzo-core so that it can use its own jackson version. ## Brief change log * replace all jackson dependencies with flink-shaded-jackson * add used undeclared dependencies to many modules * replace all usages of com.fasterxml.jackson with org.apache.flink.shaded.jackson2.com.fasterxml.jackson * modify mesos shade-plugin configuration to shade fenzo-core * modify quickstarts to exclude flink-shaded-jackson instead * (implicit) bump jackson version to 2.7.9 ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 7418 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4923.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4923 commit bac19a90544fba63693983970dece1a4fee5f14a Author: zentol Date: 2017-10-16T10:48:12Z [FLINK-7418][build] Integrate flink-shaded-jackson2 commit 97dcac37a783a746d1d6c77099d6a241febb22e6 Author: zentol Date: 2017-10-16T10:49:38Z kafka commit ee8ae4d312193f288e7b671f13efd69a827dca58 Author: zentol Date: 2017-10-16T10:51:11Z gelly commit fa1dd1e9a0bb6af4ce2d2a4a7c6bcd502dbfaa99 Author: zentol Date: 2017-10-16T10:51:46Z streaming examples commit 39fbd1a66f94bff3d01231a1d9028ca5e534186a Author: zentol Date: 2017-10-16T10:52:23Z DataDog reporter commit 6725ef65ce0e0c8e45c5002ea47d66f9e3e5aa28 Author: zentol Date: 2017-10-16T10:53:37Z runtime-web commit 7d661a849d3c7eca0dda676492bf852ab598d2f0 Author: zentol Date: 2017-10-16T10:54:30Z optimizer commit 87f2dc8e9fcad3c69ce166e853cac3a3c4b89039 Author: zentol Date: 2017-10-16T10:56:09Z flink-tests commit 81af03d9560b1da043b4a211ece8092df2b17c61 Author: zentol Date: 2017-10-16T10:57:08Z mesos commit 6fa19385f343675737fc0d0c8e06366167662af5 Author: zentol Date: 2017-10-16T10:59:14Z quickstarts commit 5d27f2cade24a37b6d22427d01cf2d3274f62feb Author: zentol Date: 2017-10-16T10:59:43Z table commit 992fb2ebfbb100a69d22e100b6ec217672f14b76 Author: zentol Date: 2017-10-16T11:01:06Z streaming-java commit f3c645c8fa415181cea42bc8039897516ddd26a4 Author: zentol Date: 2017-10-16T11:01:55Z yarn-tests commit 90abe30d7dc762e64231ef47ddd1ed40d0d80cb5 Author: zentol Date: 2017-10-16T11:03:43Z runtime > Replace all uses of jackson with flink-shaded-jackson > - > > Key: FLINK-7418 > URL: https://issues.apache.org/jira/browse/FLINK-7418 > Project: Flink > 
Issue Type: Sub-task > Components: Build System >Reporter: Stephan Ewen >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.4.0 > > > Jackson is currently used to create JSON responses in the web UI, in the > future possibly for the client REST communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4923: [FLINK-7418][build] Integrate flink-shaded-jackson...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/4923 [FLINK-7418][build] Integrate flink-shaded-jackson2 ## What is the purpose of the change This PR replaces all usages of com.fasterxml.jackson with flink-shaded-jackson. One change of note is that flink-mesos now shades com.netflix.fenzo:fenzo-core so that it can use its own jackson version. ## Brief change log * replace all jackson dependencies with flink-shaded-jackson * add used undeclared dependencies to many modules * replace all usages of com.fasterxml.jackson with org.apache.flink.shaded.jackson2.com.fasterxml.jackson * modify mesos shade-plugin configuration to shade fenzo-core * modify quickstarts to exclude flink-shaded-jackson instead * (implicit) bump jackson version to 2.7.9 ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 7418 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4923.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4923 commit bac19a90544fba63693983970dece1a4fee5f14a Author: zentol Date: 2017-10-16T10:48:12Z [FLINK-7418][build] Integrate flink-shaded-jackson2 commit 97dcac37a783a746d1d6c77099d6a241febb22e6 Author: zentol Date: 2017-10-16T10:49:38Z kafka commit ee8ae4d312193f288e7b671f13efd69a827dca58 Author: zentol Date: 2017-10-16T10:51:11Z gelly commit fa1dd1e9a0bb6af4ce2d2a4a7c6bcd502dbfaa99 Author: zentol Date: 2017-10-16T10:51:46Z streaming examples commit 39fbd1a66f94bff3d01231a1d9028ca5e534186a Author: zentol Date: 2017-10-16T10:52:23Z DataDog reporter commit 6725ef65ce0e0c8e45c5002ea47d66f9e3e5aa28 Author: zentol Date: 2017-10-16T10:53:37Z runtime-web commit 7d661a849d3c7eca0dda676492bf852ab598d2f0 Author: zentol Date: 2017-10-16T10:54:30Z optimizer commit 87f2dc8e9fcad3c69ce166e853cac3a3c4b89039 Author: zentol Date: 2017-10-16T10:56:09Z flink-tests commit 81af03d9560b1da043b4a211ece8092df2b17c61 Author: zentol Date: 2017-10-16T10:57:08Z mesos commit 6fa19385f343675737fc0d0c8e06366167662af5 Author: zentol Date: 2017-10-16T10:59:14Z quickstarts commit 5d27f2cade24a37b6d22427d01cf2d3274f62feb Author: zentol Date: 2017-10-16T10:59:43Z table commit 992fb2ebfbb100a69d22e100b6ec217672f14b76 Author: zentol Date: 2017-10-16T11:01:06Z streaming-java commit f3c645c8fa415181cea42bc8039897516ddd26a4 Author: zentol Date: 2017-10-16T11:01:55Z yarn-tests commit 90abe30d7dc762e64231ef47ddd1ed40d0d80cb5 Author: zentol Date: 2017-10-16T11:03:43Z runtime ---
[jira] [Commented] (FLINK-7947) Let ParameterTool return a dedicated GlobalJobParameters object
[ https://issues.apache.org/jira/browse/FLINK-7947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225051#comment-16225051 ] Aljoscha Krettek commented on FLINK-7947: - +1 I think most code around {{ParameterTool}} would benefit from a proper design and refactoring. > Let ParameterTool return a dedicated GlobalJobParameters object > --- > > Key: FLINK-7947 > URL: https://issues.apache.org/jira/browse/FLINK-7947 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.4.0 >Reporter: Till Rohrmann > > The {{ParameterTool}} directly implements the {{GlobalJobParameters}} > interface. Additionally it has grown over time to not only store the > configuration parameters but also to record which parameters have been > requested and what default value was set. This information is irrelevant on > the server side when setting a {{GlobalJobParameters}} object via > {{ExecutionConfig#setGlobalJobParameters}}. > Since we don't separate the {{ParameterTool}} logic and the actual data view, > users ran into problems when reusing the same {{ParameterTool}} to start > multiple jobs concurrently (see FLINK-7943). I think it would be a much > clearer separation of concerns if we would actually split the > {{GlobalJobParameters}} from the {{ParameterTool}}. > Furthermore, we should think about whether {{ParameterTool#get}} should have > side effects or not as it does right now. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
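The separation proposed above could look roughly like the following sketch. All class and method names here (`ParameterToolSketch`, `GlobalJobParametersSnapshot`, `asGlobalJobParameters`) are hypothetical illustrations, not Flink API: the tool keeps its lookup logic, while jobs receive a detached, immutable parameter view with no side effects.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: an immutable data view that could be handed to
// ExecutionConfig#setGlobalJobParameters instead of the ParameterTool itself.
class GlobalJobParametersSnapshot {
    private final Map<String, String> params;

    GlobalJobParametersSnapshot(Map<String, String> params) {
        this.params = Collections.unmodifiableMap(new HashMap<>(params));
    }

    Map<String, String> toMap() {
        return params;
    }
}

// Hypothetical sketch of the tool side: lookups with defaults stay here,
// and handing out parameters has no side effects on shared state.
class ParameterToolSketch {
    private final Map<String, String> data;

    ParameterToolSketch(Map<String, String> data) {
        this.data = new HashMap<>(data);
    }

    String get(String key, String defaultValue) {
        return data.getOrDefault(key, defaultValue);
    }

    // Each call returns an independent snapshot, so concurrent jobs
    // cannot interfere with each other through shared mutable state.
    GlobalJobParametersSnapshot asGlobalJobParameters() {
        return new GlobalJobParametersSnapshot(data);
    }
}
```

With this split, reusing one tool to launch several jobs concurrently (the FLINK-7943 scenario) hands each job its own snapshot instead of the shared tool instance.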
[GitHub] flink pull request #4922: [hotfix][metrics] Cleanup ScopeFormats
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/4922#discussion_r147719227 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java --- @@ -38,30 +36,9 @@ // /** -* Creates all default scope formats. -*/ - public ScopeFormats() { --- End diff -- Can't the class be `final`? Either way it has a private constructor. ---
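For context, the pattern behind kl0u's remark is standard: a private constructor prevents outside instantiation, and `final` additionally rules out subclassing (a private constructor alone still permits nested subclasses). A minimal illustration, not the actual `ScopeFormats` code:

```java
// Minimal illustration of pairing a private constructor with final.
// The class name and joinScope method are invented for this example.
final class ScopeFormatsStyleUtility {
    private ScopeFormatsStyleUtility() {
        // Guard against instantiation via reflection as well.
        throw new AssertionError("No instances.");
    }

    static String joinScope(String... parts) {
        return String.join(".", parts);
    }
}
```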
[GitHub] flink issue #4794: [build][minor] Add missing licenses
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4794 Adding the license to `browserconfig.xml` looks fine, but why change the user configurations `masters`, `slaves`, and `zoo.cfg`? Are these even copyrightable? ---
[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit
[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225030#comment-16225030 ] ASF GitHub Bot commented on FLINK-7784: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4910#discussion_r147717560 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java --- @@ -35,60 +42,101 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for {@link TwoPhaseCommitSinkFunction}. */ public class TwoPhaseCommitSinkFunctionTest { - TestContext context; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private FileBasedSinkFunction sinkFunction; + + private OneInputStreamOperatorTestHarness harness; + + private AtomicBoolean throwException = new AtomicBoolean(); + + private File targetDirectory; + + private File tmpDirectory; + + @Mock + private Clock mockClock; + + @Mock + private Logger mockLogger; --- End diff -- True, you are right. One would need to implement the `org.slf4j.Logger` interface. 
- http://projects.lidalia.org.uk/slf4j-test/ provides a `org.slf4j.Logger` implementation for unit testing but this approach comes with other problems (e.g. https://github.com/Mahoney/slf4j-test/issues/15, [NOP if log level is disabled](https://github.com/Mahoney/slf4j-test/blob/master/src/main/java/uk/org/lidalia/slf4jtest/TestLogger.java#L449)). Right now I don't see a way to do it properly. Any help is welcome. > Don't fail TwoPhaseCommitSinkFunction when failing to commit > > > Key: FLINK-7784 > URL: https://issues.apache.org/jira/browse/FLINK-7784 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > > Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails > (either when doing it via the completed checkpoint notification or when > trying to commit after restoring after failure). This means that the job will > go into an infinite recovery loop because we will always keep failing. > In some cases it might be better to ignore those failures and keep on > processing and this should be the default. We can provide an option that > allows failing the sink on failing commits. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4910#discussion_r147717560 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java --- @@ -35,60 +42,101 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for {@link TwoPhaseCommitSinkFunction}. */ public class TwoPhaseCommitSinkFunctionTest { - TestContext context; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private FileBasedSinkFunction sinkFunction; + + private OneInputStreamOperatorTestHarness harness; + + private AtomicBoolean throwException = new AtomicBoolean(); + + private File targetDirectory; + + private File tmpDirectory; + + @Mock + private Clock mockClock; + + @Mock + private Logger mockLogger; --- End diff -- True, you are right. One would need to implement the `org.slf4j.Logger` interface. - http://projects.lidalia.org.uk/slf4j-test/ provides a `org.slf4j.Logger` implementation for unit testing but this approach comes with other problems (e.g. 
https://github.com/Mahoney/slf4j-test/issues/15, [NOP if log level is disabled](https://github.com/Mahoney/slf4j-test/blob/master/src/main/java/uk/org/lidalia/slf4jtest/TestLogger.java#L449)). Right now I don't see a way to do it properly. Any help is welcome. ---
[GitHub] flink pull request #4922: [hotfix][metrics] Cleanup ScopeFormats
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/4922 [hotfix][metrics] Cleanup ScopeFormats ## What is the purpose of the change This PR cleans up a few things related to `ScopeFormats`. ## Brief change log * remove unused constructor * remove redundant `MetricRegistryConfiguration#createScopeConfig` * remove unnecessary default constructor (passing an empty configuration instead is easier to maintain) * limit visibility of constructor ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink metrics_hotfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4922.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4922 commit 3211152f9b0d357c40fef6f924ca9170ba34d3a5 Author: zentol Date: 2017-10-30T14:07:52Z [hotfix][metrics] Remove MetricRegistryConfiguration#createScopeConfig This method duplicates ScopeFormats#fromConfig commit 5fcd4d64fba1aaa1e093dc70bcf19087604debc4 Author: zentol Date: 2017-10-30T14:09:42Z [hotfix][metrics] Remove unused ScopeFormats constructor commit 5be7e4e648d2426cd59ebeb0872d03d1202642e8 Author: zentol Date: 2017-10-30T14:11:39Z [hotfix][metrics] Remove unnecessary ScopeFormats constructor commit bbd3b8f5c955a46cfa72d13136110f3ec959f459 Author: zentol Date: 2017-10-30T14:12:00Z [hotfix][metrics] Limit visibility of ScopeFormats constructor ---
[jira] [Assigned] (FLINK-7732) Test instability in Kafka end-to-end test (invalid Kafka offset)
[ https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-7732: - Assignee: Piotr Nowojski > Test instability in Kafka end-to-end test (invalid Kafka offset) > > > Key: FLINK-7732 > URL: https://issues.apache.org/jira/browse/FLINK-7732 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Piotr Nowojski >Priority: Blocker > Labels: test-stability > Fix For: 1.4.0 > > > In a test run with unrelated changes in the network stack, the Kafka > end-to-end test was failing with an invalid offset: > {code} > 2017-09-28 06:34:10,736 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka version : 0.10.2.1 > 2017-09-28 06:34:10,744 INFO org.apache.kafka.common.utils.AppInfoParser > - Kafka commitId : e89bffd6b2eff799 > 2017-09-28 06:34:14,549 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:14,573 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,686 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. 
> 2017-09-28 06:34:14,687 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking > the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) dead for group myconsumer > 2017-09-28 06:34:14,792 INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered > coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: > 2147483647 rack: null) for group myconsumer. > 2017-09-28 06:34:15,068 INFO > org.apache.flink.runtime.state.DefaultOperatorStateBackend- > DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous > part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: > Unnamed (1/1),5,Flink Task Threads] took 948 ms. > 2017-09-28 06:34:15,164 WARN > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - > Committing offsets to Kafka failed. This does not compromise Flink's > checkpoints. > java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217) > 2017-09-28 06:34:15,171 ERROR > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async > Kafka commit failed. 
> java.lang.IllegalArgumentException: Invalid offset: -915623761772 > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217) > {code} > https://travis-ci.org/apache/flink/jobs/280722829 > [~pnowojski] did a first analysis that revealed this: > In > org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java:229 > this is being sent: > {{long offsetToCommit = lastProcessedOffset + 1;}} > {{lastProcessedOffset}} comes from: > {{org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#snapshotState}} > either lines 741 or 749 > The value that we see is strangely similar to > {{org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel#GROUP_OFFSET}} > {code} > /** > * Magic number that defines the partition
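The analysis above suggests the fetcher commits `lastProcessedOffset + 1` even when `lastProcessedOffset` still holds a negative sentinel rather than a real Kafka offset. A hedged sketch of the kind of guard that would avoid this follows; the sentinel constant is an assumption inferred from the log (-915623761772 is exactly one more than it), and the class and method names are illustrative, not the actual Flink fix:

```java
// Illustrative sketch of guarding a Kafka offset commit against sentinel
// values. The constant mirrors the magic numbers in
// KafkaTopicPartitionStateSentinel; its exact value here is an assumption
// inferred from the reported log line.
class OffsetCommitGuard {
    static final long GROUP_OFFSET_SENTINEL = -915623761773L;

    // Returns the offset to commit, or -1 if no record was processed yet
    // (i.e. the partition state still holds a sentinel, not a real offset).
    static long offsetToCommit(long lastProcessedOffset) {
        if (lastProcessedOffset < 0) {
            return -1L; // skip the commit instead of sending an invalid offset
        }
        return lastProcessedOffset + 1;
    }
}
```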
[jira] [Commented] (FLINK-7902) TwoPhaseCommitSinkFunctions should use custom TypeSerializer
[ https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225008#comment-16225008 ] ASF GitHub Bot commented on FLINK-7902: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4919#discussion_r147711564 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -361,5 +376,247 @@ public void setPendingCommitTransactions(List pendingCommitTransactions) { public void setContext(Optional context) { this.context = context; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + State state = (State) o; + --- End diff -- I used IntelliJ to generate those. :sweat_smile: > TwoPhaseCommitSinkFunctions should use custom TypeSerializer > > > Key: FLINK-7902 > URL: https://issues.apache.org/jira/browse/FLINK-7902 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new > TypeHint>() {})}} to > create a {{TypeInformation}} which in turn is used to create a > {{StateDescriptor}} for the state that the Kafka sink stores. > Behind the scenes, this would be roughly analysed as a > {{PojoType(GenericType, > GenericType)}} which means we don't have explicit > control over the serialisation format and we also use Kryo (which is the > default for {{GenericTypeInfo}}). This can be problematic if we want to > evolve the state schema in the future or if we want to change Kryo versions. 
> We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor: > {code} > public TwoPhaseCommitSinkFunction(TypeSerializer> > stateSerializer) { > {code} > and we should then change the {{FlinkKafkaProducer011}} to hand in a > custom-made {{TypeSerializer}} for the state. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4919: [FLINK-7902] Use TypeSerializer in TwoPhaseCommitS...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4919#discussion_r147711564 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -361,5 +376,247 @@ public void setPendingCommitTransactions(List pendingCommitTransactions) { public void setContext(Optional context) { this.context = context; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + State state = (State) o; + --- End diff -- I used IntelliJ to generate those. :sweat_smile: ---
[jira] [Updated] (FLINK-7948) YarnApplicationMaster should also load hdfs-site.xml
[ https://issues.apache.org/jira/browse/FLINK-7948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-7948: - Summary: YarnApplicationMaster should also load hdfs-site.xml (was: YarnApplicationMaster should also load the hdfs-site.xml) > YarnApplicationMaster should also load hdfs-site.xml > > > Key: FLINK-7948 > URL: https://issues.apache.org/jira/browse/FLINK-7948 > Project: Flink > Issue Type: Improvement > Components: YARN >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0 > > > The {{YarnApplicationMaster}} uses the {{YarnConfiguration}} to load the Yarn > configuration. This class automatically loads the {{core-site.xml}} and the > {{yarn-site.xml}} from the class path. It, however, does not load the > {{hdfs-site.xml}}. > I propose to also try to load the {{hdfs-site.xml}} and add it to the > {{YarnConfiguration}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7902) TwoPhaseCommitSinkFunctions should use custom TypeSerializer
[ https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224984#comment-16224984 ] ASF GitHub Bot commented on FLINK-7902: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/4919#discussion_r147704985 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -361,5 +376,247 @@ public void setPendingCommitTransactions(List pendingCommitTransactions) { public void setContext(Optional context) { this.context = context; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + State state = (State) o; + --- End diff -- Not a deal breaker but these if's are a bit too complicated. Using `Objects.equals()` on each of the components and `&&` would simplify them. > TwoPhaseCommitSinkFunctions should use custom TypeSerializer > > > Key: FLINK-7902 > URL: https://issues.apache.org/jira/browse/FLINK-7902 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new > TypeHint>() {})}} to > create a {{TypeInformation}} which in turn is used to create a > {{StateDescriptor}} for the state that the Kafka sink stores. > Behind the scenes, this would be roughly analysed as a > {{PojoType(GenericType, > GenericType)}} which means we don't have explicit > control over the serialisation format and we also use Kryo (which is the > default for {{GenericTypeInfo}}). This can be problematic if we want to > evolve the state schema in the future or if we want to change Kryo versions. 
> We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor: > {code} > public TwoPhaseCommitSinkFunction(TypeSerializer> > stateSerializer) { > {code} > and we should then change the {{FlinkKafkaProducer011}} to hand in a > custom-made {{TypeSerializer}} for the state. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4919: [FLINK-7902] Use TypeSerializer in TwoPhaseCommitS...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/4919#discussion_r147704985 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java --- @@ -361,5 +376,247 @@ public void setPendingCommitTransactions(List pendingCommitTransactions) { public void setContext(Optional context) { this.context = context; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + State state = (State) o; + --- End diff -- Not a deal breaker but these if's are a bit too complicated. Using `Objects.equals()` on each of the components and `&&` would simplify them. ---
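kl0u's suggestion — replacing the generated field-by-field `if` cascade with `Objects.equals()` calls chained by `&&` — looks roughly like this on a simplified stand-in for the `State` class (the fields shown are illustrative, not the full set in `TwoPhaseCommitSinkFunction`):

```java
import java.util.List;
import java.util.Objects;

// Simplified stand-in for TwoPhaseCommitSinkFunction.State, showing the
// suggested equals/hashCode style: one Objects.equals per component,
// chained with &&, instead of an IDE-generated if-cascade.
class StateSketch {
    private final String pendingTransaction;
    private final List<String> pendingCommitTransactions;

    StateSketch(String pendingTransaction, List<String> pendingCommitTransactions) {
        this.pendingTransaction = pendingTransaction;
        this.pendingCommitTransactions = pendingCommitTransactions;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        StateSketch that = (StateSketch) o;
        return Objects.equals(pendingTransaction, that.pendingTransaction)
            && Objects.equals(pendingCommitTransactions, that.pendingCommitTransactions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(pendingTransaction, pendingCommitTransactions);
    }
}
```

`Objects.equals` handles nulls on either side, so the null checks the generated code spells out per field disappear.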
[jira] [Created] (FLINK-7948) YarnApplicationMaster should also load the hdfs-site.xml
Till Rohrmann created FLINK-7948: Summary: YarnApplicationMaster should also load the hdfs-site.xml Key: FLINK-7948 URL: https://issues.apache.org/jira/browse/FLINK-7948 Project: Flink Issue Type: Improvement Components: YARN Affects Versions: 1.4.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Priority: Critical Fix For: 1.4.0 The {{YarnApplicationMaster}} uses the {{YarnConfiguration}} to load the Yarn configuration. This class automatically loads the {{core-site.xml}} and the {{yarn-site.xml}} from the class path. It, however, does not load the {{hdfs-site.xml}}. I propose to also try to load the {{hdfs-site.xml}} and add it to the {{YarnConfiguration}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
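The gist of the proposal can be sketched as plain resolution logic. This is an illustration of which files get picked up, not the actual Flink/Hadoop API calls; the class and method names are invented: `YarnConfiguration` auto-loads `core-site.xml` and `yarn-site.xml`, and the fix is to additionally register `hdfs-site.xml` when it exists in the configuration directory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative sketch (not the actual Flink/Hadoop code): given the file
// names present in the Hadoop conf directory, compute which *-site.xml
// resources should end up in the YarnConfiguration.
class HadoopConfResolution {
    static List<String> resourcesToLoad(Set<String> filesInConfDir) {
        List<String> resources = new ArrayList<>();
        // YarnConfiguration loads these two from the classpath by itself.
        resources.add("core-site.xml");
        resources.add("yarn-site.xml");
        // The proposed addition: also pick up hdfs-site.xml if present, so
        // HDFS client settings (e.g. HA failover proxy providers) are visible.
        if (filesInConfDir.contains("hdfs-site.xml")) {
            resources.add("hdfs-site.xml");
        }
        return resources;
    }
}
```

This is also the behavior the FLINK-7881 report above runs into: without `hdfs-site.xml`, a logical HA namenode URI cannot be resolved.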
[jira] [Commented] (FLINK-7765) Enable dependency convergence
[ https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224978#comment-16224978 ] ASF GitHub Bot commented on FLINK-7765: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4777 I'm not sure if you can easily disable convergence in a child pom. I think that shading happens after resolving dependency versions. So first you would need to fix dependency convergence for a given module to ensure that during compilation everything will be ok/converged. Shading happens after that and I think it can only fix/avoid potential errors in other modules that depend on a module that shaded something. > Enable dependency convergence > - > > Key: FLINK-7765 > URL: https://issues.apache.org/jira/browse/FLINK-7765 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > For motivation check https://issues.apache.org/jira/browse/FLINK-7739 > SubTasks of this task depend on one another - to enable convergence in > `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4777 I'm not sure if you can easily disable convergence in a child pom. I think that shading happens after resolving dependency versions. So first you would need to fix dependency convergence for a given module to ensure that during compilation everything will be ok/converged. Shading happens after that and I think it can only fix/avoid potential errors in other modules that depend on a module that shaded something. ---
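For reference, dependency convergence is enforced in Maven via the Enforcer Plugin's `dependencyConvergence` rule; a minimal configuration sketch (the plugin version shown is illustrative, not necessarily what Flink pins):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <version>3.0.0-M1</version>
  <executions>
    <execution>
      <id>dependency-convergence</id>
      <goals>
        <goal>enforce</goal>
      </goals>
      <configuration>
        <rules>
          <!-- Fails the build when two paths in the dependency tree
               resolve the same artifact to different versions. -->
          <dependencyConvergence/>
        </rules>
      </configuration>
    </execution>
  </executions>
</plugin>
```

As the comment above notes, the rule runs at dependency-resolution time, i.e. before any shading takes effect in the module being built.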
[jira] [Closed] (FLINK-7696) Add projection push-down support for TableSources with time attributes
[ https://issues.apache.org/jira/browse/FLINK-7696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-7696. Resolution: Implemented Fix Version/s: 1.4.0 Implemented for 1.4.0 with 9a2ba6e058e907aef65e0af8731ca5ec8733712e > Add projection push-down support for TableSources with time attributes > -- > > Key: FLINK-7696 > URL: https://issues.apache.org/jira/browse/FLINK-7696 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske > Fix For: 1.4.0 > > > Table sources that implement the {{DefinedProctimeAttribute}} or > {{DefinedRowtimeAttribute}} do not support projection push-down even if they > also implement {{ProjectableTableSource}}. > There are several problems: > - the schema of a {{TableSource}} that implements {{DefinedRowtimeAttribute}} > or {{DefinedProctimeAttribute}} is constructed in the catalog not in the > {{TableSource}} (proctime fields are always appended at the end). > - the {{ProjectableTableSource.projectFields()}} method returns the projected > fields as int indices. In order to handle the indices correctly, the > TableSource would need to know about the internals of the Table API. > - {{ProjectableTableSource.projectFields()}} might reorder fields and move a > proctime field into the middle of the schema. However, the TableSource has no > control over that. > - A {{TableSource}} that implements {{DefinedRowtimeAttribute}} or > {{DefinedProctimeAttribute}} would need to change the return values of > {{getRowtimeAttribute()}} or {{getProctimeAttribute()}} depending on whether > the attribute is kept or not. > Adjusting the schema of table sources inside of the Table API makes all of > this quite messy. Maybe we need to refine the interfaces. For instance, we > could ask users to explicitly add time indicator fields in the > {{TypeInformation}} returned by {{TableSource.getReturnType()}}. 
However, > that might collide with plans to add computable time attributes as proposed > in FLINK-7548. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-6870) Combined batch and stream TableSource can not produce same time attributes
[ https://issues.apache.org/jira/browse/FLINK-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-6870. Resolution: Fixed Fix Version/s: 1.4.0 Fixed for 1.4.0 with 9a2ba6e058e907aef65e0af8731ca5ec8733712e > Combined batch and stream TableSource can not produce same time attributes > -- > > Key: FLINK-6870 > URL: https://issues.apache.org/jira/browse/FLINK-6870 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther > Fix For: 1.4.0 > > > If a class implements both {{BatchTableSource}} and {{StreamTableSource}}, it > is not possible to declare a time attribute which is valid for both > environments. For batch it should be a regular field, but not for streaming. > The {{getReturnType}} method does not know the environment in which it is > called. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7179) ProjectableTableSource interface isn't compatible with BoundedOutOfOrdernessTimestampExtractor
[ https://issues.apache.org/jira/browse/FLINK-7179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-7179. Resolution: Fixed Fix Version/s: 1.4.0 Fixed for 1.4.0 with 9a2ba6e058e907aef65e0af8731ca5ec8733712e > ProjectableTableSource interface isn't compatible with > BoundedOutOfOrdernessTimestampExtractor > > > Key: FLINK-7179 > URL: https://issues.apache.org/jira/browse/FLINK-7179 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang > Fix For: 1.4.0 > > > In the window implementation of streaming SQL, > BoundedOutOfOrdernessTimestampExtractor is designed to extract row time from > each row. It assumes the ts field is in the data stream by default. On the > other hand, ProjectableTableSource is designed to help projection push down. > If there is no row time related field in a query, the extractor can't > function well. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-7548. Resolution: Implemented Implemented for 1.4.0 with 9a2ba6e058e907aef65e0af8731ca5ec8733712e > Support watermark generation for TableSource > > > Key: FLINK-7548 > URL: https://issues.apache.org/jira/browse/FLINK-7548 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske >Priority: Blocker > Fix For: 1.4.0 > > > As discussed in FLINK-7446, currently the TableSource only supports defining > a rowtime field, but does not support extracting watermarks from it. > We can provide a new interface called {{DefinedWatermark}}, which has two > methods {{getRowtimeAttribute}} (can only be an existing field) and > {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked > deprecated. > How to support periodic and punctuated watermarks and support some built-in > strategies needs further discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7947) Let ParameterTool return a dedicated GlobalJobParameters object
[ https://issues.apache.org/jira/browse/FLINK-7947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-7947: - Description: The {{ParameterTool}} directly implements the {{GlobalJobParameters}} interface. Additionally it has grown over time to not only store the configuration parameters but also to record which parameters have been requested and what default value was set. This information is irrelevant on the server side when setting a {{GlobalJobParameters}} object via {{ExecutionConfig#setGlobalJobParameters}}. Since we don't separate the {{ParameterTool}} logic and the actual data view, users ran into problems when reusing the same {{ParameterTool}} to start multiple jobs concurrently (see FLINK-7943). I think it would be a much clearer separation of concerns if we would actually split the {{GlobalJobParameters}} from the {{ParameterTool}}. Furthermore, we should think about whether {{ParameterTool#get}} should have side effects or not as it does right now. was: The {{ParameterTool}} directly implements the {{GlobalJobParameters}} interface. Additionally it has grown over time to not only store the configuration parameters but also to record which parameters have been requested and what default value was set. This information is irrelevant on the server side when setting a {{GlobalJobParameters}} object via {{ExecutionConfig#setGlobalJobParameters}}. Since we don't separate the {{ParameterTool}} logic and the actual data view, users ran into problems when reusing the same {{ParameterTool}} to start multiple jobs concurrently (see FLINK-7943). I think it would be a much clearer separation of concerns if we would actually split the {{GlobalJobParameters}} from the {{ParameterTool}}. 
> Let ParameterTool return a dedicated GlobalJobParameters object > --- > > Key: FLINK-7947 > URL: https://issues.apache.org/jira/browse/FLINK-7947 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.4.0 >Reporter: Till Rohrmann > > The {{ParameterTool}} directly implements the {{GlobalJobParameters}} > interface. Additionally it has grown over time to not only store the > configuration parameters but also to record which parameters have been > requested and what default value was set. This information is irrelevant on > the server side when setting a {{GlobalJobParameters}} object via > {{ExecutionConfig#setGlobalJobParameters}}. > Since we don't separate the {{ParameterTool}} logic and the actual data view, > users ran into problems when reusing the same {{ParameterTool}} to start > multiple jobs concurrently (see FLINK-7943). I think it would be a much > clearer separation of concerns if we would actually split the > {{GlobalJobParameters}} from the {{ParameterTool}}. > Furthermore, we should think about whether {{ParameterTool#get}} should have > side effects or not as it does right now. -- This message was sent by Atlassian JIRA (v6.4.14#64029)