[jira] [Updated] (FLINK-7697) Add metrics for Elasticsearch Sink

2017-10-30 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 updated FLINK-7697:
--
Fix Version/s: 1.5.0

> Add metrics for Elasticsearch Sink
> --
>
> Key: FLINK-7697
> URL: https://issues.apache.org/jira/browse/FLINK-7697
> Project: Flink
>  Issue Type: Wish
>  Components: ElasticSearch Connector
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Critical
> Fix For: 1.5.0
>
>
> We should add metrics to track events written by the ElasticsearchSink,
> e.g.:
> * number of successful bulk sends
> * number of documents inserted
> * number of documents updated
> * number of document version conflicts
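
A minimal sketch of how the four proposed counters could be maintained, assuming the sink can classify each item of a bulk response. The names ItemOutcome and BulkSinkCounters are hypothetical and are not Elasticsearch or Flink types; in a real sink the counters would more likely be Flink Counter objects obtained from getRuntimeContext().getMetricGroup().counter(...).

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-item outcome of a bulk request; not an Elasticsearch type. */
enum ItemOutcome { CREATED, UPDATED, VERSION_CONFLICT, FAILED }

/** Illustrative counters mirroring the four metrics proposed in the issue. */
class BulkSinkCounters {
    final AtomicLong successfulBulkSends = new AtomicLong();
    final AtomicLong documentsInserted = new AtomicLong();
    final AtomicLong documentsUpdated = new AtomicLong();
    final AtomicLong versionConflicts = new AtomicLong();

    /** Call once for every bulk request that was sent successfully. */
    void recordBulk(List<ItemOutcome> outcomes) {
        successfulBulkSends.incrementAndGet();
        for (ItemOutcome outcome : outcomes) {
            switch (outcome) {
                case CREATED:
                    documentsInserted.incrementAndGet();
                    break;
                case UPDATED:
                    documentsUpdated.incrementAndGet();
                    break;
                case VERSION_CONFLICT:
                    versionConflicts.incrementAndGet();
                    break;
                default:
                    // other failures are not among the proposed metrics
                    break;
            }
        }
    }
}
```

Atomic counters are used here only so that the sketch stays safe if several threads report results; whether that is needed depends on how the sink is invoked.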



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7881) flink can't deployed on yarn with ha

2017-10-30 Thread deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16226328#comment-16226328
 ] 

deng commented on FLINK-7881:
-

I have dug into why the issue happens.

As the following code shows, failoverProxyProvider is null, so the client treats this as a non-HA case. The YarnConfiguration did not read hdfs-site.xml to get the "dfs.client.failover.proxy.provider.startdt" value; only yarn-site.xml and core-site.xml were read.

!screenshot-1.png!
!screenshot-2.png!

I have found a solution, as below:

In org.apache.flink.yarn.YarnApplicationMasterRunner.java, replace
"final YarnConfiguration yarnConfig = new YarnConfiguration();"
with
"final YarnConfiguration yarnConfig = new YarnConfiguration(HadoopUtils.getHadoopConfiguration());"

In org.apache.flink.runtime.util.HadoopUtils.java, add the code below to getHadoopConfiguration():
 if (new File(possibleHadoopConfPath + "/yarn-site.xml").exists()) {
  retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/yarn-site.xml"));

  if (LOG.isDebugEnabled()) {
   LOG.debug("Adding " + possibleHadoopConfPath + "/yarn-site.xml to hadoop configuration");
  }
 }
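
To check which of the three files named above would actually be found in a given configuration directory, something like the following could be used. It is only a diagnostic sketch with plain JDK calls; the class HadoopConfFiles and its method are hypothetical and are not part of Flink or Hadoop.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: lists the Hadoop config files present in a directory. */
class HadoopConfFiles {
    // File names mentioned in the comment above.
    static final String[] CANDIDATES = {"core-site.xml", "hdfs-site.xml", "yarn-site.xml"};

    /** Returns the candidate files that exist in confDir, in the order of CANDIDATES. */
    static List<Path> existing(Path confDir) {
        List<Path> found = new ArrayList<>();
        for (String name : CANDIDATES) {
            Path candidate = confDir.resolve(name);
            if (Files.isRegularFile(candidate)) {
                found.add(candidate);
            }
        }
        return found;
    }
}
```

Calling HadoopConfFiles.existing(Paths.get(System.getenv("HADOOP_CONF_DIR"))) on the affected host would show whether hdfs-site.xml is present where the configuration is expected.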



> flink can't deployed on yarn with ha
> 
>
> Key: FLINK-7881
> URL: https://issues.apache.org/jira/browse/FLINK-7881
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.3.2
>Reporter: deng
>Priority: Blocker
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> I start yarn-session.sh on YARN, but it can't resolve the HDFS logical URL. It 
> always connects to hdfs://master:8020; it should be 9000, and my HDFS defaultFS 
> is hdfs://master.
> I have configured YARN_CONF_DIR and HADOOP_CONF_DIR, but it didn't work.
> Is it a bug? I use flink-1.3.0-bin-hadoop27-scala_2.10
> 2017-10-20 11:00:05,395 DEBUG org.apache.hadoop.ipc.Client - IPC Client (1035144464) connection to startdt/173.16.5.215:8020 from admin: closed
> 2017-10-20 11:00:05,398 ERROR org.apache.flink.yarn.YarnApplicationMasterRunner - YARN Application Master initialization failed
> java.net.ConnectException: Call From spark3/173.16.5.216 to master:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>   at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
>   at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1479)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>   at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>   at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source)
>   at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
>   at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
>   at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>   at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>   at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)





[jira] [Updated] (FLINK-7881) flink can't deployed on yarn with ha

2017-10-30 Thread deng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

deng updated FLINK-7881:

Attachment: screenshot-2.png



[jira] [Updated] (FLINK-7881) flink can't deployed on yarn with ha

2017-10-30 Thread deng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

deng updated FLINK-7881:

Attachment: screenshot-1.png



[jira] [Commented] (FLINK-7936) Lack of synchronization w.r.t. taskManagers in MetricStore#add()

2017-10-30 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16226280#comment-16226280
 ] 

Bowen Li commented on FLINK-7936:
-

Hi [~pnowojski], does this still hold after your refactor of {{MetricStore}}?

> Lack of synchronization w.r.t. taskManagers in MetricStore#add()
> 
>
> Key: FLINK-7936
> URL: https://issues.apache.org/jira/browse/FLINK-7936
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
>   tm = taskManagers.computeIfAbsent(tmID, k -> new TaskManagerMetricStore());
> {code}
> In other places, access to taskManagers is protected by lock on 
> MetricStore.this
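
A minimal sketch of the kind of guarding the report asks for, assuming taskManagers is a plain HashMap that is otherwise accessed under the store's monitor. GuardedMetricStoreSketch and TaskManagerMetrics are hypothetical, simplified stand-ins and not the Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for MetricStore; not the Flink class. */
class GuardedMetricStoreSketch {
    /** Hypothetical per-TaskManager container. */
    static class TaskManagerMetrics {
        final Map<String, String> metrics = new HashMap<>();
    }

    // Plain HashMap: every access must hold the lock on this store.
    private final Map<String, TaskManagerMetrics> taskManagers = new HashMap<>();

    /** Creates the per-TaskManager entry on demand and adds a metric, all under the lock. */
    void add(String tmID, String name, String value) {
        synchronized (this) {
            TaskManagerMetrics tm =
                taskManagers.computeIfAbsent(tmID, k -> new TaskManagerMetrics());
            tm.metrics.put(name, value);
        }
    }

    /** Reads take the same lock, so they never observe a half-updated map. */
    String get(String tmID, String name) {
        synchronized (this) {
            TaskManagerMetrics tm = taskManagers.get(tmID);
            return tm == null ? null : tm.metrics.get(name);
        }
    }

    int taskManagerCount() {
        synchronized (this) {
            return taskManagers.size();
        }
    }
}
```

An alternative would be a ConcurrentHashMap, but that only helps if every other access path is changed as well; taking the existing lock keeps the change local.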





[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16226215#comment-16226215
 ] 

ASF GitHub Bot commented on FLINK-7153:
---

Github user summerleafs commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r147891494
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
@@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
//  Miscellaneous
// 

 
+   /**
+* Calculates the preferred locations based on the location preference constraint.
+*
+* @param locationPreferenceConstraint constraint for the location preference
+* @return Future containing the collection of preferred locations. This might not be completed if not all inputs
+*  have been a resource assigned.
+*/
+   @VisibleForTesting
+   public CompletableFuture> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
+   final Collection> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
+   final CompletableFuture> preferredLocationsFuture;
--- End diff --

Hi Till, is `getPreferredLocations()` not invoked here because Flink doesn't 
yet support reading checkpoint data locally? I have created an issue for 
Flink reading checkpoints locally 
[here](https://issues.apache.org/jira/browse/FLINK-7873?filter=-1). Once it is 
complete, I wonder if we can invoke `getPreferredLocations()` instead of 
`getPreferredLocationsBasedOnInputs()`.


> Eager Scheduling can't allocate source for ExecutionGraph correctly
> ---
>
> Key: FLINK-7153
> URL: https://issues.apache.org/jira/browse/FLINK-7153
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.1
>Reporter: Sihua Zhou
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> The ExecutionGraph.scheduleEager() function allocates resources for each 
> ExecutionJobVertex one by one by calling 
> ExecutionJobVertex.allocateResourcesForAll(). There are two problems with this:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs always returns an empty 
> result, because `sourceSlot` is always null until the `ExecutionVertex` has been 
> deployed via 'Execution.deployToSlot()'. So allocating resources based on 
> preferred locations can't work correctly. Should we set the slot info for the 
> `Execution` as soon as Execution.allocateSlotForExecution() has been called 
> successfully?
> 2. The current allocation strategy can't allocate slots optimally. Here is the 
> test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and three 
> remote partitions. But it should be 2 local partitions and 2 remote 
> partitions. 
> The cause of the above problems is that the current strategy allocates the 
> resource for each execution one by one (if the execution can take a slot from 
> the SlotGroup it does so; otherwise it asks for a new one). 
> Changing the allocation strategy to two steps would solve this problem. Below 
> is the pseudo code:
> {code}
> for (ExecutionJobVertex ejv: getVerticesTopologically) {
> //step 1: try to allocate from the SlotGroup based on inputs, one by one (which 
> only allocates resources based on location).
> //step 2: allocate for the remaining executions.
> }
> {code}
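
The difference between the one-by-one strategy and the two-step strategy described above can be illustrated with a toy model. This is a sketch under stated assumptions, not Flink's scheduler: slots are plain integers, each shared slot can host at most one consumer subtask, and the class and method names are hypothetical.

```java
import java.util.Arrays;

/** Toy model of slot assignment for the consumers of a POINTWISE edge. */
class TwoStepAllocationSketch {
    static final int NONE = -1;

    /** One-by-one: each consumer takes the first free shared slot, else a fresh slot. */
    static int[] oneByOne(int[] producerSlot, int sharedSlots) {
        boolean[] taken = new boolean[sharedSlots];
        int[] assigned = new int[producerSlot.length];
        int nextFresh = sharedSlots;
        for (int i = 0; i < producerSlot.length; i++) {
            int slot = firstFree(taken);
            if (slot == NONE) {
                slot = nextFresh++;
            } else {
                taken[slot] = true;
            }
            assigned[i] = slot;
        }
        return assigned;
    }

    /** Two-step: first honour location preferences, then place the remaining consumers. */
    static int[] twoStep(int[] producerSlot, int sharedSlots) {
        boolean[] taken = new boolean[sharedSlots];
        int[] assigned = new int[producerSlot.length];
        Arrays.fill(assigned, NONE);
        // step 1: a consumer gets its producer's slot if that slot is still free
        for (int i = 0; i < producerSlot.length; i++) {
            int preferred = producerSlot[i];
            if (!taken[preferred]) {
                taken[preferred] = true;
                assigned[i] = preferred;
            }
        }
        // step 2: everything else takes any free shared slot, else a fresh slot
        int nextFresh = sharedSlots;
        for (int i = 0; i < producerSlot.length; i++) {
            if (assigned[i] == NONE) {
                int slot = firstFree(taken);
                if (slot == NONE) {
                    slot = nextFresh++;
                } else {
                    taken[slot] = true;
                }
                assigned[i] = slot;
            }
        }
        return assigned;
    }

    /** Counts consumers that ended up in the same slot as their producer. */
    static int countLocal(int[] producerSlot, int[] assigned) {
        int local = 0;
        for (int i = 0; i < producerSlot.length; i++) {
            if (producerSlot[i] == assigned[i]) {
                local++;
            }
        }
        return local;
    }

    private static int firstFree(boolean[] taken) {
        for (int s = 0; s < taken.length; s++) {
            if (!taken[s]) {
                return s;
            }
        }
        return NONE;
    }
}
```

For the test case in the description (v1 with parallelism 2 in slots 0 and 1, v2 with parallelism 4), producerSlot is {0, 0, 1, 1} with two shared slots. In this model oneByOne places one consumer locally and three remotely, while twoStep places two locally and two remotely, which matches the counts stated above.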





[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

2017-10-30 Thread summerleafs
Github user summerleafs commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r147891494
  

---


[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16226198#comment-16226198
 ] 

ASF GitHub Bot commented on FLINK-7153:
---

Github user summerleafs commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r147889602
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
// that way we do not have any operation that can fail between allocating the slots
// and adding them to the list. If we had a failure in between there, that would
// cause the slots to get lost
-   final ArrayList resources = new ArrayList<>(getNumberOfExecutionJobVertices());
final boolean queued = allowQueuedScheduling;
 
-   // we use this flag to handle failures in a 'finally' clause
-   // that allows us to not go through clumsy cast-and-rethrow logic
-   boolean successful = false;
+   // collecting all the slots may resize and fail in that operation without slots getting lost
+   final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
-   try {
-   // collecting all the slots may resize and fail in that operation without slots getting lost
-   final ArrayList> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+   // allocate the slots (obtain all their futures
+   for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+   // these calls are not blocking, they only return futures
--- End diff --

Hi, is allocating resources in topological order just meant to facilitate the 
optimization of 'allocate resources based on preferred inputs'? It may 
produce bad results if we allocate based on state.




[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

2017-10-30 Thread summerleafs
Github user summerleafs commented on a diff in the pull request:

https://github.com/apache/flink/pull/4916#discussion_r147889602
  

---


[jira] [Updated] (FLINK-7588) Document RocksDB tuning for spinning disks

2017-10-30 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7588:
--
Description: 
In docs/ops/state/large_state_tuning.md, it is mentioned that:

bq. the default configuration is tailored towards SSDs and performs suboptimal 
on spinning disks

We should add recommendations targeting spinning disks:
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk

  was:
In docs/ops/state/large_state_tuning.md , it was mentioned that:
bq. the default configuration is tailored towards SSDs and performs suboptimal 
on spinning disks

We should add recommendation targeting spinning disks:
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk


> Document RocksDB tuning for spinning disks
> --
>
> Key: FLINK-7588
> URL: https://issues.apache.org/jira/browse/FLINK-7588
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Ted Yu
>
> In docs/ops/state/large_state_tuning.md , it was mentioned that:
> bq. the default configuration is tailored towards SSDs and performs 
> suboptimal on spinning disks
> We should add recommendation targeting spinning disks:
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk





[jira] [Updated] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover

2017-10-30 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-7873:
--
Description: 
Why I introduce this:
The current recovery strategy always reads checkpoint data from the remote 
FileStream (HDFS). This costs a lot of bandwidth when the state is very big 
(e.g. 1T). What's worse, if the job recovers again and again, it can eat up 
all the network bandwidth and badly hurt the cluster. So I propose that we 
cache the checkpoint data locally and read checkpoint data from the local 
cache whenever we can; we read the data from remote only if the local read fails. 
The advantage is that if an execution is assigned to the same TaskManager as 
before, it can save a lot of bandwidth and recover faster.

Solution:
The TaskManager does the caching and manages the cached data itself. It simply 
uses a TTL-like method to manage the disposal of cache entries: we dispose of an 
entry if it has not been touched for a time X, and once we touch an entry we 
reset its TTL. In this way, all the work is done by the TaskManager and is 
transparent to the JobManager. The only problem is that we may dispose of an 
entry that is still useful; in this case, we have to read from the remote data 
in the end, but users can avoid this by setting a proper TTL value according to 
the checkpoint interval and other factors.
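
The TTL-like bookkeeping described above could look roughly like the sketch below. It is an illustration only: TtlCheckpointCacheSketch is a hypothetical name, the cache tracks entry ids and last-touch times but no data, and the current time is passed in explicitly so that the behaviour is easy to follow.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical TTL bookkeeping for locally cached checkpoint entries. */
class TtlCheckpointCacheSketch {
    private final long ttlMillis;
    // entry id -> time of the last touch, in milliseconds
    private final Map<String, Long> lastTouched = new HashMap<>();

    TtlCheckpointCacheSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Registers or touches an entry; either way its TTL starts over. */
    synchronized void touch(String entryId, long nowMillis) {
        lastTouched.put(entryId, nowMillis);
    }

    /** Disposes of entries not touched for longer than the TTL; returns how many were dropped. */
    synchronized int expire(long nowMillis) {
        int dropped = 0;
        Iterator<Map.Entry<String, Long>> it = lastTouched.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() > ttlMillis) {
                it.remove();
                dropped++;
            }
        }
        return dropped;
    }

    synchronized boolean contains(String entryId) {
        return lastTouched.containsKey(entryId);
    }
}
```

With a TTL of 100 ms, an entry touched at t=0 and again at t=80 survives expire(150), while an entry touched only at t=0 is dropped; a reader that finds no local entry would then fall back to the remote data.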

Can someone give me some advice? I would appreciate it very much~

  was:
The current recovery strategy always reads checkpoint data from the remote 
FileStream (HDFS). This costs a lot of bandwidth when the state is very big 
(e.g. 1T). What's worse, if the job recovers again and again, it can eat up 
all the network bandwidth and badly hurt the cluster. So I propose that we 
cache the checkpoint data locally and read checkpoint data from the local 
cache whenever we can; we read the data from remote only if the local read fails. 
The advantage is that if an execution is assigned to the same TaskManager as 
before, it can save a lot of bandwidth and recover faster.


Key problems:
1. Cache the checkpoint data on local disk and manage its creation and deletion.
2. Introduce a HybridStreamHandle which tries to create a local input 
stream first and, if that fails, creates a remote input stream. Its prototype 
looks like below:
{code:java}
class HybridStreamHandle {
   private StreamStateHandle localHandle;
   private StreamStateHandle remoteHandle;
   ..
   public FSDataInputStream openInputStream() throws IOException {
      FSDataInputStream inputStream = localHandle.openInputStream();
      return inputStream != null ? inputStream : remoteHandle.openInputStream();
   }
   .
}
{code}
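
The prototype falls back only when the local handle returns null. If opening the local copy can also fail with an exception, the fallback might be written as in the sketch below. StreamSource and LocalFirstSource are hypothetical stand-ins built on java.io and are not Flink's StreamStateHandle or FSDataInputStream.

```java
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical stand-in for a state handle; not Flink's StreamStateHandle. */
interface StreamSource {
    InputStream open() throws IOException;
}

/** Tries the local copy first and falls back to the remote copy. */
class LocalFirstSource implements StreamSource {
    private final StreamSource local;
    private final StreamSource remote;

    LocalFirstSource(StreamSource local, StreamSource remote) {
        this.local = local;
        this.remote = remote;
    }

    @Override
    public InputStream open() throws IOException {
        try {
            InputStream in = local.open();
            if (in != null) {
                return in;
            }
        } catch (IOException e) {
            // local copy missing or unreadable: fall through to the remote copy
        }
        // a failure here is propagated, since there is nothing left to fall back to
        return remote.open();
    }
}
```

Swallowing the local IOException is a deliberate simplification; a real implementation would probably log it so that a broken local cache does not go unnoticed.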

Solution:
There are two kinds of solutions I can think of.

solution1:
The Backend does the caching, and the HybridStreamHandle points to both 
local and remote data. HybridStreamHandle is managed by CheckpointCoordinator 
just like other StreamHandles, so CheckpointCoordinator will perform dispose on 
it. When HybridStreamHandle performs dispose, it calls localHandle.dispose() and 
remoteHandle.dispose(). In this way, we have to record the TaskManager's info 
(like location) in localHandle and add an entry point in TaskManager to handle 
the localHandle dispose message; we also have to consider the HA situation.

solution2:
The TaskManager does the caching and manages the cached data itself. It 
simply uses a TTL-like method to manage the disposal of handles: we dispose of a 
handle if it has not been touched for a time X. We touch the handles when we 
recover from a checkpoint or when we perform a checkpoint, and once we touch a 
handle we reset its TTL. In this way, all the work is done by the Backend and is 
transparent to the JobManager. The only problem is that we may dispose of a 
handle that is still useful, but even in this case we can still read from the 
remote data in the end, and users can avoid this by setting a proper TTL value 
according to the checkpoint interval and other factors.

To avoid complicating the problem, I prefer solution2. Can someone give me 
some advice? I would appreciate it very much~


> Introduce CheckpointCacheManager for reading checkpoint data locally when 
> performing failover
> -
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>
> Why I introduce this:
> The current recovery strategy always reads checkpoint data from the remote 
> FileStream (HDFS). This costs a lot of bandwidth when the state is very big 
> (e.g. 1T). What's worse, if the job recovers again and again, it can 
> eat up all the network bandwidth and badly hurt the clu

[jira] [Updated] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover

2017-10-30 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-7873:
--
Summary: Introduce CheckpointCacheManager for reading checkpoint data 
locally when performing failover  (was: Introduce HybridStreamHandle to 
optimize the recovery mechanism and try to read the checkpoint data locally)

> Introduce CheckpointCacheManager for reading checkpoint data locally when 
> performing failover
> -
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>
> The current recovery strategy always reads checkpoint data from the remote 
> file system (HDFS). This costs a lot of bandwidth when the state is very large 
> (e.g. 1T). Worse, if the job recovers again and again, it can 
> eat up all network bandwidth and seriously hurt the cluster. I therefore propose 
> caching the checkpoint data locally and reading it from the local cache 
> whenever possible; we read from the remote store only if the local read 
> fails. The advantage is that if an execution is assigned to the same 
> TaskManager as before, it saves a lot of bandwidth and recovers faster.
> Key problems:
> 1. Cache the checkpoint data on local disk and manage its creation and deletion.
> 2. Introduce a HybridStreamHandle that tries to create a local input 
> stream first and, if that fails, creates a remote input stream. Its prototype 
> looks like this:
> {code:java}
> class HybridStreamHandle {
>     private StreamStateHandle localHandle;
>     private StreamStateHandle remoteHandle;
>     ..
>     public FSDataInputStream openInputStream() throws IOException {
>         FSDataInputStream inputStream = localHandle.openInputStream();
>         return inputStream != null ? inputStream : remoteHandle.openInputStream();
>     }
>     .
> }
> {code}
> Solution:
>   I can think of two possible solutions.
> Solution 1:
>   The backend does the caching, and the HybridStreamHandle points to both 
> local and remote data. The HybridStreamHandle is managed by the 
> CheckpointCoordinator like any other StreamHandle, so the 
> CheckpointCoordinator disposes of it. When a HybridStreamHandle is disposed, 
> it calls localHandle.dispose() and remoteHandle.dispose(). With this approach 
> we have to record the TaskManager's info (such as its location) in 
> localHandle and add an entry point in the TaskManager to handle the 
> localHandle dispose message. We also have to consider the HA situation.
> Solution 2:
>   The TaskManager does the caching and manages the cached data itself. It 
> simply uses a TTL-like method to manage disposal: we dispose of a handle 
> if it has not been touched for a time X. We touch the handles when we 
> recover from a checkpoint or when we perform a checkpoint, and each touch 
> resets the handle's TTL. With this approach all the work is done by the 
> backend and is transparent to the JobManager. The only problem is that we may 
> dispose of a handle that could still be useful, but even in that case we can 
> fall back to reading the remote data, and users can avoid this by setting a 
> proper TTL value according to the checkpoint interval and other factors.
> To avoid complicating the problem, I prefer 
> solution 2. Can someone give me some advice? I would appreciate it very much.
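
The local-first read with remote fallback from the prototype above, together with the TTL bookkeeping of the second solution, could be sketched roughly as follows. This is only an illustration under stated assumptions: `InputSource`, `HybridSource` and `TtlTracker` are hypothetical names and not Flink classes, and a failed local read is modelled as either a null stream or an IOException.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical stand-in for a state handle: something that can open a stream. */
interface InputSource {
    InputStream open() throws IOException;

    /** Convenience factory for in-memory data, used only for illustration. */
    static InputSource ofBytes(byte[] data) {
        return () -> new ByteArrayInputStream(data);
    }
}

/** Tries the local copy first and falls back to the remote copy on failure. */
final class HybridSource implements InputSource {
    private final InputSource local;
    private final InputSource remote;

    HybridSource(InputSource local, InputSource remote) {
        this.local = local;
        this.remote = remote;
    }

    @Override
    public InputStream open() throws IOException {
        try {
            InputStream in = local.open();
            if (in != null) {
                return in;
            }
        } catch (IOException e) {
            // Local cache miss or read error: fall through to the remote copy.
        }
        return remote.open();
    }
}

/** TTL bookkeeping: an entry expires once it has not been touched for ttlMillis. */
final class TtlTracker {
    private final long ttlMillis;
    private final Map<String, Long> lastTouched = new HashMap<>();

    TtlTracker(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Called on checkpoint or recovery; resets the TTL of the entry. */
    synchronized void touch(String key, long nowMillis) {
        lastTouched.put(key, nowMillis);
    }

    /** Drops expired entries and returns how many were removed. */
    synchronized int expire(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastTouched.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() > ttlMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    synchronized boolean contains(String key) {
        return lastTouched.containsKey(key);
    }
}
```

Passing the current time into `touch` and `expire` keeps the sketch deterministic; a real cache would presumably read a clock and also delete the cached files when an entry expires.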



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7947) Let ParameterTool return a dedicated GlobalJobParameters object

2017-10-30 Thread Hai Zhou UTC+8 (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16226117#comment-16226117
 ] 

Hai Zhou UTC+8 commented on FLINK-7947:
---

+1

> Let ParameterTool return a dedicated GlobalJobParameters object
> ---
>
> Key: FLINK-7947
> URL: https://issues.apache.org/jira/browse/FLINK-7947
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Bowen Li
>
> The {{ParameterTool}} directly implements the {{GlobalJobParameters}} 
> interface. Additionally it has grown over time to not only store the 
> configuration parameters but also to record which parameters have been 
> requested and what default value was set. This information is irrelevant on 
> the server side when setting a {{GlobalJobParameters}} object via 
> {{ExecutionConfig#setGlobalJobParameters}}.
> Since we don't separate the {{ParameterTool}} logic and the actual data view, 
> users ran into problems when reusing the same {{ParameterTool}} to start 
> multiple jobs concurrently (see FLINK-7943). I think it would be a much 
> clearer separation of concerns if we would actually split the 
> {{GlobalJobParameters}} from the {{ParameterTool}}.
> Furthermore, we should think about whether {{ParameterTool#get}} should have 
> side effects or not as it does right now.
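
The separation proposed above could look roughly like the following sketch. It is not the Flink API: `ParamReader` (the client-side tool with its bookkeeping) and `JobParams` (the immutable data view that would be handed to a job) are hypothetical names chosen for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Immutable data view: the only thing that would travel with a job. */
final class JobParams {
    private final Map<String, String> data;

    JobParams(Map<String, String> data) {
        // Defensive copy, so later changes to the source map are not visible.
        this.data = Collections.unmodifiableMap(new HashMap<>(data));
    }

    Map<String, String> toMap() {
        return data;
    }
}

/** Client-side helper: stores parameters and records which keys were requested. */
final class ParamReader {
    private final Map<String, String> data = new HashMap<>();
    private final Set<String> requested = new HashSet<>();

    ParamReader(Map<String, String> initial) {
        data.putAll(initial);
    }

    /** Lookup with a side effect: remembers that the key was asked for. */
    String get(String key, String defaultValue) {
        requested.add(key);
        return data.getOrDefault(key, defaultValue);
    }

    Set<String> requestedKeys() {
        return Collections.unmodifiableSet(requested);
    }

    /** Hands out a snapshot that carries no bookkeeping state. */
    JobParams toJobParams() {
        return new JobParams(data);
    }
}
```

Because each call to `toJobParams()` returns an independent, read-only snapshot, several jobs started from the same reader would not share mutable state in this sketch.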





[jira] [Assigned] (FLINK-7947) Let ParameterTool return a dedicated GlobalJobParameters object

2017-10-30 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-7947:
---

Assignee: Bowen Li

> Let ParameterTool return a dedicated GlobalJobParameters object
> ---
>
> Key: FLINK-7947
> URL: https://issues.apache.org/jira/browse/FLINK-7947
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Bowen Li
>
> The {{ParameterTool}} directly implements the {{GlobalJobParameters}} 
> interface. Additionally it has grown over time to not only store the 
> configuration parameters but also to record which parameters have been 
> requested and what default value was set. This information is irrelevant on 
> the server side when setting a {{GlobalJobParameters}} object via 
> {{ExecutionConfig#setGlobalJobParameters}}.
> Since we don't separate the {{ParameterTool}} logic and the actual data view, 
> users ran into problems when reusing the same {{ParameterTool}} to start 
> multiple jobs concurrently (see FLINK-7943). I think it would be a much 
> clearer separation of concerns if we would actually split the 
> {{GlobalJobParameters}} from the {{ParameterTool}}.
> Furthermore, we should think about whether {{ParameterTool#get}} should have 
> side effects or not as it does right now.





[jira] [Commented] (FLINK-7940) Add timeout for futures

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225946#comment-16225946
 ] 

ASF GitHub Bot commented on FLINK-7940:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4918


> Add timeout for futures
> ---
>
> Key: FLINK-7940
> URL: https://issues.apache.org/jira/browse/FLINK-7940
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.4.0
>
>
> In order to conveniently timeout futures, we should add tooling to 
> {{FutureUtils}}.
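
A timeout helper of the kind described above can be sketched with plain `CompletableFuture` and a scheduled executor. This is a minimal illustration, not the code added to Flink: the class name `FutureTimeouts` and the shared timer thread are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureTimeouts {
    // Single daemon thread whose only job is to fire timeouts.
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "future-timeout-timer");
            t.setDaemon(true);
            return t;
        });

    private FutureTimeouts() {}

    /** Fails the future with a TimeoutException if it is not completed in time. */
    static <T> CompletableFuture<T> orTimeout(
            CompletableFuture<T> future, long timeout, TimeUnit unit) {
        if (!future.isDone()) {
            ScheduledFuture<?> timer = TIMER.schedule(
                () -> {
                    // No effect if the future has already completed.
                    future.completeExceptionally(
                        new TimeoutException("Timed out after " + timeout + " " + unit));
                },
                timeout,
                unit);
            // Cancel the pending timer once the future completes either way.
            future.whenComplete((value, error) -> timer.cancel(false));
        }
        return future;
    }
}
```

The helper returns the same future it was given, so it can be chained, e.g. `FutureTimeouts.orTimeout(request, 10, TimeUnit.SECONDS).thenApply(...)`, where `request` is whatever future the caller already holds.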





[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

2017-10-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4918


---


[jira] [Closed] (FLINK-7940) Add timeout for futures

2017-10-30 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7940.

   Resolution: Done
Fix Version/s: 1.4.0

Added via c568aed1ecb7675a2776e15f120e45ad576eeb37

> Add timeout for futures
> ---
>
> Key: FLINK-7940
> URL: https://issues.apache.org/jira/browse/FLINK-7940
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.4.0
>
>
> In order to conveniently timeout futures, we should add tooling to 
> {{FutureUtils}}.





[jira] [Resolved] (FLINK-6495) Migrate Akka configuration options

2017-10-30 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-6495.
--
Resolution: Fixed

Fixed via ae50c30ac3a7ce62df62120eda85b273a6aea7f7

> Migrate Akka configuration options
> --
>
> Key: FLINK-6495
> URL: https://issues.apache.org/jira/browse/FLINK-6495
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Reporter: Chesnay Schepler
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>






[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225933#comment-16225933
 ] 

ASF GitHub Bot commented on FLINK-7739:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4751


> Improve Kafka*ITCase tests stability
> 
>
> Key: FLINK-7739
> URL: https://issues.apache.org/jira/browse/FLINK-7739
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>






[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225934#comment-16225934
 ] 

ASF GitHub Bot commented on FLINK-7739:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4749


> Improve Kafka*ITCase tests stability
> 
>
> Key: FLINK-7739
> URL: https://issues.apache.org/jira/browse/FLINK-7739
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>






[jira] [Commented] (FLINK-6495) Migrate Akka configuration options

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225932#comment-16225932
 ] 

ASF GitHub Bot commented on FLINK-6495:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4774


> Migrate Akka configuration options
> --
>
> Key: FLINK-6495
> URL: https://issues.apache.org/jira/browse/FLINK-6495
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Reporter: Chesnay Schepler
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>






[GitHub] flink pull request #4749: [FLINK-7739][tests] Properly shutdown resources in...

2017-10-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4749


---


[GitHub] flink pull request #4774: [FLINK-6495] Fix Akka's default value for heartbea...

2017-10-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4774


---


[jira] [Comment Edited] (FLINK-7739) Improve Kafka*ITCase tests stability

2017-10-30 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225927#comment-16225927
 ] 

Till Rohrmann edited comment on FLINK-7739 at 10/30/17 11:06 PM:
-

Fixed via 152f6c9aff44c62744d2294b220664efc14acec9 and 
9eb878e99021815bec6c033c6d78e16058e7b6a6


was (Author: till.rohrmann):
Fixed via 152f6c9aff44c62744d2294b220664efc14acec9

> Improve Kafka*ITCase tests stability
> 
>
> Key: FLINK-7739
> URL: https://issues.apache.org/jira/browse/FLINK-7739
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>






[GitHub] flink pull request #4751: [FLINK-7739][kafka-tests] Throttle down data produ...

2017-10-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4751


---


[jira] [Resolved] (FLINK-7739) Improve Kafka*ITCase tests stability

2017-10-30 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7739.
--
   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 152f6c9aff44c62744d2294b220664efc14acec9

> Improve Kafka*ITCase tests stability
> 
>
> Key: FLINK-7739
> URL: https://issues.apache.org/jira/browse/FLINK-7739
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>






[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225922#comment-16225922
 ] 

ASF GitHub Bot commented on FLINK-7951:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4926

[FLINK-7951] Load YarnConfiguration with default Hadoop configuration

## What is the purpose of the change

This PR loads the default Hadoop configuration via
HadoopUtils.getHadoopConfiguration and initializes the YarnConfiguration
with it.

R @aljoscha 

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink loadHdfsConfiguration

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4926.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4926


commit fd69067783cee840ae0f95af6addba451ca3449b
Author: Till Rohrmann 
Date:   2017-10-30T23:01:05Z

[FLINK-7951] Load YarnConfiguration with default Hadoop configuration

This PR loads the default Hadoop configuration via
HadoopUtils.getHadoopConfiguration and initializes the YarnConfiguration
with it.




> YarnApplicationMaster does not load HDFSConfiguration
> -
>
> Key: FLINK-7951
> URL: https://issues.apache.org/jira/browse/FLINK-7951
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0
>
>
> When instantiating the {{YarnConfiguration}} we do not load the corresponding 
> {{HDFSConfiguration}}. As a result, we do not read {{hdfs-site.xml}}.





[GitHub] flink pull request #4926: [FLINK-7951] Load YarnConfiguration with default H...

2017-10-30 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4926

[FLINK-7951] Load YarnConfiguration with default Hadoop configuration

## What is the purpose of the change

This PR loads the default Hadoop configuration via
HadoopUtils.getHadoopConfiguration and initializes the YarnConfiguration
with it.

R @aljoscha 

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink loadHdfsConfiguration

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4926.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4926


commit fd69067783cee840ae0f95af6addba451ca3449b
Author: Till Rohrmann 
Date:   2017-10-30T23:01:05Z

[FLINK-7951] Load YarnConfiguration with default Hadoop configuration

This PR loads the default Hadoop configuration via
HadoopUtils.getHadoopConfiguration and initializes the YarnConfiguration
with it.




---


[jira] [Created] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration

2017-10-30 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7951:


 Summary: YarnApplicationMaster does not load HDFSConfiguration
 Key: FLINK-7951
 URL: https://issues.apache.org/jira/browse/FLINK-7951
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Priority: Critical
 Fix For: 1.4.0


When instantiating the {{YarnConfiguration}} we do not load the corresponding 
{{HDFSConfiguration}}. As a result, we do not read {{hdfs-site.xml}}.





[jira] [Assigned] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration

2017-10-30 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7951:


Assignee: Till Rohrmann

> YarnApplicationMaster does not load HDFSConfiguration
> -
>
> Key: FLINK-7951
> URL: https://issues.apache.org/jira/browse/FLINK-7951
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0
>
>
> When instantiating the {{YarnConfiguration}} we do not load the corresponding 
> {{HDFSConfiguration}}. As a result, we do not read {{hdfs-site.xml}}.





[jira] [Updated] (FLINK-7936) Lack of synchronization w.r.t. taskManagers in MetricStore#add()

2017-10-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7936:
--
Component/s: Metrics

> Lack of synchronization w.r.t. taskManagers in MetricStore#add()
> 
>
> Key: FLINK-7936
> URL: https://issues.apache.org/jira/browse/FLINK-7936
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) 
> info).taskManagerID;
>   tm = taskManagers.computeIfAbsent(tmID, k -> new 
> TaskManagerMetricStore());
> {code}
> In other places, access to taskManagers is protected by a lock on 
> MetricStore.this.
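
The locking discipline the report asks for, where every access to the map goes through the same monitor, can be illustrated with a small self-contained sketch. `GuardedStore` is a hypothetical class, and the use of a plain `HashMap` here is an assumption for the example, not a statement about the actual `MetricStore` fields.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical store; every access to the map holds the same monitor. */
final class GuardedStore {
    private final Map<String, List<String>> byTaskManager = new HashMap<>();

    /** Adds a metric name under the given task manager ID, creating the bucket if needed. */
    void add(String taskManagerId, String metricName) {
        synchronized (this) {
            byTaskManager
                .computeIfAbsent(taskManagerId, k -> new ArrayList<>())
                .add(metricName);
        }
    }

    /** Returns a copy, so callers never touch the guarded list outside the lock. */
    List<String> metricsOf(String taskManagerId) {
        synchronized (this) {
            List<String> metrics = byTaskManager.get(taskManagerId);
            return metrics == null ? new ArrayList<>() : new ArrayList<>(metrics);
        }
    }
}
```

Without the `synchronized` block in `add`, concurrent `computeIfAbsent` calls on a non-thread-safe map could lose updates or corrupt the map, which is the kind of risk the issue points at.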





[jira] [Updated] (FLINK-7777) Bump japicmp to 0.10.0

2017-10-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7777:
--
Component/s: Build System

> Bump japicmp to 0.10.0
> --
>
> Key: FLINK-7777
> URL: https://issues.apache.org/jira/browse/FLINK-7777
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.2
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Minor
> Fix For: 1.5.0
>
>
> Flink currently uses japicmp-maven-plugin version 0.7.0, and I'm getting 
> these warnings from the Maven plugin during a *mvn clean verify*:
> {code:java}
> [INFO] Written file '.../target/japicmp/japicmp.diff'.
> [INFO] Written file '.../target/japicmp/japicmp.xml'.
> [INFO] Written file '.../target/japicmp/japicmp.html'.
> Warning:  org.apache.xerces.jaxp.SAXParserImpl$JAXPSAXParser: Property 
> 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not 
> recognized.
> Compiler warnings:
>   WARNING:  'org.apache.xerces.jaxp.SAXParserImpl: Property 
> 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized.'
> Warning:  org.apache.xerces.parsers.SAXParser: Feature 
> 'http://javax.xml.XMLConstants/feature/secure-processing' is not recognized.
> Warning:  org.apache.xerces.parsers.SAXParser: Property 
> 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized.
> Warning:  org.apache.xerces.parsers.SAXParser: Property 
> 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not 
> recognized.
> {code}
> japicmp fixed this in version 0.7.1: _Excluded xerces vom maven-reporting 
> dependency in order to prevent warnings from SAXParserImpl._
> The current stable version is 0.10.0, so we should consider upgrading to 
> that version.





[jira] [Updated] (FLINK-7871) SlotPool should release its unused slot to RM

2017-10-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7871:
--
Component/s: Distributed Coordination

> SlotPool should release its unused slot to RM
> -
>
> Key: FLINK-7871
> URL: https://issues.apache.org/jira/browse/FLINK-7871
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Reporter: shuai.xu
>Assignee: shuai.xu
>
> As described in design wiki 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077, 
> _*The SlotPool releases slots that are unused to the ResourceManager. Slots 
> count as unused if they are not used when the job is fully running (fully 
> recovered).*_
> However, the slot pool currently keeps every slot offered to it until the job 
> finishes.





[jira] [Updated] (FLINK-7795) Utilize error-prone to discover common coding mistakes

2017-10-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7795:
--
Component/s: Build System

> Utilize error-prone to discover common coding mistakes
> --
>
> Key: FLINK-7795
> URL: https://issues.apache.org/jira/browse/FLINK-7795
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>
> http://errorprone.info/ is a tool that detects common coding mistakes.
> We should incorporate it into the Flink build process.





[jira] [Updated] (FLINK-7782) Flink CEP not recognizing pattern

2017-10-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7782:
--
Component/s: CEP

> Flink CEP not recognizing pattern
> -
>
> Key: FLINK-7782
> URL: https://issues.apache.org/jira/browse/FLINK-7782
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Ajay
>
> I am using flink version 1.3.2. Flink has a kafka source. I am using 
> KafkaSource9. I am running Flink on a 3 node AWS cluster with 8G of RAM 
> running Ubuntu 16.04. From the flink dashboard, I see that I have 2 
> Taskmanagers & 4 Task slots
> What I observe is the following. The input to Kafka is a json string and when 
> parsed on the flink side, it looks like this
> {code:java}
> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
> event,High,37.75142,-122.39458,12.0,20.0)
> {code}
> I use a Tuple8 to capture the parsed data. The first field is home_id. The 
> time characteristic is set to EventTime and I have an 
> AscendingTimestampExtractor using the timestamp field. The parallelism of 
> the execution environment is set to 4. I have a rather simple event that I am 
> trying to capture:
> {code:java}
> DataStream> 
> cepMapByHomeId = cepMap.keyBy(0);
> //cepMapByHomeId.print();
> 
> Pattern, ?> cep1 =
> 
> Pattern.>begin("start")
> .where(new OverLowThreshold())
> .followedBy("end")
> .where(new OverHighThreshold());
> PatternStream Float, Float>> patternStream = CEP.pattern(cepMapByHomeId, cep1);
> DataStream Float>> alerts = patternStream.select(new PackageCapturedEvents());
> {code}
> The pattern checks if the 7th field in the tuple8 goes over 12 and then over 
> 16. The output of the pattern is like this
> {code:java}
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
> event,Non-event,37.75837,-122.41467)
> {code}
> On the Kafka producer side, I am trying to send simulated data for around 100 
> homes, so the home_id goes from 0 to 100 and the input is keyed by home_id. 
> I have about 10 partitions in Kafka. The producer just loops through a 
> CSV file with a delay of about 100 ms between two rows of the file. The 
> data is exactly the same in all 100 CSV files except for home_id and 
> the lat & long information. The timestamp is incremented in steps of 1 sec. 
> I start multiple processes to simulate data from different homes.
> THE PROBLEM:
> Flink completely misses capturing events for a large subset of the input 
> data. I see events for only about 4-5 of the home_id values. I 
> print before and after applying the pattern: I see all home_ids before and 
> only a tiny subset after. Since the data is exactly the same, I expect all 
> home_ids to be captured and written to my sink.





[jira] [Updated] (FLINK-7913) Add support for Kafka default partitioner

2017-10-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7913:
--
Component/s: Kafka Connector

> Add support for Kafka default partitioner
> -
>
> Key: FLINK-7913
> URL: https://issues.apache.org/jira/browse/FLINK-7913
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Konstantin Lalafaryan
>Assignee: Konstantin Lalafaryan
> Fix For: 1.5.0
>
>
> Apache Flink currently provides only *FlinkKafkaPartitioner* and a single 
> implementation of it, *FlinkFixedPartitioner*. 
> To use Kafka's default partitioner, you have to create a new 
> implementation of *FlinkKafkaPartitioner* and fork the code from Kafka. 
> It would be really good to be able to define the partitioner without 
> implementing a new class.
> Thanks.





[jira] [Updated] (FLINK-7935) Metrics with user supplied scope variables

2017-10-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7935:
--
Component/s: Metrics

> Metrics with user supplied scope variables
> --
>
> Key: FLINK-7935
> URL: https://issues.apache.org/jira/browse/FLINK-7935
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>
> We use DataDog for metrics.  DD and Flink differ somewhat in how they track 
> metrics.
> Flink names and scopes metrics together, at least by default. E.g. by default 
> the system scope for operator metrics is 
> {{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}. 
> The scope variables become part of the metric's full name.
> In DD the metric would be named something generic, e.g. 
> {{taskmanager.job.operator}}, and they would be distinguished by their tag 
> values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.
> Flink allows you to configure the format string for system scopes, so it is 
> possible to set the operator scope format to {{taskmanager.job.operator}}.  
> We do this for all scopes:
> {code}
> metrics.scope.jm: jobmanager
> metrics.scope.jm.job: jobmanager.job
> metrics.scope.tm: taskmanager
> metrics.scope.tm.job: taskmanager.job
> metrics.scope.task: taskmanager.job.task
> metrics.scope.operator: taskmanager.job.operator
> {code}
> This seems to work. The DataDog Flink metrics plugin submits all scope 
> variables as tags, even if they are not used within the scope format, and 
> internally this does not appear to lead to metrics conflicting with each other.
> We would like to extend this to user-defined metrics, but you cannot define 
> variables/scopes when adding a metric group or metric with the user API. We 
> want a single metric in DD with a tag that takes many different values, 
> rather than hundreds of metrics for the one value we want to measure 
> across different event types.
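
The difference between "one metric name per event type" and "one generic metric name plus tags" can be sketched as follows. `TaggedCounters` is a hypothetical, self-contained registry used only to illustrate the idea; it is neither the Flink metrics API nor the DataDog client.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical registry: one generic metric name, many tag combinations. */
final class TaggedCounters {
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    /** Increments the series identified by the metric name and its tag values. */
    void increment(String name, Map<String, String> tags) {
        counters.computeIfAbsent(seriesKey(name, tags), k -> new LongAdder()).increment();
    }

    /** Returns the current count of a series, or 0 if it was never incremented. */
    long count(String name, Map<String, String> tags) {
        LongAdder adder = counters.get(seriesKey(name, tags));
        return adder == null ? 0L : adder.sum();
    }

    // Sorting the tags gives a stable key regardless of insertion order.
    private static String seriesKey(String name, Map<String, String> tags) {
        return name + new TreeMap<String, String>(tags).toString();
    }
}
```

With this shape, a user would report a single metric such as `events` and vary only a tag like `event_type`, instead of registering a separately named metric for every event type.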



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7927) Different Netty Versions in dependencies of flink-runtime make it impossible to use 3rd party libraries using netty

2017-10-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7927:
--
Component/s: Build System

> Different Netty Versions in dependencies of flink-runtime make it impossible 
> to use 3rd party libraries using netty
> ---
>
> Key: FLINK-7927
> URL: https://issues.apache.org/jira/browse/FLINK-7927
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.2
> Environment: * Windows 10 x64
> * Java 1.8
>Reporter: Claudius Eisele
>
> I am trying to use Google PubSub (google-cloud-pubsub 0.26.0-beta) in a Flink 
> streaming job but I am receiving the following error when executing it so 
> unfortunately it's not possible to use PubSub in a Flink Streaming Job:
> {code:java}
> ...
> 10/25/2017 22:38:02 Source: Custom Source -> Map(1/1) switched to RUNNING
> 10/25/2017 22:38:03 Source: Custom Source -> Map(1/1) switched to FAILED
> java.lang.IllegalStateException: Expected the service InnerService [FAILED] 
> to be RUNNING, but the service has FAILED
> at 
> com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:328)
> at 
> com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:266)
> at 
> com.google.api.core.AbstractApiService.awaitRunning(AbstractApiService.java:97)
> 
> Caused by: java.lang.IllegalArgumentException: Jetty ALPN/NPN has not been 
> properly configured.
> at 
> io.grpc.netty.GrpcSslContexts.selectApplicationProtocolConfig(GrpcSslContexts.java:159)
> at io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:136)
> at io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:124)
> at io.grpc.netty.GrpcSslContexts.forClient(GrpcSslContexts.java:94)
> at 
> io.grpc.netty.NettyChannelBuilder$NettyTransportFactory$DefaultNettyTransportCreationParamsFilterFactory.<init>(NettyChannelBuilder.java:525)
> at 
> io.grpc.netty.NettyChannelBuilder$NettyTransportFactory$DefaultNettyTransportCreationParamsFilterFactory.<init>(NettyChannelBuilder.java:518)
> at 
> io.grpc.netty.NettyChannelBuilder$NettyTransportFactory.<init>(NettyChannelBuilder.java:457)
> at 
> io.grpc.netty.NettyChannelBuilder.buildTransportFactory(NettyChannelBuilder.java:326)
> at 
> io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:315)
> at 
> com.google.api.gax.grpc.InstantiatingChannelProvider.createChannel(InstantiatingChannelProvider.java:131)
> at 
> com.google.api.gax.grpc.InstantiatingChannelProvider.getChannel(InstantiatingChannelProvider.java:116)
> at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:246)
> at 
> com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:149)
> at 
> com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:211)
> at 
> com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:121)
> at 
> com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:235)
> ... 7 more
> {code}
> I reported this problem to the Google Cloud Java library, but the cause 
> seems to lie in Flink or its dependencies such as Akka, because they pull in 
> several Netty dependencies with different versions:
> * Apache ZooKeeper (flink-runtime dependency) has 
> io.netty:netty:3.7.0.Final -> 3.8.0.Final
> * Flakka (flink-runtime dependency) has io.netty:netty:3.8.0.Final
> * Flink-Runtime has io.netty:netty-all:4.0.27.Final
> In my case, Google Cloud PubSub depends on io.grpc:grpc-netty:1.6.1
> Additional information on the issue in combination with Google Cloud PubSub 
> can be found here:
> https://github.com/GoogleCloudPlatform/google-cloud-java/issues/2398
> https://github.com/grpc/grpc-java/issues/3025
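
When several Netty versions end up on one classpath, as listed above, a first diagnostic step is to check which jar a class is actually loaded from. The following is a minimal sketch, not part of the original report: the class `ClassOrigin` and its `locate` helpers are hypothetical names, `io.grpc.netty.GrpcSslContexts` is taken from the stack trace, and `io.netty.handler.ssl.SslContext` is an assumption about which Netty 4 class is relevant.

```java
import java.security.CodeSource;

class ClassOrigin {

    /** Returns the location a class was loaded from, or a placeholder if unknown. */
    static String locate(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // Typically the case for classes loaded by the bootstrap class loader.
            return "<bootstrap or unknown>";
        }
        return source.getLocation().toString();
    }

    /** Looks a class up by name without initializing it and reports its origin. */
    static String locate(String className) {
        try {
            return locate(Class.forName(className, false, ClassOrigin.class.getClassLoader()));
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // Class names are illustrative; adjust them to the classes in the failing stack trace.
        String[] names = {"io.grpc.netty.GrpcSslContexts", "io.netty.handler.ssl.SslContext"};
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Running this inside the failing job (for example from the source's `open()` method) would show whether the Netty classes come from the user jar or from a jar shipped with Flink.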





[jira] [Updated] (FLINK-7877) Fix compilation against the Hadoop 3 beta1 release

2017-10-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7877:
--
Component/s: Build System

> Fix compilation against the Hadoop 3 beta1 release
> --
>
> Key: FLINK-7877
> URL: https://issues.apache.org/jira/browse/FLINK-7877
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ted Yu
>  Labels: build
>
> When compiling against hadoop 3.0.0-beta1, I got:
> {code}
> [ERROR] 
> /mnt/disk2/a/flink/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java:[224,16]
>  org.apache.flink.yarn.UtilsTest.TestingContainer is not abstract and does 
> not override abstract method 
> setExecutionType(org.apache.hadoop.yarn.api.records.ExecutionType) in 
> org.apache.hadoop.yarn.api.records.Container
> {code}
> There may be other Hadoop API(s) that need adjustment.





[jira] [Updated] (FLINK-7787) Remove guava dependency in the cassandra connector

2017-10-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7787:
--
Component/s: Build System

> Remove guava dependency in the cassandra connector
> --
>
> Key: FLINK-7787
> URL: https://issues.apache.org/jira/browse/FLINK-7787
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> As discovered in FLINK-6225, the cassandra connector uses the future classes 
> in the guava library. We can get rid of the dependency by using the 
> equivalent classes provided by Java 8.
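
As a rough illustration of the proposal above, a Guava-style success/failure callback can be expressed with Java 8's `CompletableFuture.whenComplete`. This is only a sketch under assumptions: `writeAsync` is a hypothetical stand-in for an asynchronous write and is not the connector's or the Cassandra driver's API, and how the driver's own futures would be bridged is not covered here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class FutureCallbackSketch {

    /** Hypothetical asynchronous write; completes immediately for demonstration purposes. */
    static CompletableFuture<String> writeAsync(String record, boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("write failed: " + record));
        } else {
            future.complete("ack:" + record);
        }
        return future;
    }

    /** Registers one callback that handles both outcomes, replacing onSuccess/onFailure. */
    static String describe(CompletableFuture<String> future) {
        AtomicReference<String> outcome = new AtomicReference<>();
        future.whenComplete((result, error) -> {
            if (error != null) {
                outcome.set("failure: " + error.getMessage());
            } else {
                outcome.set("success: " + result);
            }
        });
        // The futures in this sketch are already completed, so the callback has run by now.
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(describe(writeAsync("r1", false)));
        System.out.println(describe(writeAsync("r2", true)));
    }
}
```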





[jira] [Updated] (FLINK-7881) flink can't deployed on yarn with ha

2017-10-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7881:
--
Component/s: YARN

> flink can't deployed on yarn with ha
> 
>
> Key: FLINK-7881
> URL: https://issues.apache.org/jira/browse/FLINK-7881
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.3.2
>Reporter: deng
>Priority: Blocker
>
> I start yarn-session.sh on YARN, but it cannot resolve the HDFS logical URL. It 
> always connects to hdfs://master:8020, although the port should be 9000; my HDFS 
> fs.defaultFS is hdfs://master.
> I have set YARN_CONF_DIR and HADOOP_CONF_DIR, but that did not help.
> Is this a bug? I use flink-1.3.0-bin-hadoop27-scala_2.10
> 2017-10-20 11:00:05,395 DEBUG org.apache.hadoop.ipc.Client
>   - IPC Client (1035144464) connection to 
> startdt/173.16.5.215:8020 from admin: closed
> 2017-10-20 11:00:05,398 ERROR 
> org.apache.flink.yarn.YarnApplicationMasterRunner - YARN 
> Application Master initialization failed
> java.net.ConnectException: Call From spark3/173.16.5.216 to master:8020 
> failed on connection exception: java.net.ConnectException: Connection 
> refused; For more details see:  
> http://wiki.apache.org/hadoop/ConnectionRefused
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>   at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
>   at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1479)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>   at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
>   at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)





[jira] [Updated] (FLINK-7934) Upgrade Calcite dependency to 1.15

2017-10-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7934:
--
Component/s: Table API & SQL

> Upgrade Calcite dependency to 1.15
> --
>
> Key: FLINK-7934
> URL: https://issues.apache.org/jira/browse/FLINK-7934
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Rong Rong
>
> Umbrella issue for all related issues for Apache Calcite 1.15 release.





[jira] [Updated] (FLINK-7894) Improve metrics around fine-grained recovery and associated checkpointing behaviors

2017-10-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7894:
--
Component/s: Metrics

> Improve metrics around fine-grained recovery and associated checkpointing 
> behaviors
> ---
>
> Key: FLINK-7894
> URL: https://issues.apache.org/jira/browse/FLINK-7894
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Zhenzhong Xu
>
> Currently, the only metric around fine-grained recovery is "task_failures". 
> It is a very high-level metric; it would be nice to have the following 
> improvements:
> * Ability to slice and dice by which tasks were restarted.
> * Recovery duration.
> * Recovery-associated checkpoint behaviors: cancels, failures, etc.





[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-10-30 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225408#comment-16225408
 ] 

Nico Kruber commented on FLINK-4228:


[~aljoscha]: I successfully reproduced the error with a current snapshot of 
Flink 1.4 on EMR

Steps to reproduce:
# setup the S3A filesystem as described in 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html,
 i.e.
#* adapt {{/etc/hadoop/conf/core-site.xml}}
#* copy S3A filesystem dependency jars to {{flink/lib}}
# adapt default filesystem for hadoop in {{/etc/hadoop/conf/core-site.xml}}, 
e.g.:
{code}
  <property>
    <name>fs.defaultFS</name>
    <value>s3a://nico-test/</value>
  </property>
{code}
# distribute new configuration to all nodes & restart to apply
# try to run an example (here with two nodes) so that Flink tries to deploy the 
artefacts to YARN's (default) filesystem
{code}
> cd flink
> HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 2 -ys 1 
> -yjm 768 -ytm 1024 ./examples/batch/WordCount.jar
Using the result of 'hadoop classpath' to augment the Hadoop classpath: 
/etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/.//*::/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar:/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar:/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/home/hadoop/flink/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2017-10-30 17:34:51,137 WARN  org.apache.hadoop.conf.Configuration  
- /etc/hadoop/conf/core-site.xml:an attempt to override final 
parameter: fs.s3.buffer.dir;  Ignoring.
2017-10-30 17:34:51,224 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2017-10-30 17:34:51,224 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2017-10-30 17:34:51,279 INFO  org.apache.hadoop.yarn.client.RMProxy 
- Connecting to ResourceManager at 
ip-172-31-22-149.eu-west-1.compute.internal/172.31.22.149:8032
2017-10-30 17:34:51,572 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Cluster specification: ClusterSpecification{masterMemoryMB=768, 
taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=1}
2017-10-30 17:34:53,253 WARN  org.apache.flink.yarn.YarnClusterDescriptor   
- The configuration directory ('/home/hadoop/flink/conf') contains 
both LOG4J and Logback configuration files. Please delete or rename one of them.
2017-10-30 17:34:53,268 INFO  org.apache.flink.yarn.Utils   
- Copying from file:/home/hadoop/flink/conf/log4j.properties to 
s3a://nico-test/user/hadoop/.flink/application_1509384765476_0001/log4j.properties
2017-10-30 17:34:53,824 INFO  org.apache.flink.yarn.Utils   
- Copying from file:/home/hadoop/flink/lib to 
s3a://nico-test/user/hadoop/.flink/application_1509384765476_0001/lib


 The program finished with the following exception:

java.lang.RuntimeException: Error deploying the YARN cluster
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:594)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:81)
at 
org.apache.flink.client.CliFrontend.createClient(CliFrontend.java:925)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:264)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroup

[jira] [Commented] (FLINK-7950) flink-queryable-state-runtime not a dependency in flink-dist

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225364#comment-16225364
 ] 

ASF GitHub Bot commented on FLINK-7950:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4925

[FLINK-7950][build] add flink-queryable-state-runtime as a dependency to 
flink-dist

## What is the purpose of the change

Since #4906, the `flink-queryable-state-runtime` jar file is put into the 
`opt/` folder of `flink-dist`, so that module must be built as well.

## Brief change log

- add `flink-queryable-state-runtime` as a (provided) dependency to 
`flink-dist`

## Verifying this change

This change can be verified by building the `flink-dist` sub-project: `mvn 
install -pl flink-dist -am` and verifying that it builds as well as that 
`org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler` is not 
part of the `flink-dist` uber jar.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (not to Flink 
itself)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7950

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4925.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4925






> flink-queryable-state-runtime not a dependency in flink-dist
> 
>
> Key: FLINK-7950
> URL: https://issues.apache.org/jira/browse/FLINK-7950
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{flink-dist}} requires the {{flink-queryable-state-runtime}} module to be 
> built since FLINK-7824, but this is not declared as a dependency in the 
> {{pom.xml}}, so building the sub-project alone fails, e.g. via {{mvn install -pl 
> flink-dist -am}}





[GitHub] flink pull request #4925: [FLINK-7950][build] add flink-queryable-state-runt...

2017-10-30 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4925

[FLINK-7950][build] add flink-queryable-state-runtime as a dependency to 
flink-dist

## What is the purpose of the change

Since #4906, the `flink-queryable-state-runtime` jar file is put into the 
`opt/` folder of `flink-dist`, so that module must be built as well.

## Brief change log

- add `flink-queryable-state-runtime` as a (provided) dependency to 
`flink-dist`

## Verifying this change

This change can be verified by building the `flink-dist` sub-project: `mvn 
install -pl flink-dist -am` and verifying that it builds as well as that 
`org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler` is not 
part of the `flink-dist` uber jar.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (not to Flink 
itself)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7950

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4925.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4925






---


[jira] [Created] (FLINK-7950) flink-queryable-state-runtime not a dependency in flink-dist

2017-10-30 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-7950:
--

 Summary: flink-queryable-state-runtime not a dependency in 
flink-dist
 Key: FLINK-7950
 URL: https://issues.apache.org/jira/browse/FLINK-7950
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.4.0
Reporter: Nico Kruber
Assignee: Nico Kruber
Priority: Blocker


{{flink-dist}} requires the {{flink-queryable-state-runtime}} module to be 
built since FLINK-7824, but this is not declared as a dependency in the 
{{pom.xml}}, so building the sub-project alone fails, e.g. via {{mvn install -pl 
flink-dist -am}}





[jira] [Updated] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-10-30 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-4228:

Description: 
The issue now occurs only when running on YARN with s3a:// as the configured 
default FileSystem. In that case, the Flink session fails while staging itself 
because it tries to copy the flink/lib directory to S3 and the S3AFileSystem 
does not support recursive copy.

h2. Old Issue
Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
leads to an Exception when uploading the snapshot to S3 when using the 
{{S3AFileSystem}}.

{code}
AsynchronousException{com.amazonaws.AmazonClientException: Unable to calculate 
MD5 hash: 
/var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
 (Is a directory)}
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
/var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
 (Is a directory)
at 
com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
at 
com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
at 
com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
at 
com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
at 
com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
at 
com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: 
/var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
 (Is a directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at 
com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
... 9 more
{code}

Running with S3NFileSystem, the error does not occur. The problem might be due 
to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
automatically. We might need to manually create folders and copy only actual 
files for {{S3AFileSystem}}. More investigation is required.
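
The idea of creating folders manually and copying only actual files can be sketched as follows. This is an illustration under assumptions, not the eventual fix: it uses `java.nio.file` on local temporary directories as a stand-in for the Hadoop `FileSystem` API, and `RecursiveCopySketch`, `copyTree`, and `demo` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

class RecursiveCopySketch {

    /** Copies a tree file by file, creating directories explicitly; returns the number of files copied. */
    static int copyTree(Path sourceRoot, Path targetRoot) throws IOException {
        int copied = 0;
        try (Stream<Path> paths = Files.walk(sourceRoot)) {
            // Files.walk visits a directory before its entries, so parents exist before files are copied.
            for (Path source : (Iterable<Path>) paths::iterator) {
                Path target = targetRoot.resolve(sourceRoot.relativize(source).toString());
                if (Files.isDirectory(source)) {
                    Files.createDirectories(target);
                } else {
                    Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
                    copied++;
                }
            }
        }
        return copied;
    }

    /** Builds a small lib-like tree in a temporary directory and copies it. */
    static int demo() {
        try {
            Path src = Files.createTempDirectory("flink-lib-src");
            Files.createDirectories(src.resolve("lib").resolve("sub"));
            Files.write(src.resolve("lib").resolve("a.jar"), new byte[] {1});
            Files.write(src.resolve("lib").resolve("sub").resolve("b.jar"), new byte[] {2});
            Path dst = Files.createTempDirectory("flink-lib-dst");
            return copyTree(src, dst);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("files copied: " + demo());
    }
}
```

The point of the sketch is that no single call is ever handed a directory as the thing to upload, which is the situation that produces the "(Is a directory)" error above.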

  was:
Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
leads to an Exception when uploading the snapshot to S3 when using the 
{{S3AFileSystem}}.

{code}
AsynchronousException{com.amazonaws.AmazonClientException: Unable to calculate 
MD5 hash: 
/var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
 (Is a directory)}
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
/var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
 (Is a directory)
at 
com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
at 
com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
at 
com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
at 
com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
at 
com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
at 
com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
C

[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225241#comment-16225241
 ] 

ASF GitHub Bot commented on FLINK-7838:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147754468
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java
 ---
@@ -61,7 +61,7 @@
  * IT cases for the {@link FlinkKafkaProducer011}.
  */
 @SuppressWarnings("serial")
-public class FlinkKafkaProducer011Tests extends KafkaTestBase {
+public class FlinkKafkaProducer011Test extends KafkaTestBase {
--- End diff --

Shouldn't this be named `*ITCase` according to the coding conventions?


> Kafka011ProducerExactlyOnceITCase do not finish
> ---
>
> Key: FLINK-7838
> URL: https://issues.apache.org/jira/browse/FLINK-7838
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log





[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

2017-10-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147754468
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java
 ---
@@ -61,7 +61,7 @@
  * IT cases for the {@link FlinkKafkaProducer011}.
  */
 @SuppressWarnings("serial")
-public class FlinkKafkaProducer011Tests extends KafkaTestBase {
+public class FlinkKafkaProducer011Test extends KafkaTestBase {
--- End diff --

Shouldn't this be named `*ITCase` according to the coding conventions?


---


[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225238#comment-16225238
 ] 

ASF GitHub Bot commented on FLINK-7838:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147754127
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
 ---
@@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
 
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
+   enqueueNewPartitions().await();
+   }
+
+   private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer, 
"transactionManager");
-   Object txnRequestHandler = invoke(transactionManager, 
"addPartitionsToTransactionHandler");
-   invoke(transactionManager, "enqueueRequest", new 
Class[]{txnRequestHandler.getClass().getSuperclass()}, new 
Object[]{txnRequestHandler});
-   TransactionalRequestResult result = 
(TransactionalRequestResult) getValue(txnRequestHandler, 
txnRequestHandler.getClass().getSuperclass(), "result");
-   Object sender = getValue(kafkaProducer, "sender");
-   invoke(sender, "wakeup");
-   result.await();
+   synchronized (transactionManager) {
+   Object txnRequestHandler = invoke(transactionManager, 
"addPartitionsToTransactionHandler");
--- End diff --

How are you testing now that `enqueueRequest` is called? 


> Kafka011ProducerExactlyOnceITCase do not finish
> ---
>
> Key: FLINK-7838
> URL: https://issues.apache.org/jira/browse/FLINK-7838
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log





[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

2017-10-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147754127
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
 ---
@@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
 
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
+   enqueueNewPartitions().await();
+   }
+
+   private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer, 
"transactionManager");
-   Object txnRequestHandler = invoke(transactionManager, 
"addPartitionsToTransactionHandler");
-   invoke(transactionManager, "enqueueRequest", new 
Class[]{txnRequestHandler.getClass().getSuperclass()}, new 
Object[]{txnRequestHandler});
-   TransactionalRequestResult result = 
(TransactionalRequestResult) getValue(txnRequestHandler, 
txnRequestHandler.getClass().getSuperclass(), "result");
-   Object sender = getValue(kafkaProducer, "sender");
-   invoke(sender, "wakeup");
-   result.await();
+   synchronized (transactionManager) {
+   Object txnRequestHandler = invoke(transactionManager, 
"addPartitionsToTransactionHandler");
--- End diff --

How are you testing now that `enqueueRequest` is called? 


---


[jira] [Commented] (FLINK-6495) Migrate Akka configuration options

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225173#comment-16225173
 ] 

ASF GitHub Bot commented on FLINK-6495:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4774#discussion_r147743853
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---
@@ -257,6 +257,20 @@ object AkkaUtils {
 ConfigFactory.parseString(config)
   }
 
+  private def validateHeartbeat(pauseParamName: String,
+pauseValue: String,
+intervalParamName: String,
+intervalValue: String) = {
+if (Duration.apply(pauseValue).lteq(Duration.apply(intervalValue))) {
+  throw new IllegalConfigurationException(
+"%s [%s] must greater then %s [%s]",
--- End diff --

this should actually be "than", not "then"
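
For illustration, the check under review (the heartbeat pause must be strictly greater than the heartbeat interval) might look like this with the corrected message. It is a Java rendering of the Scala snippet above, not the code in the pull request: it uses `java.time.Duration` and `IllegalArgumentException` instead of Flink's types, and the parameter names passed in `main` are only examples.

```java
import java.time.Duration;

class HeartbeatValidationSketch {

    /** Throws if the pause is not strictly greater than the interval. */
    static void validateHeartbeat(
            String pauseParamName, Duration pause,
            String intervalParamName, Duration interval) {
        if (pause.compareTo(interval) <= 0) {
            throw new IllegalArgumentException(String.format(
                "%s [%s] must be greater than %s [%s]",
                pauseParamName, pause, intervalParamName, interval));
        }
    }

    public static void main(String[] args) {
        // Accepted: a 60 s pause with a 10 s interval.
        validateHeartbeat("heartbeat.pause", Duration.ofSeconds(60),
                "heartbeat.interval", Duration.ofSeconds(10));
        try {
            // Rejected: the pause equals the interval.
            validateHeartbeat("heartbeat.pause", Duration.ofSeconds(10),
                    "heartbeat.interval", Duration.ofSeconds(10));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```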


> Migrate Akka configuration options
> --
>
> Key: FLINK-6495
> URL: https://issues.apache.org/jira/browse/FLINK-6495
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Reporter: Chesnay Schepler
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>






[GitHub] flink pull request #4774: [FLINK-6495] Fix Akka's default value for heartbea...

2017-10-30 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4774#discussion_r147743853
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---
@@ -257,6 +257,20 @@ object AkkaUtils {
 ConfigFactory.parseString(config)
   }
 
+  private def validateHeartbeat(pauseParamName: String,
+pauseValue: String,
+intervalParamName: String,
+intervalValue: String) = {
+if (Duration.apply(pauseValue).lteq(Duration.apply(intervalValue))) {
+  throw new IllegalConfigurationException(
+"%s [%s] must greater then %s [%s]",
--- End diff --

this should actually be "than", not "then"


---


[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225160#comment-16225160
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147741136
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -35,60 +42,101 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-   TestContext context;
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   private FileBasedSinkFunction sinkFunction;
+
+   private OneInputStreamOperatorTestHarness harness;
+
+   private AtomicBoolean throwException = new AtomicBoolean();
+
+   private File targetDirectory;
+
+   private File tmpDirectory;
+
+   @Mock
+   private Clock mockClock;
+
+   @Mock
+   private Logger mockLogger;
--- End diff --

The test does not rely on any mocks anymore. I am not 100% happy with it 
because we use log4j `1.x` which is not maintained anymore and in log4j `2.x`, 
the APIs have changed a lot: 
http://logging.apache.org/log4j/2.x/manual/customconfig.html#AddingToCurrent


> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> 
>
> Key: FLINK-7784
> URL: https://issues.apache.org/jira/browse/FLINK-7784
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails 
> (either when doing it via the completed checkpoint notification or when 
> trying to commit after restoring after failure). This means that the job will 
> go into an infinite recovery loop because we will always keep failing.
> In some cases it might be better to ignore those failures and keep on 
> processing and this should be the default. We can provide an option that 
> allows failing the sink on failing commits.





[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...

2017-10-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147741136
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -35,60 +42,101 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-   TestContext context;
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   private FileBasedSinkFunction sinkFunction;
+
+   private OneInputStreamOperatorTestHarness harness;
+
+   private AtomicBoolean throwException = new AtomicBoolean();
+
+   private File targetDirectory;
+
+   private File tmpDirectory;
+
+   @Mock
+   private Clock mockClock;
+
+   @Mock
+   private Logger mockLogger;
--- End diff --

The test does not rely on any mocks anymore. I am not 100% happy with it 
because we use log4j `1.x` which is not maintained anymore and in log4j `2.x`, 
the APIs have changed a lot: 
http://logging.apache.org/log4j/2.x/manual/customconfig.html#AddingToCurrent


---


[jira] [Commented] (FLINK-7418) Replace all uses of jackson with flink-shaded-jackson

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225151#comment-16225151
 ] 

ASF GitHub Bot commented on FLINK-7418:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4923
  
Ok for me.


> Replace all uses of jackson with flink-shaded-jackson
> -
>
> Key: FLINK-7418
> URL: https://issues.apache.org/jira/browse/FLINK-7418
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Jackson is currently used to create JSON responses in the web UI, in the 
> future possibly for the client REST communication.





[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225146#comment-16225146
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147739898
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = 
LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> 
stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = 
new LinkedHashMap<>();
+   protected final LinkedHashMap> 
pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} 
will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
--- End diff --

Ok, I like `ignoreFailuresAfterTransactionTimeout`. The method name is now 
the same, though.


> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> 
>
> Key: FLINK-7784
> URL: https://issues.apache.org/jira/browse/FLINK-7784
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails 
> (either when doing it via the completed checkpoint notification or when 
> trying to commit after restoring after failure). This means that the job will 
> go into an infinite recovery loop because we will always keep failing.
> In some cases it might be better to ignore those failures and keep on 
> processing and this should be the default. We can provide an option that 
> allows failing the sink on failing commits.





[GitHub] flink issue #4923: [FLINK-7418][build] Integrate flink-shaded-jackson2

2017-10-30 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4923
  
Ok for me.


---


[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...

2017-10-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147739898
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = 
LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> 
stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = 
new LinkedHashMap<>();
+   protected final LinkedHashMap> 
pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} 
will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
--- End diff --

Ok, I like `ignoreFailuresAfterTransactionTimeout`. The method name is now 
the same, though.
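
To make the naming discussion concrete, here is a minimal sketch of what a flag called `ignoreFailuresAfterTransactionTimeout` might control. It is an interpretation, not the code in the pull request: the class `CommitAfterTimeoutSketch`, the `Committer` interface, and the rule "swallow the failure only once the transaction is older than the timeout" are assumptions made for illustration.

```java
import java.time.Clock;

/** Hypothetical illustration; not TwoPhaseCommitSinkFunction itself. */
final class CommitAfterTimeoutSketch {

    /** Hypothetical stand-in for committing a recovered transaction. */
    interface Committer {
        void commit();
    }

    private final Clock clock;
    private final long transactionTimeoutMillis;
    private final boolean ignoreFailuresAfterTransactionTimeout;

    CommitAfterTimeoutSketch(
            Clock clock,
            long transactionTimeoutMillis,
            boolean ignoreFailuresAfterTransactionTimeout) {
        this.clock = clock;
        this.transactionTimeoutMillis = transactionTimeoutMillis;
        this.ignoreFailuresAfterTransactionTimeout = ignoreFailuresAfterTransactionTimeout;
    }

    /** Returns true if the commit succeeded, false if a failure was swallowed. */
    boolean recoverAndCommit(long transactionStartMillis, Committer committer) {
        try {
            committer.commit();
            return true;
        } catch (RuntimeException e) {
            long elapsedMillis = clock.millis() - transactionStartMillis;
            // Assumed rule: only failures of timed-out transactions are ignored.
            if (ignoreFailuresAfterTransactionTimeout
                    && elapsedMillis >= transactionTimeoutMillis) {
                return false;
            }
            throw e;
        }
    }
}
```

Injecting the `Clock` keeps the elapsed-time decision testable with a fixed clock, which matches the `Clock` field introduced in the diff.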


---


[jira] [Commented] (FLINK-7418) Replace all uses of jackson with flink-shaded-jackson

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225139#comment-16225139
 ] 

ASF GitHub Bot commented on FLINK-7418:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4923
  
It's more of an assumption tbh, I haven't tried it out. I've looked through 
the fenzo source code to search for APIs that expose jackson but couldn't find 
any. They annotate a number of POJOs but don't even map them to/from JSON 
outside of tests.


> Replace all uses of jackson with flink-shaded-jackson
> -
>
> Key: FLINK-7418
> URL: https://issues.apache.org/jira/browse/FLINK-7418
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Jackson is currently used to create JSON responses in the web UI, in the 
> future possibly for the client REST communication.





[GitHub] flink issue #4923: [FLINK-7418][build] Integrate flink-shaded-jackson2

2017-10-30 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4923
  
It's more of an assumption tbh, I haven't tried it out. I've looked through 
the fenzo source code to search for APIs that expose jackson but couldn't find 
any. They annotate a number of POJOs but don't even map them to/from JSON 
outside of tests.


---


[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...

2017-10-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147736898
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = 
LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> 
stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = 
new LinkedHashMap<>();
+   protected final LinkedHashMap> 
pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} 
will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
--- End diff --

Ok, I like `ignoreFailuresAfterTransactionTimeout`


---


[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225119#comment-16225119
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147736898
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = 
LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> 
stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = 
new LinkedHashMap<>();
+   protected final LinkedHashMap> 
pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} 
will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
--- End diff --

Ok, I like `ignoreFailuresAfterTransactionTimeout`


> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> 
>
> Key: FLINK-7784
> URL: https://issues.apache.org/jira/browse/FLINK-7784
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails 
> (either when doing it via the completed checkpoint notification or when 
> trying to commit after restoring after failure). This means that the job will 
> go into an infinite recovery loop because we will always keep failing.
> In some cases it might be better to ignore those failures and keep on 
> processing and this should be the default. We can provide an option that 
> allows failing the sink on failing commits.





[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225114#comment-16225114
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147735664
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
 ---
@@ -83,49 +79,6 @@ public void before() {
extraProperties.put("isolation.level", "read_committed");
}
 
-   @Test(timeout = 3L)
--- End diff --

The tests that I removed were already in `FlinkKafkaProducerTests`. 
Probably a copy-paste error.


> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> 
>
> Key: FLINK-7784
> URL: https://issues.apache.org/jira/browse/FLINK-7784
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails 
> (either when doing it via the completed checkpoint notification or when 
> trying to commit after restoring after failure). This means that the job will 
> go into an infinite recovery loop because we will always keep failing.
> In some cases it might be better to ignore those failures and keep on 
> processing and this should be the default. We can provide an option that 
> allows failing the sink on failing commits.





[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...

2017-10-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147735664
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
 ---
@@ -83,49 +79,6 @@ public void before() {
extraProperties.put("isolation.level", "read_committed");
}
 
-   @Test(timeout = 3L)
--- End diff --

The tests that I removed were already in `FlinkKafkaProducerTests`. 
Probably a copy-paste error.


---


[jira] [Closed] (FLINK-6916) FLIP-19: Improved BLOB storage architecture

2017-10-30 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber closed FLINK-6916.
--
   Resolution: Done
Fix Version/s: 1.4.0

> FLIP-19: Improved BLOB storage architecture
> ---
>
> Key: FLINK-6916
> URL: https://issues.apache.org/jira/browse/FLINK-6916
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.4.0
>
>
> The current architecture around the BLOB server and cache components seems 
> rather patched up and has some issues regarding concurrency ([FLINK-6380]), 
> cleanup, API inconsistencies / currently unused API ([FLINK-6329], 
> [FLINK-6008]). These make future integration with FLIP-6 or extensions like 
> offloading oversized RPC messages ([FLINK-6046]) difficult. We therefore 
> propose an improvement on the current architecture as described below which 
> tackles these issues, provides some cleanup, and enables further BLOB server 
> use cases.
> Please refer to 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture
>  for a full overview on the proposed changes.





[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225085#comment-16225085
 ] 

ASF GitHub Bot commented on FLINK-7949:
---

GitHub user bartektartanus opened a pull request:

https://github.com/apache/flink/pull/4924

[FLINK-7949] AsyncWaitOperator is not restarting when queue is full

Change:
Emitter thread is started BEFORE filling up the queue of recovered elements
Issue description:
During process restart, if the queue was full (with N elements) and there 
was a pending element waiting to be added to the queue, the queue could not 
fit all N+1 elements and the thread was blocked forever. As Till Rohrmann 
suggested here:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
I've changed the order of this code to start the emitter thread earlier.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bartektartanus/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4924.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4924


commit 97620649ddfcf8f0320b20bdfdb69d9b44dd8f0c
Author: Bartłomiej Tartanus 
Date:   2017-10-30T14:39:43Z

start emmiter thread BEFORE filling up the queue of recovered elements




> AsyncWaitOperator is not restarting when queue is full
> --
>
> Key: FLINK-7949
> URL: https://issues.apache.org/jira/browse/FLINK-7949
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Bartłomiej Tartanus
>   Original Estimate: 0.25h
>  Remaining Estimate: 0.25h
>
> The issue was described here:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
> Issue: AsyncWaitOperator can't restart properly after a failure (the thread 
> waits forever).
> Scenario to reproduce this issue:
> 1. The queue is full (let's assume that its capacity is N elements). 
> 2. There is a pending element waiting, so the 
> pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and the 
> while-loop in the addAsyncBufferEntry method keeps trying to add this element 
> to the queue (but the element is not added because the queue is full). 
> 3. Now a snapshot is taken: the whole queue of N elements is written into 
> the ListState in the snapshotState method and, more importantly, the 
> pendingStreamElementQueueEntry is written to this list too. 
> 4. The process is restarted, so it tries to recover all the elements and put 
> them back into the queue, but the list of recovered elements holds N+1 
> elements and the queue capacity is only N. The process has not started yet, 
> so it cannot process any element, and the extra element waits endlessly: it 
> is never added and the process never processes anything. Deadlock. 
> 5. The trigger is fired and discarded because the process is not running 
> yet. 





[GitHub] flink pull request #4924: [FLINK-7949] AsyncWaitOperator is not restarting w...

2017-10-30 Thread bartektartanus
GitHub user bartektartanus opened a pull request:

https://github.com/apache/flink/pull/4924

[FLINK-7949] AsyncWaitOperator is not restarting when queue is full

Change:
Emitter thread is started BEFORE filling up the queue of recovered elements
Issue description:
During process restart, if the queue was full (with N elements) and there 
was a pending element waiting to be added to the queue, the queue could not 
fit all N+1 elements and the thread was blocked forever. As Till Rohrmann 
suggested here:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
I've changed the order of this code to start the emitter thread earlier.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bartektartanus/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4924.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4924


commit 97620649ddfcf8f0320b20bdfdb69d9b44dd8f0c
Author: Bartłomiej Tartanus 
Date:   2017-10-30T14:39:43Z

start emmiter thread BEFORE filling up the queue of recovered elements




---


[jira] [Commented] (FLINK-7418) Replace all uses of jackson with flink-shaded-jackson

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225082#comment-16225082
 ] 

ASF GitHub Bot commented on FLINK-7418:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4923
  
Changes look good. :+1:

How do we know that our Mesos support still works when Jackson is shaded 
there?


> Replace all uses of jackson with flink-shaded-jackson
> -
>
> Key: FLINK-7418
> URL: https://issues.apache.org/jira/browse/FLINK-7418
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Jackson is currently used to create JSON responses in the web UI, in the 
> future possibly for the client REST communication.





[GitHub] flink issue #4923: [FLINK-7418][build] Integrate flink-shaded-jackson2

2017-10-30 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4923
  
Changes look good. :+1:

How do we know that our Mesos support still works when Jackson is shaded 
there?


---


[jira] [Commented] (FLINK-7400) off-heap limits set to conservatively in cluster environments

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225080#comment-16225080
 ] 

ASF GitHub Bot commented on FLINK-7400:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4506
  
I just tested this on a real YARN cluster with `./bin/flink run -m 
yarn-cluster -yn 1 -ys 2 -yjm 768 -ytm 1024 -yD 
taskmanager.memory.off-heap=true -yD taskmanager.memory.size=260 -yD 
taskmanager.memory.preallocate=true ./examples/batch/WordCount.jar` (the 
equivalent of the test) and verified that it was failing on Flink 1.3.2 but 
works with the proposed fix.

I also tested the counterpart without the three memory options, i.e. 
`./bin/flink run -m yarn-cluster -yn 1 -ys 2 -yjm 768 -ytm 1024 
./examples/batch/WordCount.jar`, and verified that it works with the PR changes.


> off-heap limits set to conservatively in cluster environments
> -
>
> Key: FLINK-7400
> URL: https://issues.apache.org/jira/browse/FLINK-7400
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Mesos, YARN
>Affects Versions: 1.3.0, 1.3.1, 1.4.0, 1.3.2
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> Inside {{ContaineredTaskManagerParameters}}, since FLINK-6217, the 
> {{offHeapSize}} is set to the amount of memory Flink will use off-heap which 
> will be set as the value for {{-XX:MaxDirectMemorySize}} in various cases. 
> This does not account for any off-heap use by other components than Flink, 
> e.g. RocksDB, other libraries, or the JVM itself.
> We should add the {{cutoff}} from the {{CONTAINERIZED_HEAP_CUTOFF_RATIO}} 
> configuration parameter to {{offHeapSize}} as implied by the description on 
> what this parameter is there for.
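
The arithmetic proposed in the description can be sketched as follows. This is a simplified illustration, not the logic of `ContaineredTaskManagerParameters`: the class and method names are hypothetical, the cutoff ratio of 0.25 is an assumed example value, and other contributions (such as network buffers or a minimum cutoff) are deliberately left out.

```java
/** Hypothetical illustration of adding the cutoff to the direct-memory limit. */
final class OffHeapLimitSketch {

    /** Memory reserved for non-Flink use, as a fraction of the container size. */
    static long cutoffMB(long containerMemoryMB, double cutoffRatio) {
        return (long) (containerMemoryMB * cutoffRatio);
    }

    /**
     * Value for -XX:MaxDirectMemorySize: Flink's own off-heap memory plus the
     * cutoff, so RocksDB, other libraries, and the JVM have headroom too.
     */
    static long maxDirectMemoryMB(long flinkOffHeapMB, long cutoffMB) {
        return flinkOffHeapMB + cutoffMB;
    }
}
```

With a 1024 MB container, 260 MB of Flink off-heap memory, and the assumed ratio of 0.25, the cutoff is 256 MB and the limit becomes 516 MB instead of 260 MB.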





[GitHub] flink issue #4506: [FLINK-7400][cluster] fix off-heap limits set to conserva...

2017-10-30 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4506
  
I just tested this on a real YARN cluster with `./bin/flink run -m 
yarn-cluster -yn 1 -ys 2 -yjm 768 -ytm 1024 -yD 
taskmanager.memory.off-heap=true -yD taskmanager.memory.size=260 -yD 
taskmanager.memory.preallocate=true ./examples/batch/WordCount.jar` (the 
equivalent of the test) and verified that it was failing on Flink 1.3.2 but 
works with the proposed fix.

I also tested the counterpart without the three memory options, i.e. 
`./bin/flink run -m yarn-cluster -yn 1 -ys 2 -yjm 768 -ytm 1024 
./examples/batch/WordCount.jar`, and verified that it works with the PR changes.


---


[jira] [Created] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full

2017-10-30 Thread JIRA
Bartłomiej Tartanus created FLINK-7949:
--

 Summary: AsyncWaitOperator is not restarting when queue is full
 Key: FLINK-7949
 URL: https://issues.apache.org/jira/browse/FLINK-7949
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.2
Reporter: Bartłomiej Tartanus


The issue was described here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
Issue: AsyncWaitOperator can't restart properly after a failure (the thread 
waits forever).

Scenario to reproduce this issue:
1. The queue is full (let's assume that its capacity is N elements). 
2. There is a pending element waiting, so the 
pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and the 
while-loop in the addAsyncBufferEntry method keeps trying to add this element 
to the queue (but the element is not added because the queue is full). 
3. Now a snapshot is taken: the whole queue of N elements is written into 
the ListState in the snapshotState method and, more importantly, the 
pendingStreamElementQueueEntry is written to this list too. 
4. The process is restarted, so it tries to recover all the elements and put 
them back into the queue, but the list of recovered elements holds N+1 
elements and the queue capacity is only N. The process has not started yet, 
so it cannot process any element, and the extra element waits endlessly: it 
is never added and the process never processes anything. Deadlock. 
5. The trigger is fired and discarded because the process is not running 
yet. 
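
The scenario above can be modelled with a small stand-alone sketch. It is not the AsyncWaitOperator code: `ArrayBlockingQueue` stands in for the operator's element queue, and the class and method names are hypothetical. It shows that N+1 recovered elements never fit into a queue of capacity N unless something is already draining the queue while it is refilled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical model of restoring N+1 elements into a queue of capacity N. */
final class RecoveryOrderSketch {

    /** Without a consumer, only `capacity` elements are accepted. */
    static int elementsThatFitWithoutEmitter(List<Integer> recovered, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (Integer element : recovered) {
            // offer() returns false instead of blocking when the queue is full.
            if (queue.offer(element)) {
                accepted++;
            }
        }
        return accepted;
    }

    /** Starts the consumer ("emitter") first, then refills; returns what was emitted. */
    static List<Integer> restoreWithEmitterFirst(List<Integer> recovered, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> emitted = new ArrayList<>(); // written only by the emitter thread
        int expected = recovered.size();

        Thread emitter = new Thread(() -> {
            try {
                while (emitted.size() < expected) {
                    emitted.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        emitter.setDaemon(true);
        emitter.start(); // started BEFORE the queue is refilled

        try {
            for (Integer element : recovered) {
                queue.put(element); // blocks while full, until the emitter drains
            }
            emitter.join(); // join() makes the emitter's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emitted;
    }
}
```

If `put` were used in the first method instead of `offer`, the refill would block forever on the extra element, which corresponds to the deadlock in step 4.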





[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225068#comment-16225068
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147727170
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = 
LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> 
stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = 
new LinkedHashMap<>();
+   protected final LinkedHashMap> 
pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} 
will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
+
+   /**
+* If a transaction's elapsed time reaches this percentage of the 
transactionTimeout, a warning
+* message will be logged. Value must be in range [0,1]. Negative value 
disables warnings.
+*/
+   private double transactionTimeoutWarningRatio = -1;
--- End diff --

I tend to agree with @pnowojski about the API surface but I think in this 
case this is a valid safety net for possible future transaction sinks.


> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> 
>
> Key: FLINK-7784
> URL: https://issues.apache.org/jira/browse/FLINK-7784
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails 
> (either when doing it via the completed checkpoint notification or when 
> trying to commit after restoring after failure). This means that the job will 
> go into an infinite recovery loop because we will always keep failing.
> In some cases it might be better to ignore those failures and keep on 
> processing and this should be the default. We can provide an option that 
> allows failing the sink on failing commits.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...

2017-10-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147727170
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = 
LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> 
stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = 
new LinkedHashMap<>();
+   protected final LinkedHashMap> 
pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} 
will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
+
+   /**
+* If a transaction's elapsed time reaches this percentage of the 
transactionTimeout, a warning
+* message will be logged. Value must be in range [0,1]. Negative value 
disables warnings.
+*/
+   private double transactionTimeoutWarningRatio = -1;
--- End diff --

I tend to agree with @pnowojski about the API surface but I think in this 
case this is a valid safety net for possible future transaction sinks.
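
For readers following the review, the warning threshold described in the Javadoc above can be sketched as below. This is a minimal illustration, not the code under review: the class `TimeoutWarningCheck` and the method `shouldWarn` are hypothetical names, and the `>=` comparison is an assumption based on the wording "reaches this percentage".

```java
import java.time.Clock;

/** Hypothetical helper, not part of the pull request. */
final class TimeoutWarningCheck {
	private final Clock clock;
	private final long transactionTimeout;               // milliseconds
	private final double transactionTimeoutWarningRatio; // in [0,1]; negative disables

	TimeoutWarningCheck(Clock clock, long transactionTimeout, double transactionTimeoutWarningRatio) {
		this.clock = clock;
		this.transactionTimeout = transactionTimeout;
		this.transactionTimeoutWarningRatio = transactionTimeoutWarningRatio;
	}

	/** Returns true once the elapsed time reaches ratio * timeout. */
	boolean shouldWarn(long transactionStartTime) {
		if (transactionTimeoutWarningRatio < 0) {
			return false; // warnings disabled
		}
		long elapsed = clock.millis() - transactionStartTime;
		return elapsed >= transactionTimeout * transactionTimeoutWarningRatio;
	}
}
```

Passing the `Clock` in through the constructor is what lets a test pin the current time instead of sleeping.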


---


[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225064#comment-16225064
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147654876
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
 ---
@@ -83,49 +79,6 @@ public void before() {
extraProperties.put("isolation.level", "read_committed");
}
 
-   @Test(timeout = 3L)
--- End diff --

Why are these removed? Did they never actually test anything?


> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> 
>
> Key: FLINK-7784
> URL: https://issues.apache.org/jira/browse/FLINK-7784
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails 
> (either when doing it via the completed checkpoint notification or when 
> trying to commit after restoring after failure). This means that the job will 
> go into an infinite recovery loop because we will always keep failing.
> In some cases it might be better to ignore those failures and keep on 
> processing and this should be the default. We can provide an option that 
> allows failing the sink on failing commits.





[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...

2017-10-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147654876
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
 ---
@@ -83,49 +79,6 @@ public void before() {
extraProperties.put("isolation.level", "read_committed");
}
 
-   @Test(timeout = 3L)
--- End diff --

Why are these removed? Did they never actually test anything?


---


[jira] [Commented] (FLINK-7418) Replace all uses of jackson with flink-shaded-jackson

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225054#comment-16225054
 ] 

ASF GitHub Bot commented on FLINK-7418:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/4923

[FLINK-7418][build] Integrate flink-shaded-jackson2 

## What is the purpose of the change

This PR replaces all usages of com.fasterxml.jackson with 
flink-shaded-jackson.

One change of note is that flink-mesos now shades 
com.netflix.fenzo:fenzo-core so that it can use its own jackson version.

## Brief change log

* replace all jackson dependencies with flink-shaded-jackson
* add used undeclared dependencies to many modules
* replace all usages of com.fasterxml.jackson with 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson
* modify mesos shade-plugin configuration to shade fenzo-core
* modify quickstarts to exclude flink-shaded-jackson instead
* (implicit) bump jackson version to 2.7.9

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 7418

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4923.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4923


commit bac19a90544fba63693983970dece1a4fee5f14a
Author: zentol 
Date:   2017-10-16T10:48:12Z

[FLINK-7418][build] Integrate flink-shaded-jackson2

commit 97dcac37a783a746d1d6c77099d6a241febb22e6
Author: zentol 
Date:   2017-10-16T10:49:38Z

kafka

commit ee8ae4d312193f288e7b671f13efd69a827dca58
Author: zentol 
Date:   2017-10-16T10:51:11Z

gelly

commit fa1dd1e9a0bb6af4ce2d2a4a7c6bcd502dbfaa99
Author: zentol 
Date:   2017-10-16T10:51:46Z

streaming examples

commit 39fbd1a66f94bff3d01231a1d9028ca5e534186a
Author: zentol 
Date:   2017-10-16T10:52:23Z

DataDog reporter

commit 6725ef65ce0e0c8e45c5002ea47d66f9e3e5aa28
Author: zentol 
Date:   2017-10-16T10:53:37Z

runtime-web

commit 7d661a849d3c7eca0dda676492bf852ab598d2f0
Author: zentol 
Date:   2017-10-16T10:54:30Z

optimizer

commit 87f2dc8e9fcad3c69ce166e853cac3a3c4b89039
Author: zentol 
Date:   2017-10-16T10:56:09Z

flink-tests

commit 81af03d9560b1da043b4a211ece8092df2b17c61
Author: zentol 
Date:   2017-10-16T10:57:08Z

mesos

commit 6fa19385f343675737fc0d0c8e06366167662af5
Author: zentol 
Date:   2017-10-16T10:59:14Z

quickstarts

commit 5d27f2cade24a37b6d22427d01cf2d3274f62feb
Author: zentol 
Date:   2017-10-16T10:59:43Z

table

commit 992fb2ebfbb100a69d22e100b6ec217672f14b76
Author: zentol 
Date:   2017-10-16T11:01:06Z

streaming-java

commit f3c645c8fa415181cea42bc8039897516ddd26a4
Author: zentol 
Date:   2017-10-16T11:01:55Z

yarn-tests

commit 90abe30d7dc762e64231ef47ddd1ed40d0d80cb5
Author: zentol 
Date:   2017-10-16T11:03:43Z

runtime




> Replace all uses of jackson with flink-shaded-jackson
> -
>
> Key: FLINK-7418
> URL: https://issues.apache.org/jira/browse/FLINK-7418
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Jackson is currently used to create JSON responses in the web UI and, in the 
> future, possibly for the client REST communication.





[GitHub] flink pull request #4923: [FLINK-7418][build] Integrate flink-shaded-jackson...

2017-10-30 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/4923

[FLINK-7418][build] Integrate flink-shaded-jackson2 

## What is the purpose of the change

This PR replaces all usages of com.fasterxml.jackson with 
flink-shaded-jackson.

One change of note is that flink-mesos now shades 
com.netflix.fenzo:fenzo-core so that it can use its own jackson version.

## Brief change log

* replace all jackson dependencies with flink-shaded-jackson
* add used undeclared dependencies to many modules
* replace all usages of com.fasterxml.jackson with 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson
* modify mesos shade-plugin configuration to shade fenzo-core
* modify quickstarts to exclude flink-shaded-jackson instead
* (implicit) bump jackson version to 2.7.9

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 7418

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4923.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4923


commit bac19a90544fba63693983970dece1a4fee5f14a
Author: zentol 
Date:   2017-10-16T10:48:12Z

[FLINK-7418][build] Integrate flink-shaded-jackson2

commit 97dcac37a783a746d1d6c77099d6a241febb22e6
Author: zentol 
Date:   2017-10-16T10:49:38Z

kafka

commit ee8ae4d312193f288e7b671f13efd69a827dca58
Author: zentol 
Date:   2017-10-16T10:51:11Z

gelly

commit fa1dd1e9a0bb6af4ce2d2a4a7c6bcd502dbfaa99
Author: zentol 
Date:   2017-10-16T10:51:46Z

streaming examples

commit 39fbd1a66f94bff3d01231a1d9028ca5e534186a
Author: zentol 
Date:   2017-10-16T10:52:23Z

DataDog reporter

commit 6725ef65ce0e0c8e45c5002ea47d66f9e3e5aa28
Author: zentol 
Date:   2017-10-16T10:53:37Z

runtime-web

commit 7d661a849d3c7eca0dda676492bf852ab598d2f0
Author: zentol 
Date:   2017-10-16T10:54:30Z

optimizer

commit 87f2dc8e9fcad3c69ce166e853cac3a3c4b89039
Author: zentol 
Date:   2017-10-16T10:56:09Z

flink-tests

commit 81af03d9560b1da043b4a211ece8092df2b17c61
Author: zentol 
Date:   2017-10-16T10:57:08Z

mesos

commit 6fa19385f343675737fc0d0c8e06366167662af5
Author: zentol 
Date:   2017-10-16T10:59:14Z

quickstarts

commit 5d27f2cade24a37b6d22427d01cf2d3274f62feb
Author: zentol 
Date:   2017-10-16T10:59:43Z

table

commit 992fb2ebfbb100a69d22e100b6ec217672f14b76
Author: zentol 
Date:   2017-10-16T11:01:06Z

streaming-java

commit f3c645c8fa415181cea42bc8039897516ddd26a4
Author: zentol 
Date:   2017-10-16T11:01:55Z

yarn-tests

commit 90abe30d7dc762e64231ef47ddd1ed40d0d80cb5
Author: zentol 
Date:   2017-10-16T11:03:43Z

runtime




---


[jira] [Commented] (FLINK-7947) Let ParameterTool return a dedicated GlobalJobParameters object

2017-10-30 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225051#comment-16225051
 ] 

Aljoscha Krettek commented on FLINK-7947:
-

+1 I think most code around {{ParameterTool}} would benefit from a proper 
design and refactoring.

> Let ParameterTool return a dedicated GlobalJobParameters object
> ---
>
> Key: FLINK-7947
> URL: https://issues.apache.org/jira/browse/FLINK-7947
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>
> The {{ParameterTool}} directly implements the {{GlobalJobParameters}} 
> interface. Additionally it has grown over time to not only store the 
> configuration parameters but also to record which parameters have been 
> requested and what default value was set. This information is irrelevant on 
> the server side when setting a {{GlobalJobParameters}} object via 
> {{ExecutionConfig#setGlobalJobParameters}}.
> Since we don't separate the {{ParameterTool}} logic and the actual data view, 
> users ran into problems when reusing the same {{ParameterTool}} to start 
> multiple jobs concurrently (see FLINK-7943). I think it would be a much 
> clearer separation of concerns if we would actually split the 
> {{GlobalJobParameters}} from the {{ParameterTool}}.
> Furthermore, we should think about whether {{ParameterTool#get}} should have 
> side effects or not as it does right now.
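
The separation proposed above could look roughly like the following. This is only a sketch under stated assumptions: `JobParametersView` and `ParameterLookup` are hypothetical stand-ins, not Flink classes, and the real `ParameterTool` and `GlobalJobParameters` APIs are not reproduced here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical immutable data view; the only object handed to the job. */
final class JobParametersView {
	private final Map<String, String> values;

	JobParametersView(Map<String, String> values) {
		// Defensive copy, so later changes in the tool cannot leak into the job.
		this.values = Collections.unmodifiableMap(new HashMap<>(values));
	}

	Map<String, String> toMap() {
		return values;
	}
}

/** Hypothetical client-side tool that keeps the lookup bookkeeping to itself. */
final class ParameterLookup {
	private final Map<String, String> data;
	private final Set<String> requested = new HashSet<>();

	ParameterLookup(Map<String, String> data) {
		this.data = new HashMap<>(data);
	}

	/** Side effect: records that the key was requested. */
	String get(String key) {
		requested.add(key);
		return data.get(key);
	}

	Set<String> getRequestedKeys() {
		return Collections.unmodifiableSet(requested);
	}

	/** Creates a fresh snapshot that carries no bookkeeping state. */
	JobParametersView toJobParameters() {
		return new JobParametersView(data);
	}
}
```

With this split, each job would receive its own read-only snapshot, so the mutable bookkeeping in the tool is never shared between jobs.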





[GitHub] flink pull request #4922: [hotfix][metrics] Cleanup ScopeFormats

2017-10-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4922#discussion_r147719227
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
 ---
@@ -38,30 +36,9 @@
// 

 
/**
-* Creates all default scope formats.
-*/
-   public ScopeFormats() {
--- End diff --

Can't the class be `final`? Either way it has a private constructor.


---


[GitHub] flink issue #4794: [build][minor] Add missing licenses

2017-10-30 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4794
  
Adding the license to `browserconfig.xml` looks fine, but why change the 
user configurations `masters`, `slaves`, and `zoo.cfg`? Are these even 
copyrightable?


---


[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225030#comment-16225030
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147717560
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -35,60 +42,101 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-   TestContext context;
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   private FileBasedSinkFunction sinkFunction;
+
+   private OneInputStreamOperatorTestHarness harness;
+
+   private AtomicBoolean throwException = new AtomicBoolean();
+
+   private File targetDirectory;
+
+   private File tmpDirectory;
+
+   @Mock
+   private Clock mockClock;
+
+   @Mock
+   private Logger mockLogger;
--- End diff --

True, you are right. One would need to implement the `org.slf4j.Logger` 
interface.
- http://projects.lidalia.org.uk/slf4j-test/ provides a `org.slf4j.Logger` 
implementation for unit testing but this approach comes with other problems 
(e.g. https://github.com/Mahoney/slf4j-test/issues/15, [NOP if log level is 
disabled](https://github.com/Mahoney/slf4j-test/blob/master/src/main/java/uk/org/lidalia/slf4jtest/TestLogger.java#L449)).

Right now I don't see a way to do it properly. Any help is welcome.


> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> 
>
> Key: FLINK-7784
> URL: https://issues.apache.org/jira/browse/FLINK-7784
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails 
> (either when doing it via the completed checkpoint notification or when 
> trying to commit after restoring after failure). This means that the job will 
> go into an infinite recovery loop because we will always keep failing.
> In some cases it might be better to ignore those failures and keep on 
> processing and this should be the default. We can provide an option that 
> allows failing the sink on failing commits.





[GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...

2017-10-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147717560
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -35,60 +42,101 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-   TestContext context;
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   private FileBasedSinkFunction sinkFunction;
+
+   private OneInputStreamOperatorTestHarness harness;
+
+   private AtomicBoolean throwException = new AtomicBoolean();
+
+   private File targetDirectory;
+
+   private File tmpDirectory;
+
+   @Mock
+   private Clock mockClock;
+
+   @Mock
+   private Logger mockLogger;
--- End diff --

True, you are right. One would need to implement the `org.slf4j.Logger` 
interface.
- http://projects.lidalia.org.uk/slf4j-test/ provides a `org.slf4j.Logger` 
implementation for unit testing but this approach comes with other problems 
(e.g. https://github.com/Mahoney/slf4j-test/issues/15, [NOP if log level is 
disabled](https://github.com/Mahoney/slf4j-test/blob/master/src/main/java/uk/org/lidalia/slf4jtest/TestLogger.java#L449)).

Right now I don't see a way to do it properly. Any help is welcome.


---


[GitHub] flink pull request #4922: [hotfix][metrics] Cleanup ScopeFormats

2017-10-30 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/4922

[hotfix][metrics] Cleanup ScopeFormats

## What is the purpose of the change

 This PR cleans up a few things related to `ScopeFormats`.

## Brief change log

* remove unused constructor
* remove redundant `MetricRegistryConfiguration#createScopeConfig`
* remove unnecessary default constructor (passing an empty configuration 
instead is easier to maintain)
* limit visibility of constructor

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink metrics_hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4922.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4922


commit 3211152f9b0d357c40fef6f924ca9170ba34d3a5
Author: zentol 
Date:   2017-10-30T14:07:52Z

[hotfix][metrics] Remove MetricRegistryConfiguration#createScopeConfig

This method duplicates ScopeFormats#fromConfig

commit 5fcd4d64fba1aaa1e093dc70bcf19087604debc4
Author: zentol 
Date:   2017-10-30T14:09:42Z

[hotfix][metrics] Remove unused ScopeFormats constructor

commit 5be7e4e648d2426cd59ebeb0872d03d1202642e8
Author: zentol 
Date:   2017-10-30T14:11:39Z

[hotfix][metrics] Remove unnecessary ScopeFormats constructor

commit bbd3b8f5c955a46cfa72d13136110f3ec959f459
Author: zentol 
Date:   2017-10-30T14:12:00Z

[hotfix][metrics] Limit visibility of ScopeFormats constructor




---


[jira] [Assigned] (FLINK-7732) Test instability in Kafka end-to-end test (invalid Kafka offset)

2017-10-30 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-7732:
-

Assignee: Piotr Nowojski

> Test instability in Kafka end-to-end test (invalid Kafka offset)
> 
>
> Key: FLINK-7732
> URL: https://issues.apache.org/jira/browse/FLINK-7732
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> In a test run with unrelated changes in the network stack, the Kafka 
> end-to-end test was failing with an invalid offset:
> {code}
> 2017-09-28 06:34:10,736 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka version : 0.10.2.1
> 2017-09-28 06:34:10,744 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka commitId : e89bffd6b2eff799
> 2017-09-28 06:34:14,549 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,573 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,686 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:14,687 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) dead for group myconsumer
> 2017-09-28 06:34:14,792 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator testing-gce-f67551b8-8867-41c5-b36e-4196d0f11e30:9092 (id: 
> 2147483647 rack: null) for group myconsumer.
> 2017-09-28 06:34:15,068 INFO  
> org.apache.flink.runtime.state.DefaultOperatorStateBackend- 
> DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
> part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: 
> Unnamed (1/1),5,Flink Task Threads] took 948 ms.
> 2017-09-28 06:34:15,164 WARN  
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
> Committing offsets to Kafka failed. This does not compromise Flink's 
> checkpoints.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> 2017-09-28 06:34:15,171 ERROR 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Async 
> Kafka commit failed.
> java.lang.IllegalArgumentException: Invalid offset: -915623761772
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:687)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:531)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:499)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:217)
> {code}
> https://travis-ci.org/apache/flink/jobs/280722829
> [~pnowojski] did a first analysis that revealed this:
> In 
> org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java:229 
> this is being sent:
> {{long offsetToCommit = lastProcessedOffset + 1;}}
> {{lastProcessedOffset}} comes from:
> {{org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#snapshotState}}
>  either lines 741 or 749
> The value that we see is strangely similar to 
> {{org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel#GROUP_OFFSET}}
> {code}
> /**
>  * Magic number that defines the partition 

[jira] [Commented] (FLINK-7902) TwoPhaseCommitSinkFunctions should use custom TypeSerializer

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225008#comment-16225008
 ] 

ASF GitHub Bot commented on FLINK-7902:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4919#discussion_r147711564
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -361,5 +376,247 @@ public void setPendingCommitTransactions(List 
pendingCommitTransactions) {
public void setContext(Optional context) {
this.context = context;
}
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+
+   State state = (State) o;
+
--- End diff --

I used IntelliJ to generate those. :sweat_smile:


> TwoPhaseCommitSinkFunctions should use custom TypeSerializer
> 
>
> Key: FLINK-7902
> URL: https://issues.apache.org/jira/browse/FLINK-7902
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new 
> TypeHint>() {})}} to 
> create a {{TypeInformation}} which in turn is used to create a 
> {{StateDescriptor}} for the state that the Kafka sink stores.
> Behind the scenes, this would be roughly analysed as a 
> {{PojoType(GenericType, 
> GenericType)}} which means we don't have explicit 
> control over the serialisation format and we also use Kryo (which is the 
> default for {{GenericTypeInfo}}). This can be problematic if we want to 
> evolve the state schema in the future or if we want to change Kryo versions.
> We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor:
> {code}
> public TwoPhaseCommitSinkFunction(TypeSerializer> 
> stateSerializer) {
> {code}
> and we should then change the {{FlinkKafkaProducer011}} to hand in a 
> custom-made {{TypeSerializer}} for the state.





[GitHub] flink pull request #4919: [FLINK-7902] Use TypeSerializer in TwoPhaseCommitS...

2017-10-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4919#discussion_r147711564
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -361,5 +376,247 @@ public void setPendingCommitTransactions(List 
pendingCommitTransactions) {
public void setContext(Optional context) {
this.context = context;
}
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+
+   State state = (State) o;
+
--- End diff --

I used IntelliJ to generate those. :sweat_smile:


---


[jira] [Updated] (FLINK-7948) YarnApplicationMaster should also load hdfs-site.xml

2017-10-30 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-7948:
-
Summary: YarnApplicationMaster should also load hdfs-site.xml  (was: 
YarnApplicationMaster should also load the hdfs-site.xml)

> YarnApplicationMaster should also load hdfs-site.xml
> 
>
> Key: FLINK-7948
> URL: https://issues.apache.org/jira/browse/FLINK-7948
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0
>
>
> The {{YarnApplicationMaster}} uses the {{YarnConfiguration}} to load the Yarn 
> configuration. This class automatically loads the {{core-site.xml}} and the 
> {{yarn-site.xml}} from the class path. It, however, does not load the 
> {{hdfs-site.xml}}.
> I propose also trying to load the {{hdfs-site.xml}} and adding it to the 
> {{YarnConfiguration}}.





[jira] [Commented] (FLINK-7902) TwoPhaseCommitSinkFunctions should use custom TypeSerializer

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224984#comment-16224984
 ] 

ASF GitHub Bot commented on FLINK-7902:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4919#discussion_r147704985
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -361,5 +376,247 @@ public void setPendingCommitTransactions(List 
pendingCommitTransactions) {
public void setContext(Optional context) {
this.context = context;
}
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+
+   State state = (State) o;
+
--- End diff --

Not a deal breaker, but these ifs are a bit too complicated.
Using `Objects.equals()` on each of the components and combining the results 
with `&&` would simplify them.
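
The suggestion above could look roughly like the following sketch. It is not the code from the pull request: the class `StateEqualsSketch`, the field `pendingTransaction`, and the field types are assumptions made for illustration, loosely modelled on the setters visible in the diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

final class StateEqualsSketch {

    /** Simplified stand-in for the State class in the diff; fields are assumed. */
    static final class State<TXN, CONTEXT> {
        private final TXN pendingTransaction;
        private final List<TXN> pendingCommitTransactions;
        private final Optional<CONTEXT> context;

        State(TXN pendingTransaction,
              List<TXN> pendingCommitTransactions,
              Optional<CONTEXT> context) {
            this.pendingTransaction = pendingTransaction;
            this.pendingCommitTransactions = pendingCommitTransactions;
            this.context = context;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            State<?, ?> that = (State<?, ?>) o;
            // Objects.equals is null-safe, so no per-field null checks are needed.
            return Objects.equals(pendingTransaction, that.pendingTransaction)
                && Objects.equals(pendingCommitTransactions, that.pendingCommitTransactions)
                && Objects.equals(context, that.context);
        }

        @Override
        public int hashCode() {
            // Keep hashCode consistent with equals by hashing the same components.
            return Objects.hash(pendingTransaction, pendingCommitTransactions, context);
        }
    }

    public static void main(String[] args) {
        State<String, String> a =
            new State<>("txn-1", Arrays.asList("txn-0"), Optional.of("ctx"));
        State<String, String> b =
            new State<>("txn-1", Arrays.asList("txn-0"), Optional.of("ctx"));
        System.out.println(a.equals(b));
    }
}
```

Because `Objects.equals()` treats two nulls as equal and a null and a non-null value as different, the chained form behaves the same as the usual hand-written null checks while being shorter.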


> TwoPhaseCommitSinkFunctions should use custom TypeSerializer
> 
>
> Key: FLINK-7902
> URL: https://issues.apache.org/jira/browse/FLINK-7902
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new 
> TypeHint>() {})}} to 
> create a {{TypeInformation}} which in turn is used to create a 
> {{StateDescriptor}} for the state that the Kafka sink stores.
> Behind the scenes, this would be roughly analysed as a 
> {{PojoType(GenericType, 
> GenericType)}} which means we don't have explicit 
> control over the serialisation format and we also use Kryo (which is the 
> default for {{GenericTypeInfo}}). This can be problematic if we want to 
> evolve the state schema in the future or if we want to change Kryo versions.
> We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor:
> {code}
> public TwoPhaseCommitSinkFunction(TypeSerializer> 
> stateSerializer) {
> {code}
> and we should then change the {{FlinkKafkaProducer011}} to hand in a 
> custom-made {{TypeSerializer}} for the state.





[GitHub] flink pull request #4919: [FLINK-7902] Use TypeSerializer in TwoPhaseCommitS...

2017-10-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4919#discussion_r147704985
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -361,5 +376,247 @@ public void setPendingCommitTransactions(List 
pendingCommitTransactions) {
public void setContext(Optional context) {
this.context = context;
}
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+
+   State state = (State) o;
+
--- End diff --

Not a deal breaker, but these ifs are a bit too complicated.
Using `Objects.equals()` on each of the components and combining the results 
with `&&` would simplify them.


---


[jira] [Created] (FLINK-7948) YarnApplicationMaster should also load the hdfs-site.xml

2017-10-30 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7948:


 Summary: YarnApplicationMaster should also load the hdfs-site.xml
 Key: FLINK-7948
 URL: https://issues.apache.org/jira/browse/FLINK-7948
 Project: Flink
  Issue Type: Improvement
  Components: YARN
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Critical
 Fix For: 1.4.0


The {{YarnApplicationMaster}} uses the {{YarnConfiguration}} to load the Yarn 
configuration. This class automatically loads the {{core-site.xml}} and the 
{{yarn-site.xml}} from the class path. It, however, does not load the 
{{hdfs-site.xml}}.

I propose to also try to load the {{hdfs-site.xml}} and add it to the 
{{YarnConfiguration}}.





[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224978#comment-16224978
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4777
  
I'm not sure if you can easily disable convergence in a child pom.

I think that shading happens after resolving dependency versions. So first 
you would need to fix dependency convergence for the given module to ensure that 
during compilation everything will be ok/converged. Shading happens after that 
and I think it can only fix/avoid potential errors in other modules that 
depend on a module that shaded something.


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739
> Subtasks of this task depend on one another: to enable convergence in 
> `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first.





[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence

2017-10-30 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4777
  
I'm not sure if you can easily disable convergence in a child pom.

I think that shading happens after resolving dependency versions. So first 
you would need to fix dependency convergence for the given module to ensure that 
during compilation everything will be ok/converged. Shading happens after that 
and I think it can only fix/avoid potential errors in other modules that 
depend on a module that shaded something.


---


[jira] [Closed] (FLINK-7696) Add projection push-down support for TableSources with time attributes

2017-10-30 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-7696.

   Resolution: Implemented
Fix Version/s: 1.4.0

Implemented for 1.4.0 with 9a2ba6e058e907aef65e0af8731ca5ec8733712e

> Add projection push-down support for TableSources with time attributes
> --
>
> Key: FLINK-7696
> URL: https://issues.apache.org/jira/browse/FLINK-7696
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
> Fix For: 1.4.0
>
>
> Table sources that implement the {{DefinedProctimeAttribute}} or 
> {{DefinedRowtimeAttribute}} do not support projection push-down even if they 
> also implement {{ProjectableTableSource}}. 
> There are several problems:
> - the schema of a {{TableSource}} that implements {{DefinedRowtimeAttribute}} 
> or {{DefinedProctimeAttribute}} is constructed in the catalog, not in the 
> {{TableSource}} (proctime fields are always appended at the end).
> - the {{ProjectableTableSource.projectFields()}} method returns the projected 
> fields as int indices. In order to handle the indices correctly, the 
> TableSource would need to know about the internals of the Table API.
> - {{ProjectableTableSource.projectFields()}} might reorder fields and move a 
> proctime field into the middle of the schema. However, the TableSource has no 
> control over that.
> - A {{TableSource}} that implements {{DefinedRowtimeAttribute}} or 
> {{DefinedProctimeAttribute}} would need to change the return values of 
> {{getRowtimeAttribute()}} or {{getProctimeAttribute()}} depending on whether 
> the attribute is kept or not.
> Adjusting the schema of table sources inside of the Table API makes all of 
> this quite messy. Maybe we need to refine the interfaces. For instance, we 
> could ask users to explicitly add time indicator fields in the 
> {{TypeInformation}} returned by {{TableSource.getReturnType()}}. However, 
> that might collide with plans to add computable time attributes as proposed 
> in FLINK-7548.
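
The index problem listed above can be illustrated with a small sketch. It does not use the Table API: `ProjectionIndexSketch` and both helper methods are hypothetical, and the only assumption taken from the description is that a proctime field is appended after the physical fields of the source.

```java
import java.util.ArrayList;
import java.util.List;

final class ProjectionIndexSketch {

    /**
     * Returns the projected indices that refer to physical fields of the source.
     * Assumes the catalog schema is the physical fields followed by one
     * appended proctime field at index physicalFieldCount.
     */
    static int[] physicalIndices(int[] projected, int physicalFieldCount) {
        List<Integer> kept = new ArrayList<>();
        for (int idx : projected) {
            if (idx < physicalFieldCount) {
                kept.add(idx);
            }
        }
        int[] result = new int[kept.size()];
        for (int i = 0; i < result.length; i++) {
            result[i] = kept.get(i);
        }
        return result;
    }

    /** True if the projection still references the appended proctime field. */
    static boolean keepsProctime(int[] projected, int physicalFieldCount) {
        for (int idx : projected) {
            if (idx == physicalFieldCount) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Catalog schema: [a, b, c, proctime]; the source itself only knows a, b, c.
        int[] projected = {3, 0};
        System.out.println(physicalIndices(projected, 3).length);
        System.out.println(keepsProctime(projected, 3));
    }
}
```

With a projection of `{3, 0}` on three physical fields, index 3 has no counterpart in the source, which is the kind of Table API internal that a table source would have to understand in order to interpret the indices correctly.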





[jira] [Closed] (FLINK-6870) Combined batch and stream TableSource can not produce same time attributes

2017-10-30 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6870.

   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed for 1.4.0 with 9a2ba6e058e907aef65e0af8731ca5ec8733712e

> Combined batch and stream TableSource can not produce same time attributes
> --
>
> Key: FLINK-6870
> URL: https://issues.apache.org/jira/browse/FLINK-6870
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Timo Walther
> Fix For: 1.4.0
>
>
> If a class implements both {{BatchTableSource}} and {{StreamTableSource}}, it 
> is not possible to declare a time attribute which is valid for both 
> environments. For batch it should be a regular field, but not for streaming. 
> The {{getReturnType}} method does not know the environment in which it is 
> called.





[jira] [Closed] (FLINK-7179) ProjectableTableSource interface doesn't compatible with BoundedOutOfOrdernessTimestampExtractor

2017-10-30 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-7179.

   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed for 1.4.0 with 9a2ba6e058e907aef65e0af8731ca5ec8733712e

> ProjectableTableSource interface doesn't compatible with 
> BoundedOutOfOrdernessTimestampExtractor
> 
>
> Key: FLINK-7179
> URL: https://issues.apache.org/jira/browse/FLINK-7179
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
> Fix For: 1.4.0
>
>
> In the window implementation of stream SQL, 
> BoundedOutOfOrdernessTimestampExtractor is designed to extract the row time from 
> each row. It assumes by default that the ts field is in the data stream. On the 
> other hand, ProjectableTableSource is designed to support projection push-down. 
> If there is no row-time-related field in a query, the extractor cannot 
> function correctly. 





[jira] [Closed] (FLINK-7548) Support watermark generation for TableSource

2017-10-30 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-7548.

Resolution: Implemented

Implemented for 1.4.0 with 9a2ba6e058e907aef65e0af8731ca5ec8733712e

> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, a TableSource can currently only define a 
> rowtime field, but cannot extract watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}}, which has two 
> methods {{getRowtimeAttribute}} (can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, and whether to provide some 
> built-in strategies, needs further discussion.





[jira] [Updated] (FLINK-7947) Let ParameterTool return a dedicated GlobalJobParameters object

2017-10-30 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-7947:
-
Description: 
The {{ParameterTool}} directly implements the {{GlobalJobParameters}} 
interface. Additionally it has grown over time to not only store the 
configuration parameters but also to record which parameters have been 
requested and what default value was set. This information is irrelevant on the 
server side when setting a {{GlobalJobParameters}} object via 
{{ExecutionConfig#setGlobalJobParameters}}.

Since we don't separate the {{ParameterTool}} logic and the actual data view, 
users ran into problems when reusing the same {{ParameterTool}} to start 
multiple jobs concurrently (see FLINK-7943). I think it would be a much clearer 
separation of concerns if we would actually split the {{GlobalJobParameters}} 
from the {{ParameterTool}}.

Furthermore, we should think about whether {{ParameterTool#get}} should have 
side effects or not as it does right now.

  was:
The {{ParameterTool}} directly implements the {{GlobalJobParameters}} 
interface. Additionally it has grown over time to not only store the 
configuration parameters but also to record which parameters have been 
requested and what default value was set. This information is irrelevant on the 
server side when setting a {{GlobalJobParameters}} object via 
{{ExecutionConfig#setGlobalJobParameters}}.

Since we don't separate the {{ParameterTool}} logic and the actual data view, 
users ran into problems when reusing the same {{ParameterTool}} to start 
multiple jobs concurrently (see FLINK-7943). I think it would be a much clearer 
separation of concerns if we would actually split the {{GlobalJobParameters}} 
from the {{ParameterTool}}.


> Let ParameterTool return a dedicated GlobalJobParameters object
> ---
>
> Key: FLINK-7947
> URL: https://issues.apache.org/jira/browse/FLINK-7947
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>
> The {{ParameterTool}} directly implements the {{GlobalJobParameters}} 
> interface. Additionally it has grown over time to not only store the 
> configuration parameters but also to record which parameters have been 
> requested and what default value was set. This information is irrelevant on 
> the server side when setting a {{GlobalJobParameters}} object via 
> {{ExecutionConfig#setGlobalJobParameters}}.
> Since we don't separate the {{ParameterTool}} logic and the actual data view, 
> users ran into problems when reusing the same {{ParameterTool}} to start 
> multiple jobs concurrently (see FLINK-7943). I think it would be a much 
> clearer separation of concerns if we would actually split the 
> {{GlobalJobParameters}} from the {{ParameterTool}}.
> Furthermore, we should think about whether {{ParameterTool#get}} should have 
> side effects or not as it does right now.
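
One possible shape of the proposed split is sketched below. This is not Flink code: `TrackingParameters` is a hypothetical stand-in for {{ParameterTool}}, `ParameterSnapshot` is a hypothetical stand-in for a dedicated {{GlobalJobParameters}} object, and all method names are assumptions.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ParameterSplitSketch {

    /** Hypothetical immutable data view that would be handed to the job. */
    static final class ParameterSnapshot {
        private final Map<String, String> data;

        ParameterSnapshot(Map<String, String> data) {
            // Defensive copy: later changes on the client side are not visible here.
            this.data = Collections.unmodifiableMap(new HashMap<>(data));
        }

        Map<String, String> toMap() {
            return data;
        }
    }

    /** Hypothetical client-side tool that also records which keys were requested. */
    static final class TrackingParameters {
        private final Map<String, String> data;
        private final Set<String> requested = new HashSet<>();

        TrackingParameters(Map<String, String> data) {
            this.data = new HashMap<>(data);
        }

        String get(String key) {
            requested.add(key); // bookkeeping side effect stays on the client
            return data.get(key);
        }

        Set<String> getRequestedKeys() {
            return Collections.unmodifiableSet(requested);
        }

        /** Returns a data-only snapshot that carries no bookkeeping state. */
        ParameterSnapshot toSnapshot() {
            return new ParameterSnapshot(data);
        }
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("input", "/tmp/in");
        TrackingParameters tool = new TrackingParameters(raw);
        ParameterSnapshot snapshot = tool.toSnapshot();
        tool.get("input");
        System.out.println(snapshot.toMap());
    }
}
```

In this arrangement each job would receive its own snapshot, so calling `get` on the shared tool while several jobs are being started does not touch any state that the jobs hold.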




