[jira] [Created] (FLINK-9736) Potential null reference in KeyGroupPartitionedPriorityQueue#poll()

2018-07-03 Thread Ted Yu (JIRA)
Ted Yu created FLINK-9736:
-

 Summary: Potential null reference in 
KeyGroupPartitionedPriorityQueue#poll()
 Key: FLINK-9736
 URL: https://issues.apache.org/jira/browse/FLINK-9736
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


{code}
final PQ headList = heapOfkeyGroupedHeaps.peek();
final T head = headList.poll();
{code}
The {{peek}} call may return null; the return value should be checked before dereferencing it.
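A minimal sketch of the suggested guard, using hypothetical stand-in types since the surrounding Flink class internals are omitted above:
{code:java}
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.Queue;

final class NullSafePollSketch {

    // Hypothetical stand-in for heapOfkeyGroupedHeaps: an outer queue of sub-queues.
    private final Queue<PriorityQueue<Integer>> heapOfHeaps = new ArrayDeque<>();

    Integer poll() {
        // peek() returns null when the outer queue is empty, so check the
        // result before dereferencing it instead of calling poll() on it blindly.
        final PriorityQueue<Integer> headList = heapOfHeaps.peek();
        return headList == null ? null : headList.poll();
    }
}
{code}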



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9735) Potential resource leak in RocksDBStateBackend#getDbOptions

2018-07-03 Thread Ted Yu (JIRA)
Ted Yu created FLINK-9735:
-

 Summary: Potential resource leak in 
RocksDBStateBackend#getDbOptions
 Key: FLINK-9735
 URL: https://issues.apache.org/jira/browse/FLINK-9735
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


Here is the related code:
{code}
if (optionsFactory != null) {
  opt = optionsFactory.createDBOptions(opt);
}
{code}
{{opt}}, a {{DBOptions}} instance, should be closed before the reference is overwritten.

{{getColumnOptions}} has a similar issue.
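A minimal sketch of the suggested fix for {{getDbOptions}}, assuming a hypothetical {{Factory}} interface that mirrors {{OptionsFactory#createDBOptions}}; the old instance is closed only when the factory actually replaces it:
{code:java}
import org.rocksdb.DBOptions;

final class DbOptionsLeakFixSketch {

    // Hypothetical interface standing in for Flink's OptionsFactory.
    interface Factory {
        DBOptions createDBOptions(DBOptions currentOptions);
    }

    static DBOptions getDbOptions(Factory optionsFactory) {
        DBOptions opt = new DBOptions();
        if (optionsFactory != null) {
            final DBOptions previous = opt;
            opt = optionsFactory.createDBOptions(previous);
            if (opt != previous) {
                // The factory replaced the instance, so close the old one to
                // release its native handle instead of leaking it.
                previous.close();
            }
        }
        return opt;
    }
}
{code}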



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532346#comment-16532346
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

Github user xueyumusic commented on the issue:

https://github.com/apache/flink/pull/6188
  
@twalthr I looked around and realized that the current `+` expression already 
supports TimeInterval (TimePoint) addition, so the timestampAdd API would 
duplicate it and seems unnecessary. 

So I only made these changes:
1. added the `1.week` expression
2. changed `quarter` to `1.quarter` 
3. extended Extract to support extracting QUARTER and WEEK, as suggested and 
verified by @walterddr 

Please review, @twalthr @fhueske @walterddr @hequn8128. Thanks to all of you!


> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-07-03 Thread xueyumusic
Github user xueyumusic commented on the issue:

https://github.com/apache/flink/pull/6188
  
@twalthr I looked around and realized that the current `+` expression already 
supports TimeInterval (TimePoint) addition, so the timestampAdd API would 
duplicate it and seems unnecessary. 

So I only made these changes:
1. added the `1.week` expression
2. changed `quarter` to `1.quarter` 
3. extended Extract to support extracting QUARTER and WEEK, as suggested and 
verified by @walterddr 

Please review, @twalthr @fhueske @walterddr @hequn8128. Thanks to all of you!


---


[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532344#comment-16532344
 ] 

ASF GitHub Bot commented on FLINK-6469:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5448
  
cc @dawidwys 


> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently, memory sizes are configured as plain numbers, and the 
> interpretation differs from one configuration parameter to another.
> For example, heap sizes are configured in megabytes, network buffer memory is 
> configured in bytes, and alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configures heap sizes similarly: {{-Xmx5g}} or 
> {{-Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}
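For illustration, a minimal sketch of parsing such unit-suffixed values into bytes (a hypothetical helper, not Flink's actual parser):
{code:java}
import java.util.Locale;

final class MemorySizeSketch {

    // Parses values like "1", "10 kb", "64 mb" or "1 gb" into a byte count.
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        long multiplier = 1L;
        if (s.endsWith("gb")) {
            multiplier = 1L << 30;
            s = s.substring(0, s.length() - 2).trim();
        } else if (s.endsWith("mb")) {
            multiplier = 1L << 20;
            s = s.substring(0, s.length() - 2).trim();
        } else if (s.endsWith("kb")) {
            multiplier = 1L << 10;
            s = s.substring(0, s.length() - 2).trim();
        }
        return Long.parseLong(s) * multiplier;
    }
}
{code}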



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

2018-07-03 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5448
  
cc @dawidwys 


---


[jira] [Commented] (FLINK-9654) Internal error while deserializing custom Scala TypeSerializer instances

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532309#comment-16532309
 ] 

ASF GitHub Bot commented on FLINK-9654:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6206
  
Glad to hear. Then I will merge your PR with this test. Thanks for fixing 
this problem @zsolt-donca.


> Internal error while deserializing custom Scala TypeSerializer instances
> 
>
> Key: FLINK-9654
> URL: https://issues.apache.org/jira/browse/FLINK-9654
> Project: Flink
>  Issue Type: Bug
>Reporter: Zsolt Donca
>Assignee: Zsolt Donca
>Priority: Major
>  Labels: pull-request-available
>
> When you are using custom `TypeSerializer` instances implemented in Scala, 
> the Scala issue [SI-2034|https://issues.scala-lang.org/browse/SI-2034] can 
> manifest itself when a Flink job is restored from checkpoint or started with 
> a savepoint.
> The reason is that in such a restore from checkpoint or savepoint, Flink uses 
> `InstantiationUtil.FailureTolerantObjectInputStream` to deserialize the type 
> serializers and their configurations. The deserialization walks through the 
> entire corresponding object graph, and for each class it calls 
> `isAnonymousClass`, which, in turn, calls `getSimpleName` (mechanism in place 
> for FLINK-6869). If there is an internal class defined in a Scala object for 
> which `getSimpleName` fails (see the Scala issue), then a 
> `java.lang.InternalError` is thrown which causes the task manager to restart. 
> In this case, Flink tries to restart the job on another task manager, causing 
> all the task managers to restart, wreaking havoc on the entire Flink cluster.
> There are some alternative type information derivation mechanisms that rely 
> on anonymous classes and, most importantly, classes generated by macros, that 
> can easily trigger the above problem. I am personally working on 
> [https://github.com/zsolt-donca/flink-alt], and there is also 
> [https://github.com/joroKr21/flink-shapeless]
> I prepared a pull request that fixes the issue. 
>  
> Edit: added a stack trace to help demonstrate the issue.
> 2018-06-21 13:08:07.829 [today-stats (2/2)] ERROR 
> org.apache.flink.runtime.taskmanager.Task  - Encountered fatal error 
> java.lang.InternalError - terminating the JVM
>  java.lang.InternalError: Malformed class name
>          at java.lang.Class.getSimpleName(Class.java:1330) ~[na:1.8.0_171]
>          at java.lang.Class.isAnonymousClass(Class.java:1411) ~[na:1.8.0_171]
>          at 
> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:206)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1855) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) 
> ~[na:1.8.0_171]
>          at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) 
> ~[na:1.8.0_171]
>          at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.re

[GitHub] flink issue #6206: [FLINK-9654] [core] Changed the check for anonymous class...

2018-07-03 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6206
  
Glad to hear. Then I will merge your PR with this test. Thanks for fixing 
this problem @zsolt-donca.


---


[jira] [Commented] (FLINK-9686) Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532293#comment-16532293
 ] 

ASF GitHub Bot commented on FLINK-9686:
---

Github user fmthoma commented on the issue:

https://github.com/apache/flink/pull/6221
  
@tillrohrmann @tzulitai Thank you!


> Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole
> 
>
> Key: FLINK-9686
> URL: https://issues.apache.org/jira/browse/FLINK-9686
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Assignee: Franz Thoma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> h2. Current situation:
> FlinkKinesisProducer can authenticate with Kinesis by retrieving credentials 
> via one of the following mechanisms:
>  * Environment variables
>  * System properties
>  * An AWS profile
>  * Directly provided credentials (\{{BASIC}})
>  * AWS's own default heuristic (\{{AUTO}})
> For streaming across AWS accounts, it is considered good practice to enable 
> access to the remote Kinesis stream via a role, rather than passing 
> credentials for the remote account.
> h2. Proposed change:
> Add a new credentials provider specifying a role ARN, session name, and an 
> additional credentials provider supplying the credentials for assuming the 
> role.
> Config example for assuming role {{}} with auto-detected 
> credentials:{{}}
> {code:java}
> aws.credentials.provider: ASSUME_ROLE
> aws.credentials.provider.role.arn: 
> aws.credentials.provider.role.sessionName: my-session-name
> aws.credentials.provider.role.provider: AUTO
> {code}
> {{ASSUME_ROLE}} credentials providers can be nested, i.e. it is possible to 
> assume a role which in turn is allowed to assume another role:
> {code:java}
> aws.credentials.provider: ASSUME_ROLE
> aws.credentials.provider.role.arn: 
> aws.credentials.provider.role.sessionName: my-session-name
> aws.credentials.provider.role.provider: ASSUME_ROLE
> aws.credentials.provider.role.provider.role.arn: 
> aws.credentials.provider.role.provider.role.sessionName: 
> my-nested-session-name
> aws.credentials.provider.role.provider.role.provider: AUTO
> {code}
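For illustration, a sketch of how such a nested provider chain can be built with the AWS SDK v1 (hypothetical wiring, not necessarily how the connector implements it; the role ARN and session name are caller-supplied placeholders):
{code:java}
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;

final class AssumeRoleSketch {

    // Assumes the given role using the credentials of an inner provider
    // (the default AUTO-style chain here), mirroring the config above.
    static AWSCredentialsProvider assumeRole(String roleArn, String sessionName) {
        AWSCredentialsProvider inner = new DefaultAWSCredentialsProviderChain();
        AWSSecurityTokenService sts = AWSSecurityTokenServiceClientBuilder.standard()
                .withCredentials(inner)
                .build();
        return new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
                .withStsClient(sts)
                .build();
    }
}
{code}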



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6221: [FLINK-9686] [kinesis] Enable Kinesis authentication via ...

2018-07-03 Thread fmthoma
Github user fmthoma commented on the issue:

https://github.com/apache/flink/pull/6221
  
@tillrohrmann @tzulitai Thank you!


---


[jira] [Commented] (FLINK-9707) LocalFileSystem does not support concurrent directory creations

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532289#comment-16532289
 ] 

ASF GitHub Bot commented on FLINK-9707:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6243#discussion_r200015421
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---
@@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) {
}
else {
File parent = file.getParentFile();
-   return (parent == null || mkdirsInternal(parent)) && file.mkdir();
+   return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory());
--- End diff --

The case we want to guard against is that another process created the 
directory `file` just before our call to `file.mkdir`. In that case, `file.mkdir` 
would return `false`. Therefore, we need to check whether `file` really is a 
directory; if it is, we should return `true` because the directory was created.
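A self-contained sketch of the race-tolerant idiom described above (simplified; the real method also rejects the case where `file` exists as a regular file):
{code:java}
import java.io.File;

final class ConcurrentMkdirsSketch {

    // Returns true if the directory exists when we are done, even if another
    // process created it between our check and the file.mkdir() call.
    static boolean mkdirsInternal(File file) {
        if (file.isDirectory()) {
            return true;
        }
        final File parent = file.getParentFile();
        // mkdir() returns false if the directory was created concurrently,
        // so treat "already a directory" as success as well.
        return (parent == null || mkdirsInternal(parent))
                && (file.mkdir() || file.isDirectory());
    }
}
{code}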


> LocalFileSystem does not support concurrent directory creations
> ---
>
> Key: FLINK-9707
> URL: https://issues.apache.org/jira/browse/FLINK-9707
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The {{LocalFileSystem}} does not support concurrent directory creations. The 
> consequence is that file system operations fail.
> I think the culprit is the following line: 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6243: [FLINK-9707] Support concurrent directory creation...

2018-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6243#discussion_r200015421
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---
@@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) {
}
else {
File parent = file.getParentFile();
-   return (parent == null || mkdirsInternal(parent)) && file.mkdir();
+   return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory());
--- End diff --

The case we want to guard against is that another process created the 
directory `file` just before our call to `file.mkdir`. In that case, `file.mkdir` 
would return `false`. Therefore, we need to check whether `file` really is a 
directory; if it is, we should return `true` because the directory was created.


---


[jira] [Resolved] (FLINK-9695) Add option for Mesos executor to forcefully pull Docker images

2018-07-03 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9695.
--
Resolution: Fixed

Fixed via b230bf0e3883ee2dba47e22e781009aa9d0f000e

> Add option for Mesos executor to forcefully pull Docker images
> --
>
> Key: FLINK-9695
> URL: https://issues.apache.org/jira/browse/FLINK-9695
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Reporter: Leonid Ishimnikov
>Assignee: Leonid Ishimnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> It would be useful to have an option to forcefully pull Docker images for 
> tasks, rather than reuse a previously cached image. Such an option exists in 
> many Mesos frameworks, and it significantly simplifies debugging. I propose 
> adding a new 
> {{mesos.resourcemanager.tasks.container.docker.}}{{force-pull-image}} option.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9686) Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole

2018-07-03 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9686.
--
   Resolution: Fixed
Fix Version/s: 1.6.0

Added via 229ed7755c5bddd9856233e019ff3fa8ddef29a7

> Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole
> 
>
> Key: FLINK-9686
> URL: https://issues.apache.org/jira/browse/FLINK-9686
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Assignee: Franz Thoma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> h2. Current situation:
> FlinkKinesisProducer can authenticate with Kinesis by retrieving credentials 
> via one of the following mechanisms:
>  * Environment variables
>  * System properties
>  * An AWS profile
>  * Directly provided credentials (\{{BASIC}})
>  * AWS's own default heuristic (\{{AUTO}})
> For streaming across AWS accounts, it is considered good practice to enable 
> access to the remote Kinesis stream via a role, rather than passing 
> credentials for the remote account.
> h2. Proposed change:
> Add a new credentials provider specifying a role ARN, session name, and an 
> additional credentials provider supplying the credentials for assuming the 
> role.
> Config example for assuming role {{}} with auto-detected 
> credentials:{{}}
> {code:java}
> aws.credentials.provider: ASSUME_ROLE
> aws.credentials.provider.role.arn: 
> aws.credentials.provider.role.sessionName: my-session-name
> aws.credentials.provider.role.provider: AUTO
> {code}
> {{ASSUME_ROLE}} credentials providers can be nested, i.e. it is possible to 
> assume a role which in turn is allowed to assume another role:
> {code:java}
> aws.credentials.provider: ASSUME_ROLE
> aws.credentials.provider.role.arn: 
> aws.credentials.provider.role.sessionName: my-session-name
> aws.credentials.provider.role.provider: ASSUME_ROLE
> aws.credentials.provider.role.provider.role.arn: 
> aws.credentials.provider.role.provider.role.sessionName: 
> my-nested-session-name
> aws.credentials.provider.role.provider.role.provider: AUTO
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9636) Network buffer leaks in requesting a batch of segments during canceling

2018-07-03 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9636.
--
Resolution: Fixed

Fixed via
1.6.0: efc87083e371eb00e801ef29c65ff49dfb170a4d
1.5.1: e58a8e4a0946c636652428a898ad53f30d4d4583

> Network buffer leaks in requesting a batch of segments during canceling
> ---
>
> Key: FLINK-9636
> URL: https://issues.apache.org/jira/browse/FLINK-9636
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> In {{NetworkBufferPool#requestMemorySegments}}, {{numTotalRequiredBuffers}} 
> is first increased by {{numRequiredBuffers}}.
> If an {{InterruptedException}} is thrown while polling segments from the 
> available queue, the requested segments are recycled back to the 
> {{NetworkBufferPool}}, but {{numTotalRequiredBuffers}} is only decreased by 
> the number of segments polled so far, which is inconsistent with the earlier 
> increase by {{numRequiredBuffers}}. So {{numTotalRequiredBuffers}} leaks in 
> this case; decreasing by {{numRequiredBuffers}} instead fixes this bug.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9708) Network buffer leaks when buffer request fails during buffer redistribution

2018-07-03 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9708.
--
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed via
1.6.0:
390e451f77d874b3255b20e0ea164d6743190aa2
63730b61de3342d3ee4c0d0e3c543d55ab966773

1.5.1:
90d5b40e2f832e52f366bd0d4e96823ad091f22a
07afe1d8cdeddb3a3d7ed96a2d2055715abcc6f2

> Network buffer leaks when buffer request fails during buffer redistribution
> ---
>
> Key: FLINK-9708
> URL: https://issues.apache.org/jira/browse/FLINK-9708
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> If an exception is thrown in {{NetworkBufferPool#requestMemorySegments()}}'s 
> first call to {{redistributeBuffers()}}, the accounting for 
> {{numTotalRequiredBuffers}} is wrong for future uses of this buffer pool.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OutputStream on Task.

2018-07-03 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9633.
--
Resolution: Fixed

Fixed via
1.6.0: 64bc4b348ddd1f0c63e806442ffd3aad5c367a28
1.5.1: 9198adc655895818eb1659f8a613c6893042e42b

> Flink doesn't use the Savepoint path's filesystem to create the OutputStream 
> on Task.
> -
>
> Key: FLINK-9633
> URL: https://issues.apache.org/jira/browse/FLINK-9633
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Currently, Flink uses the savepoint's filesystem to create the metadata 
> output stream in the CheckpointCoordinator (JM side), but in the StreamTask 
> (TM side) it uses the checkpoint's filesystem to create the checkpoint data 
> output stream. When the savepoint and checkpoint are on different 
> filesystems, this is problematic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6221: [FLINK-9686] [kinesis] Enable Kinesis authenticati...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6221


---


[jira] [Commented] (FLINK-9686) Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532280#comment-16532280
 ] 

ASF GitHub Bot commented on FLINK-9686:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6221


> Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole
> 
>
> Key: FLINK-9686
> URL: https://issues.apache.org/jira/browse/FLINK-9686
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Assignee: Franz Thoma
>Priority: Major
>  Labels: pull-request-available
>
> h2. Current situation:
> FlinkKinesisProducer can authenticate with Kinesis by retrieving credentials 
> via one of the following mechanisms:
>  * Environment variables
>  * System properties
>  * An AWS profile
>  * Directly provided credentials (\{{BASIC}})
>  * AWS's own default heuristic (\{{AUTO}})
> For streaming across AWS accounts, it is considered good practice to enable 
> access to the remote Kinesis stream via a role, rather than passing 
> credentials for the remote account.
> h2. Proposed change:
> Add a new credentials provider specifying a role ARN, session name, and an 
> additional credentials provider supplying the credentials for assuming the 
> role.
> Config example for assuming role {{}} with auto-detected 
> credentials:{{}}
> {code:java}
> aws.credentials.provider: ASSUME_ROLE
> aws.credentials.provider.role.arn: 
> aws.credentials.provider.role.sessionName: my-session-name
> aws.credentials.provider.role.provider: AUTO
> {code}
> {{ASSUME_ROLE}} credentials providers can be nested, i.e. it is possible to 
> assume a role which in turn is allowed to assume another role:
> {code:java}
> aws.credentials.provider: ASSUME_ROLE
> aws.credentials.provider.role.arn: 
> aws.credentials.provider.role.sessionName: my-session-name
> aws.credentials.provider.role.provider: ASSUME_ROLE
> aws.credentials.provider.role.provider.role.arn: 
> aws.credentials.provider.role.provider.role.sessionName: 
> my-nested-session-name
> aws.credentials.provider.role.provider.role.provider: AUTO
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6238: [FLINK-9636][network] fix inconsistency with faile...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6238


---


[jira] [Commented] (FLINK-9636) Network buffer leaks in requesting a batch of segments during canceling

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532283#comment-16532283
 ] 

ASF GitHub Bot commented on FLINK-9636:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6238


> Network buffer leaks in requesting a batch of segments during canceling
> ---
>
> Key: FLINK-9636
> URL: https://issues.apache.org/jira/browse/FLINK-9636
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> In {{NetworkBufferPool#requestMemorySegments}}, {{numTotalRequiredBuffers}} 
> is first increased by {{numRequiredBuffers}}.
> If an {{InterruptedException}} is thrown while polling segments from the 
> available queue, the requested segments are recycled back to the 
> {{NetworkBufferPool}}, but {{numTotalRequiredBuffers}} is only decreased by 
> the number of segments polled so far, which is inconsistent with the earlier 
> increase by {{numRequiredBuffers}}. So {{numTotalRequiredBuffers}} leaks in 
> this case; decreasing by {{numRequiredBuffers}} instead fixes this bug.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9695) Add option for Mesos executor to forcefully pull Docker images

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532281#comment-16532281
 ] 

ASF GitHub Bot commented on FLINK-9695:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6232


> Add option for Mesos executor to forcefully pull Docker images
> --
>
> Key: FLINK-9695
> URL: https://issues.apache.org/jira/browse/FLINK-9695
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Reporter: Leonid Ishimnikov
>Assignee: Leonid Ishimnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> It would be useful to have an option to forcefully pull Docker images for 
> tasks, rather than reuse a previously cached image. Such an option exists in 
> many Mesos frameworks, and it significantly simplifies debugging. I propose 
> adding a new 
> {{mesos.resourcemanager.tasks.container.docker.}}{{force-pull-image}} option.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OutputStream on Task.

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532282#comment-16532282
 ] 

ASF GitHub Bot commented on FLINK-9633:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6194


> Flink doesn't use the Savepoint path's filesystem to create the OutputStream 
> on Task.
> -
>
> Key: FLINK-9633
> URL: https://issues.apache.org/jira/browse/FLINK-9633
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Currently, Flink uses the savepoint's filesystem to create the metadata 
> output stream in the CheckpointCoordinator (JM side), but in the StreamTask 
> (TM side) it uses the checkpoint's filesystem to create the checkpoint data 
> output stream. When the savepoint and checkpoint are on different 
> filesystems, this is problematic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6194


---


[GitHub] flink pull request #6232: [FLINK-9695] [mesos] Add option for Mesos executor...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6232


---


[jira] [Commented] (FLINK-8336) YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3 test instability

2018-07-03 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532274#comment-16532274
 ] 

Till Rohrmann commented on FLINK-8336:
--

Another instance: https://api.travis-ci.org/v3/job/399602067/log.txt

> YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3 test instability
> ---
>
> Key: FLINK-8336
> URL: https://issues.apache.org/jira/browse/FLINK-8336
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.4.3, 1.5.1
>
>
> The {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3}} fails on 
> Travis. I suspect that this has something to do with the consistency 
> guarantees S3 gives us.
> https://travis-ci.org/tillrohrmann/flink/jobs/323930297



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-07-03 Thread Mingdao Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mingdao Yang reassigned FLINK-4582:
---

Assignee: Mingdao Yang

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Mingdao Yang
>Priority: Major
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase

2018-07-03 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307281#comment-16307281
 ] 

Ted Yu edited comment on FLINK-6105 at 7/4/18 4:09 AM:
---

In 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java:
{code}
  try {
Thread.sleep(500);
  } catch (InterruptedException e1) {
// ignore it
  }
{code}

The interrupt status should be restored, or an InterruptedIOException should be thrown.
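A minimal sketch of the two alternatives (restoring the interrupt flag, or surfacing an InterruptedIOException):
{code:java}
import java.io.InterruptedIOException;

final class InterruptHandlingSketch {

    // Option 1: restore the interrupt status so callers can still observe it.
    static void sleepRestoringInterrupt() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
        }
    }

    // Option 2: translate into an InterruptedIOException for IO-flavored APIs.
    static void sleepOrThrow() throws InterruptedIOException {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e1) {
            InterruptedIOException ioe =
                    new InterruptedIOException("interrupted while sleeping");
            ioe.initCause(e1);
            throw ioe;
        }
    }
}
{code}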


was (Author: yuzhih...@gmail.com):
In 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java:

{code}
  try {
Thread.sleep(500);
  } catch (InterruptedException e1) {
// ignore it
  }
{code}

Interrupt status should be restored, or throw InterruptedIOException .

> Properly handle InterruptedException in HadoopInputFormatBase
> -
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Major
>
> When catching InterruptedException, we should throw InterruptedIOException 
> instead of IOException.
> The following example is from HadoopInputFormatBase :
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase

2018-07-03 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307281#comment-16307281
 ] 

Ted Yu edited comment on FLINK-6105 at 7/4/18 4:08 AM:
---

In 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java:

{code}
  try {
Thread.sleep(500);
  } catch (InterruptedException e1) {
// ignore it
  }
{code}

The interrupt status should be restored, or an InterruptedIOException should be thrown.


was (Author: yuzhih...@gmail.com):
In 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java:
{code}
  try {
Thread.sleep(500);
  } catch (InterruptedException e1) {
// ignore it
  }
{code}

Interrupt status should be restored, or throw InterruptedIOException .

> Properly handle InterruptedException in HadoopInputFormatBase
> -
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Major
>
> When catching InterruptedException, we should throw InterruptedIOException 
> instead of IOException.
> The following example is from HadoopInputFormatBase :
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase

2018-07-03 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307281#comment-16307281
 ] 

Ted Yu edited comment on FLINK-6105 at 7/4/18 4:08 AM:
---

In 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java:
{code}
  try {
Thread.sleep(500);
  } catch (InterruptedException e1) {
// ignore it
  }
{code}

The interrupt status should be restored, or an InterruptedIOException should be thrown.


was (Author: yuzhih...@gmail.com):
In 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java:
{code}
  try {
Thread.sleep(500);
  } catch (InterruptedException e1) {
// ignore it
  }
{code}

Interrupt status should be restored, or throw InterruptedIOException .

> Properly handle InterruptedException in HadoopInputFormatBase
> -
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Major
>
> When catching InterruptedException, we should throw InterruptedIOException 
> instead of IOException.
> The following example is from HadoopInputFormatBase :
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2018-07-03 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532220#comment-16532220
 ] 

Ted Yu commented on FLINK-4534:
---

The synchronization should be added: if there are no concurrent calls, there 
is no contention either.
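A minimal sketch of the guarded iteration being suggested (hypothetical types; the monitor must be the same state.bucketStates object the other methods synchronize on):
{code:java}
import java.util.HashMap;
import java.util.Map;

final class GuardedIterationSketch {

    private final Map<String, Object> bucketStates = new HashMap<>();

    void restoreState() {
        // Iterate under the same lock the mutating methods hold, so a
        // concurrent put/remove cannot change the map during the loop.
        synchronized (bucketStates) {
            for (Map.Entry<String, Object> entry : bucketStates.entrySet()) {
                // ... handle the restored bucket state in entry.getValue() ...
            }
        }
    }
}
{code}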

> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Major
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState bucketState : state.bucketStates.values()) {
> {code}
> and following in close():
> {code}
> for (Map.Entry> entry : 
> state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> W.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue 
> starting at line 752:
> {code}
>   Set pastCheckpointIds = 
> bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9636) Network buffer leaks in requesting a batch of segments during canceling

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532207#comment-16532207
 ] 

ASF GitHub Bot commented on FLINK-9636:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/6238
  
👍


> Network buffer leaks in requesting a batch of segments during canceling
> ---
>
> Key: FLINK-9636
> URL: https://issues.apache.org/jira/browse/FLINK-9636
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> In {{NetworkBufferPool#requestMemorySegments}}, {{numTotalRequiredBuffers}} 
> is first increased by {{numRequiredBuffers}}.
> If an {{InterruptedException}} is thrown while polling segments from the 
> available queue, the requested segments are recycled back to the 
> {{NetworkBufferPool}}, but {{numTotalRequiredBuffers}} is only decreased by 
> the number of segments polled so far, which is inconsistent with the earlier 
> increase by {{numRequiredBuffers}}. So {{numTotalRequiredBuffers}} leaks in 
> this case; decreasing by {{numRequiredBuffers}} instead fixes this bug.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6238: [FLINK-9636][network] fix inconsistency with failed buffe...

2018-07-03 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/6238
  
👍


---


[jira] [Commented] (FLINK-9707) LocalFileSystem does not support concurrent directory creations

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532186#comment-16532186
 ] 

ASF GitHub Bot commented on FLINK-9707:
---

Github user lamber-ken commented on a diff in the pull request:

https://github.com/apache/flink/pull/6243#discussion_r17861
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---
@@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) {
}
else {
File parent = file.getParentFile();
-   return (parent == null || mkdirsInternal(parent)) && 
file.mkdir();
+   return (parent == null || mkdirsInternal(parent)) && 
(file.mkdir() || file.isDirectory());
--- End diff --

Hmm, why not use `synchronized` to prevent this? 


> LocalFileSystem does not support concurrent directory creations
> ---
>
> Key: FLINK-9707
> URL: https://issues.apache.org/jira/browse/FLINK-9707
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The {{LocalFileSystem}} does not support concurrent directory creations. The 
> consequence is that file system operations fail.
> I think the culprit is the following line: 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6243: [FLINK-9707] Support concurrent directory creation...

2018-07-03 Thread lamber-ken
Github user lamber-ken commented on a diff in the pull request:

https://github.com/apache/flink/pull/6243#discussion_r17861
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---
@@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) {
}
else {
File parent = file.getParentFile();
-   return (parent == null || mkdirsInternal(parent)) && 
file.mkdir();
+   return (parent == null || mkdirsInternal(parent)) && 
(file.mkdir() || file.isDirectory());
--- End diff --

Hmm, why not use `synchronized` to prevent this? 


---


[jira] [Assigned] (FLINK-9733) Make location for job graph files configurable

2018-07-03 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9733:
---

Assignee: vinoyang

> Make location for job graph files configurable
> --
>
> Key: FLINK-9733
> URL: https://issues.apache.org/jira/browse/FLINK-9733
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, Job-Submission
>Affects Versions: 1.6.0, 1.5.1
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Major
>
> During the job-submission by the {{RestClusterClient}} the {{JobGraph}} is 
> serialized and written to a file.
> Currently we just use {{Files.createTempFile}} for this purpose.
> This location should be made configurable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9732) Report more detailed error message on JobSubmissionFailure

2018-07-03 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9732:
---

Assignee: vinoyang

> Report more detailed error message on JobSubmissionFailure
> --
>
> Key: FLINK-9732
> URL: https://issues.apache.org/jira/browse/FLINK-9732
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Major
>
> Currently, if the job submission through the {{JobSubmitHandler}} fails, the 
> error message returned to the client only says "Job submission failed.".
> As outlined in the discussion in this 
> [PR|https://github.com/apache/flink/pull/6222] we should try to include more 
> information about the actual failure cause.
> The proposed solution is to encode the cause for the failure in the 
> {{Acknowledge}} that is returned by {{DispatcherGateway#submitJob}}.
> {code}
> public class AckOrException {
>   // holds exception, could also be a series of nullable fields
>   private final SuperEither 
> exception; 
>   ...
>   public void throwIfError() throws ExceptionA, ExceptionB, ExceptionC;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9328) RocksDBStateBackend might use PlaceholderStreamStateHandle to restore due to StateBackendTestBase class not registering snapshots in some UTs

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532167#comment-16532167
 ] 

ASF GitHub Bot commented on FLINK-9328:
---

Github user Myasuka commented on the issue:

https://github.com/apache/flink/pull/5984
  
The failing UT JobManagerFailsITCase in 
[#25170.9-build](https://travis-ci.org/apache/flink/jobs/393258719) is not 
related to this PR; shall I re-trigger the CI process?


> RocksDBStateBackend might use PlaceholderStreamStateHandle to restore due to 
> StateBackendTestBase class not registering snapshots in some UTs
> -
>
> Key: FLINK-9328
> URL: https://issues.apache.org/jira/browse/FLINK-9328
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Yun Tang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>
> Currently, the StateBackendTestBase class does not register snapshots with 
> the SharedStateRegistry in the testValueState, testListState, 
> testReducingState, testFoldingState and testMapState UTs, which may cause 
> RocksDBStateBackend to restore from a PlaceholderStreamStateHandle during the 
> 2nd restore procedure if a specific sst file exists in both the 1st and the 
> 2nd snapshot handle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9328) RocksDBStateBackend might use PlaceholderStreamStateHandle to restore due to StateBackendTestBase class not registering snapshots in some UTs

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9328:
--
Labels: pull-request-available  (was: )

> RocksDBStateBackend might use PlaceholderStreamStateHandle to restore due to 
> StateBackendTestBase class not registering snapshots in some UTs
> -
>
> Key: FLINK-9328
> URL: https://issues.apache.org/jira/browse/FLINK-9328
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Yun Tang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>
> Currently, the StateBackendTestBase class does not register snapshots with 
> the SharedStateRegistry in the testValueState, testListState, 
> testReducingState, testFoldingState and testMapState UTs, which may cause 
> RocksDBStateBackend to restore from a PlaceholderStreamStateHandle during the 
> 2nd restore procedure if a specific sst file exists in both the 1st and the 
> 2nd snapshot handle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5984: [FLINK-9328][state] Fix RocksDBStateBackend restore probl...

2018-07-03 Thread Myasuka
Github user Myasuka commented on the issue:

https://github.com/apache/flink/pull/5984
  
The failing UT JobManagerFailsITCase in 
[#25170.9-build](https://travis-ci.org/apache/flink/jobs/393258719) is not 
related to this PR; shall I re-trigger the CI process?


---


[jira] [Commented] (FLINK-9707) LocalFileSystem does not support concurrent directory creations

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532152#comment-16532152
 ] 

ASF GitHub Bot commented on FLINK-9707:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6243#discussion_r12317
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---
@@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) {
}
else {
File parent = file.getParentFile();
-   return (parent == null || mkdirsInternal(parent)) && file.mkdir();
+   return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory());
--- End diff --

Seems you are right; maybe Till wants to prevent a concurrency problem, like 
the code comment above: check whether the file exists first. cc @tillrohrmann 


> LocalFileSystem does not support concurrent directory creations
> ---
>
> Key: FLINK-9707
> URL: https://issues.apache.org/jira/browse/FLINK-9707
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The {{LocalFileSystem}} does not support concurrent directory creations. The 
> consequence is that file system operations fail.
> I think the culprit is the following line: 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6243: [FLINK-9707] Support concurrent directory creation...

2018-07-03 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6243#discussion_r12317
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---
@@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) {
}
else {
File parent = file.getParentFile();
-   return (parent == null || mkdirsInternal(parent)) && file.mkdir();
+   return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory());
--- End diff --

Seems you are right; maybe Till wants to prevent a concurrency problem, like 
the code comment above: check whether the file exists first. cc @tillrohrmann 


---


[jira] [Commented] (FLINK-9707) LocalFileSystem does not support concurrent directory creations

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532132#comment-16532132
 ] 

ASF GitHub Bot commented on FLINK-9707:
---

Github user lamber-ken commented on a diff in the pull request:

https://github.com/apache/flink/pull/6243#discussion_r199989644
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---
@@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) {
}
else {
File parent = file.getParentFile();
-   return (parent == null || mkdirsInternal(parent)) && file.mkdir();
+   return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory());
--- End diff --

`file.isDirectory()` always returns false here, doesn't it?


> LocalFileSystem does not support concurrent directory creations
> ---
>
> Key: FLINK-9707
> URL: https://issues.apache.org/jira/browse/FLINK-9707
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The {{LocalFileSystem}} does not support concurrent directory creations. The 
> consequence is that file system operations fail.
> I think the culprit is the following line: 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6243: [FLINK-9707] Support concurrent directory creation...

2018-07-03 Thread lamber-ken
Github user lamber-ken commented on a diff in the pull request:

https://github.com/apache/flink/pull/6243#discussion_r199989644
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---
@@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) {
}
else {
File parent = file.getParentFile();
-   return (parent == null || mkdirsInternal(parent)) && file.mkdir();
+   return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory());
--- End diff --

`file.isDirectory()` always returns false here, doesn't it?


---


[jira] [Commented] (FLINK-9730) avoid access static via class reference

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532109#comment-16532109
 ] 

ASF GitHub Bot commented on FLINK-9730:
---

Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6247
  
Hi @zentol, @tillrohrmann, cc. I fixed some cases that access static members 
via a class reference.


> avoid access static via class reference
> ---
>
> Key: FLINK-9730
> URL: https://issues.apache.org/jira/browse/FLINK-9730
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> [code refactor] access static via class reference



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6247: [FLINK-9730] [code refactor] fix access static via class ...

2018-07-03 Thread lamber-ken
Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6247
  
Hi @zentol, @tillrohrmann, cc. I fixed some cases that access static members 
via a class reference.


---


[jira] [Commented] (FLINK-9666) short-circuit logic should be used in boolean contexts

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532104#comment-16532104
 ] 

ASF GitHub Bot commented on FLINK-9666:
---

Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6230
  
Hi @zentol, I found another occurrence just after #6212 was closed,
but I can't reopen that PR, so I started a new PR referring to #6212.


> short-circuit logic should be used in boolean contexts
> --
>
> Key: FLINK-9666
> URL: https://issues.apache.org/jira/browse/FLINK-9666
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> short-circuit logic should be used in boolean contexts



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6230: [FLINK-9666] short-circuit logic should be used in boolea...

2018-07-03 Thread lamber-ken
Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6230
  
Hi @zentol, I found another occurrence just after #6212 was closed,
but I can't reopen that PR, so I started a new PR referring to #6212.


---


[jira] [Updated] (FLINK-8864) Add CLI query history in SQL Client

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8864:
--
Labels: pull-request-available  (was: )

> Add CLI query history in SQL Client
> ---
>
> Key: FLINK-8864
> URL: https://issues.apache.org/jira/browse/FLINK-8864
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> It would be great to have the possibility of persisting the CLI's query 
> history. Such that queries can be reused when the CLI Client is started 
> again. Also a search feature as it is offered by terminals would be good.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8864) Add CLI query history in SQL Client

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532093#comment-16532093
 ] 

ASF GitHub Bot commented on FLINK-8864:
---

GitHub user snuyanzin opened a pull request:

https://github.com/apache/flink/pull/6250

[FLINK-8864] added command history

## What is the purpose of the change

*This PR adds history for sql queries/sqlclient commands*


## Brief change log

  - *Added history file for commands*

## Verifying this change
  - *Manual verification*

1. run the sql-client, perform some queries, close the client
2. run the sql-client again, check the history (up, down, forward and backward 
searches via ctrl+s, ctrl+r)


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/snuyanzin/flink FLINK_8864

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6250.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6250


commit 08ee97e1c4b0274b8508e6cfb6f47db1e33212eb
Author: snuyanzin 
Date:   2018-07-04T00:30:14Z

[FLINK-8864] added command history




> Add CLI query history in SQL Client
> ---
>
> Key: FLINK-8864
> URL: https://issues.apache.org/jira/browse/FLINK-8864
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> It would be great to have the possibility of persisting the CLI's query 
> history. Such that queries can be reused when the CLI Client is started 
> again. Also a search feature as it is offered by terminals would be good.
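
Since the history and search behavior exercised above comes from the line-reader library, here is a minimal sketch of how persistent history is typically wired up with JLine 3 (assumptions: JLine 3 is the library in use; the history file name and prompt are invented for illustration):

{code}
import java.nio.file.Paths;

import org.jline.reader.LineReader;
import org.jline.reader.LineReaderBuilder;
import org.jline.terminal.TerminalBuilder;

public class HistoryDemo {
    public static void main(String[] args) throws Exception {
        LineReader reader = LineReaderBuilder.builder()
            .terminal(TerminalBuilder.terminal())
            // entries are persisted across sessions; up/down and the
            // ctrl+r / ctrl+s searches come with JLine's default key bindings
            .variable(LineReader.HISTORY_FILE,
                      Paths.get(System.getProperty("user.home"), ".flink-sql-history"))
            .build();
        System.out.println("got: " + reader.readLine("Flink SQL> "));
    }
}
{code}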



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6250: [FLINK-8864] added command history

2018-07-03 Thread snuyanzin
GitHub user snuyanzin opened a pull request:

https://github.com/apache/flink/pull/6250

[FLINK-8864] added command history

## What is the purpose of the change

*This PR adds history for sql queries/sqlclient commands*


## Brief change log

  - *Added history file for commands*

## Verifying this change
  - *Manual verification*

1. run the sql-client, perform some queries, close the client
2. run the sql-client again, check the history (up, down, forward and backward 
searches via ctrl+s, ctrl+r)


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/snuyanzin/flink FLINK_8864

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6250.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6250


commit 08ee97e1c4b0274b8508e6cfb6f47db1e33212eb
Author: snuyanzin 
Date:   2018-07-04T00:30:14Z

[FLINK-8864] added command history




---


[jira] [Assigned] (FLINK-8864) Add CLI query history in SQL Client

2018-07-03 Thread Sergey Nuyanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-8864:
--

Assignee: Sergey Nuyanzin

> Add CLI query history in SQL Client
> ---
>
> Key: FLINK-8864
> URL: https://issues.apache.org/jira/browse/FLINK-8864
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> It would be great to have the possibility of persisting the CLI's query 
> history. Such that queries can be reused when the CLI Client is started 
> again. Also a search feature as it is offered by terminals would be good.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9731) Kafka source subtask begins to consume from earliest offset

2018-07-03 Thread Elias Levy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy closed FLINK-9731.
-
Resolution: Invalid

> Kafka source subtask begins to consume from earliest offset
> ---
>
> Key: FLINK-9731
> URL: https://issues.apache.org/jira/browse/FLINK-9731
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2
>Reporter: Elias Levy
>Priority: Critical
>
> On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink 
> job instance began consuming records from the earliest offsets available in 
> Kafka for the partitions assigned to it. Other subtasks did not exhibit this 
> behavior and continued operating normally.
> Prior to the event the job exhibited no Kafka lag. The job showed no 
> failed checkpoints and the job did not restore or restart. The Flink logs 
> only showed the following message:
> {noformat}
> June 30th 2018, 02:35:01.711  Fetch offset 2340400514 is out of range for 
> partition topic-124, resetting offset
> {noformat}
> The job is configured with checkpoints at 1 minute intervals. The Kafka 
> connector consumer is configured to start from group offsets if it is not 
> started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka 
> consumer is configured to fall back to the earliest offsets if no group 
> offsets are committed, by setting `auto.offset.reset` to `earliest` in the 
> Kafka consumer config.
> Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership 
> of its partitions for around 30 seconds as a result of losing its connection 
> to ZooKeeper.
>  
> {noformat}
> [2018-06-30 09:34:54,799] INFO Unable to read additional data from server 
> sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for 
> partition [cloud_ioc_events,32] to broker 
> 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
> server is not the leader for that topic-partition. 
> (kafka.server.ReplicaFetcherThread)
> {noformat}
> The broker immediately reconnected to ZK after a few tries:
> {noformat}
> [2018-06-30 09:34:55,462] INFO Opening socket connection to server 
> 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,463] INFO Socket connection established to 
> 10.210.48.187/10.210.48.187:2181, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, 
> session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,465] INFO Initiating client connection, 
> connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka 
> sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 
> (org.apache.zookeeper.ZooKeeper)
> [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, 
> session 0x161305b7bd81a09 has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,466] INFO EventThread shut down for session: 
> 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,468] INFO Opening socket connection to server 
> 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,468] INFO Socket connection established to 
> 10.210.43.200/10.210.43.200:2181, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,471] INFO Session establishment complete on server 
> 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated 
> timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 
> 2005 (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? 
> false) (kafka.utils.ZKCheckedEphemeral)
> [2018-06-30 09:34:55,476] INFO Result of znode creation is: OK 
> (kafka.utils.ZKCheckedEphemeral)
> [2018-06-30 09:34:55,476] INFO Registered broker 2005 at path 
> /brokers/ids/2005 with addresses: 
> EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINT

[jira] [Commented] (FLINK-9731) Kafka source subtask begins to consume from earliest offset

2018-07-03 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532040#comment-16532040
 ] 

Elias Levy commented on FLINK-9731:
---

Closing as I suspect the error is on the Kafka side.  Logs indicate Kafka 
truncated the partition in the process of the broker catching up with a replica 
and regaining leadership of the partition.  But that would imply that somehow 
Kafka allowed Flink to read an uncommitted message, as we are publishing with 
acks=all, and the topic has min.insync.replicas=2, which breaks the consistency 
guarantees of Kafka.

The truncation led to the Flink fetch being considered out-of-range, causing 
the auto.offset.reset logic in the Kafka consumer to kick in, leading to 
consumption from the earliest offset.
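
For reference, a minimal sketch of the consumer setup described in the report (the connector version and the placeholder values are assumptions):

{code}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class ConsumerSetup {
    public static FlinkKafkaConsumer011<String> build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "my-group");            // placeholder
        // the fallback that kicked in once the committed offset became out-of-range
        props.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer011<String> consumer =
            new FlinkKafkaConsumer011<>("topic", new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets(); // group offsets win whenever they exist
        return consumer;
    }
}
{code}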

> Kafka source subtask begins to consume from earliest offset
> ---
>
> Key: FLINK-9731
> URL: https://issues.apache.org/jira/browse/FLINK-9731
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2
>Reporter: Elias Levy
>Priority: Critical
>
> On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink 
> job instance began consuming records from the earliest offsets available in 
> Kafka for the partitions assigned to it. Other subtasks did not exhibit this 
> behavior and continued operating normally.
> Prior to the event the job exhibited no Kafka lag. The job showed no 
> failed checkpoints and the job did not restore or restart. The Flink logs 
> only showed the following message:
> {noformat}
> June 30th 2018, 02:35:01.711  Fetch offset 2340400514 is out of range for 
> partition topic-124, resetting offset
> {noformat}
> The job is configured with checkpoints at 1 minute intervals. The Kafka 
> connector consumer is configured to start from group offsets if it is not 
> started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka 
> consumer is configured to fall back to the earliest offsets if no group 
> offsets are committed, by setting `auto.offset.reset` to `earliest` in the 
> Kafka consumer config.
> Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership 
> of its partitions for around 30 seconds as a result of losing its connection 
> to ZooKeeper.
>  
> {noformat}
> [2018-06-30 09:34:54,799] INFO Unable to read additional data from server 
> sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for 
> partition [cloud_ioc_events,32] to broker 
> 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
> server is not the leader for that topic-partition. 
> (kafka.server.ReplicaFetcherThread)
> {noformat}
> The broker immediately reconnected to ZK after a few tries:
> {noformat}
> [2018-06-30 09:34:55,462] INFO Opening socket connection to server 
> 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,463] INFO Socket connection established to 
> 10.210.48.187/10.210.48.187:2181, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, 
> session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,465] INFO Initiating client connection, 
> connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka 
> sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 
> (org.apache.zookeeper.ZooKeeper)
> [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, 
> session 0x161305b7bd81a09 has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,466] INFO EventThread shut down for session: 
> 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,468] INFO Opening socket connection to server 
> 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,468] INFO Socket connection established to 
> 10.210.43.200/10.210.43.200:2181, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,471] INFO Session establishment complete on server 
> 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated 
> timeout = 6000 (org.apache.zookeeper.C

[jira] [Updated] (FLINK-9734) Typo in ' field-deleimiter'

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9734:
--
Labels: pull-request-available  (was: )

> Typo in ' field-deleimiter'
> ---
>
> Key: FLINK-9734
> URL: https://issues.apache.org/jira/browse/FLINK-9734
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Trivial
>  Labels: pull-request-available
>
> typo at 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#csv-format
> {quote}  field-deleimiter: ","  # optional: string delimiter "," by 
> default {quote}
> should be _delimiter_



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9734) Typo in ' field-deleimiter'

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532026#comment-16532026
 ] 

ASF GitHub Bot commented on FLINK-9734:
---

GitHub user snuyanzin opened a pull request:

https://github.com/apache/flink/pull/6249

[hotfix] [docs] [FLINK-9734] Fix 'deleimiter' typo

## What is the purpose of the change

*This PR fixes a typo in the word 'deleimiter' at 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#csv-format*


## Brief change log
  - *typo fix at docs/dev/table/sqlClient.md*



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/snuyanzin/flink FIELD_DELIMETER_TYPO

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6249.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6249


commit c538c8585ee66585bdf61cfa326668f82bb0e532
Author: snuyanzin 
Date:   2018-07-03T17:40:56Z

[FLINK-9734] Fix 'deleimiter' typo




> Typo in ' field-deleimiter'
> ---
>
> Key: FLINK-9734
> URL: https://issues.apache.org/jira/browse/FLINK-9734
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Trivial
>  Labels: pull-request-available
>
> typo at 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#csv-format
> {quote}  field-deleimiter: ","  # optional: string delimiter "," by 
> default {quote}
> should be _delimiter_



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6249: [hotfix] [docs] [FLINK-9734] Fix 'deleimiter' typo

2018-07-03 Thread snuyanzin
GitHub user snuyanzin opened a pull request:

https://github.com/apache/flink/pull/6249

[hotfix] [docs] [FLINK-9734] Fix 'deleimiter' typo

## What is the purpose of the change

*This PR fixes a typo in the word 'deleimiter' at 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#csv-format*


## Brief change log
  - *typo fix at docs/dev/table/sqlClient.md*



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/snuyanzin/flink FIELD_DELIMETER_TYPO

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6249.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6249


commit c538c8585ee66585bdf61cfa326668f82bb0e532
Author: snuyanzin 
Date:   2018-07-03T17:40:56Z

[FLINK-9734] Fix 'deleimiter' typo




---


[jira] [Created] (FLINK-9734) Typo in ' field-deleimiter'

2018-07-03 Thread Sergey Nuyanzin (JIRA)
Sergey Nuyanzin created FLINK-9734:
--

 Summary: Typo in ' field-deleimiter'
 Key: FLINK-9734
 URL: https://issues.apache.org/jira/browse/FLINK-9734
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Table API & SQL
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin


typo at 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#csv-format
{quote}  field-deleimiter: ","  # optional: string delimiter "," by default 
{quote}
should be _delimiter_



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531930#comment-16531930
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r199951101
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
--- End diff --

Sounds good. Also, I've updated the DDL design doc to call it TABLE 
CONNECTOR, which I think is clearer.


> Create unified interfaces to configure and instantiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client, such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) In the yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-03 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r199951101
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
--- End diff --

Sounds good. Also, I've updated the DDL design doc to call it TABLE 
CONNECTOR, which I think is clearer.


---


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531920#comment-16531920
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/6201
  
@twalthr @fhueske sounds good to me. We can do that in a follow-up issue 
for `from-source`, and we will not support `from-source` in this PR.


> Create unified interfaces to configure and instantiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client, such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) In the yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6201: [FLINK-8866][Table API & SQL] Add support for unified tab...

2018-07-03 Thread suez1224
Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/6201
  
@twalthr @fhueske sounds good to me. We can do that in a follow-up issue 
for `from-source`, and we will not support `from-source` in this PR.


---


[jira] [Updated] (FLINK-9311) PubSub connector

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9311:
--
Labels: pull-request-available  (was: )

> PubSub connector
> 
>
> Key: FLINK-9311
> URL: https://issues.apache.org/jira/browse/FLINK-9311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Richard Deurwaarder
>Priority: Minor
>  Labels: pull-request-available
>
> I would like to start adding some Google Cloud connectors, starting with a PubSub 
> Source. I have a basic implementation ready but I want it to be able to:
>  * easily scale up (should I have it extend RichParallelSourceFunction for 
> this?)
>  * Make it easier to provide the google cloud credentials. This would require 
> being able to send some json string / ServiceAccount to the nodes when 
> starting up this source.
> Could this be something that would be useful for others and added to the 
> flink connectors repo?
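
For context, a minimal skeleton of the {{RichParallelSourceFunction}} shape alluded to above ({{PubSubHelper}} and the pull logic are illustrative placeholders, not the actual Google Cloud SDK wiring):

{code}
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class PubSubSource extends RichParallelSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // each parallel subtask pulls from the subscription independently,
            // which is how a pull-based source scales out
            for (String msg : PubSubHelper.pull()) {
                ctx.collect(msg);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    // hypothetical stand-in for the real SDK calls
    static class PubSubHelper {
        static List<String> pull() {
            return Collections.emptyList();
        }
    }
}
{code}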



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9311) PubSub connector

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531854#comment-16531854
 ] 

ASF GitHub Bot commented on FLINK-9311:
---

GitHub user Xeli opened a pull request:

https://github.com/apache/flink/pull/6248

[FLINK-9311] [pubsub] Added PubSub connector with support for checkpointing

## What is the purpose of the change

Adding a PubSub connector with support for Checkpointing

## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests*
 - *Manually verified the connector (without Checkpointing) on an actual 
PubSub topic and subscription.*
**Is there a need for integration tests? I did not see any for the other 
connectors. 
What is a good way of testing the checkpointing / exactly-once behavior?**

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes, Google Cloud 
Sdk for PubSub (**Does this need to be shaded?**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): don't know, 
don't think so
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes, checkpointing
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Xeli/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6248.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6248


commit dddbe671a0d663045110b89ad9bb85ce9a7e7051
Author: Richard Deurwaarder 
Date:   2018-05-26T12:59:32Z

[FLINK-9311] [pubsub] Add PubSubSource without checkpointing

commit 30fab0fd6810691f22ff583ce3f942e247d9fe45
Author: Richard Deurwaarder 
Date:   2018-07-03T17:34:02Z

[FLINK-9311] [pubsub] Add checkpointing to PubSubSource




> PubSub connector
> 
>
> Key: FLINK-9311
> URL: https://issues.apache.org/jira/browse/FLINK-9311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Richard Deurwaarder
>Priority: Minor
>  Labels: pull-request-available
>
> I would like to start adding some Google Cloud connectors, starting with a PubSub 
> Source. I have a basic implementation ready but I want it to be able to:
>  * easily scale up (should I have it extend RichParallelSourceFunction for 
> this?)
>  * Make it easier to provide the google cloud credentials. This would require 
> being able to send some json string / ServiceAccount to the nodes when 
> starting up this source.
> Could this be something that would be useful for others and added to the 
> flink connectors repo?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6248: [FLINK-9311] [pubsub] Added PubSub connector with ...

2018-07-03 Thread Xeli
GitHub user Xeli opened a pull request:

https://github.com/apache/flink/pull/6248

[FLINK-9311] [pubsub] Added PubSub connector with support for checkpointing

## What is the purpose of the change

Adding a PubSub connector with support for Checkpointing

## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests*
 - *Manually verified the connector (without Checkpointing) on an actual 
PubSub topic and subscription.*
**Is there a need for integration tests? I did not see any for the other 
connectors. 
What is a good way of testing the checkpointing / exactly-once behavior?**

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes, Google Cloud 
Sdk for PubSub (**Does this need to be shaded?**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): don't know, 
don't think so
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes, checkpointing
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Xeli/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6248.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6248


commit dddbe671a0d663045110b89ad9bb85ce9a7e7051
Author: Richard Deurwaarder 
Date:   2018-05-26T12:59:32Z

[FLINK-9311] [pubsub] Add PubSubSource without checkpointing

commit 30fab0fd6810691f22ff583ce3f942e247d9fe45
Author: Richard Deurwaarder 
Date:   2018-07-03T17:34:02Z

[FLINK-9311] [pubsub] Add checkpointing to PubSubSource




---


[jira] [Updated] (FLINK-9731) Kafka source subtask begins to consume from earliest offset

2018-07-03 Thread Elias Levy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated FLINK-9731:
--
Description: 
On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job 
instance began consuming records from the earliest offsets available in Kafka 
for the partitions assigned to it. Other subtasks did not exhibit this behavior 
and continued operating normally.

Prior to the event the job exhibited no Kafka lag. The job showed no failed 
checkpoints and the job did not restore or restart. The Flink logs only showed 
the following message:
{noformat}
June 30th 2018, 02:35:01.711Fetch offset 2340400514 is out of range for 
partition topic-124, resetting offset
{noformat}

The job is configured with checkpoints at 1 minute intervals. The Kafka 
connector consumer is configured to start from group offsets if it is not 
started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka 
consumer is configured to fall back to the earliest offsets if no group offsets 
are committed, by setting `auto.offset.reset` to `earliest` in the Kafka 
consumer config.

Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of 
its partitions for around 30 seconds as a result of losing its connection to 
ZooKeeper.

 
{noformat}
[2018-06-30 09:34:54,799] INFO Unable to read additional data from server 
sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for 
partition [cloud_ioc_events,32] to broker 
1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
{noformat}

The broker immediately reconnected to ZK after a few tries:

{noformat}
[2018-06-30 09:34:55,462] INFO Opening socket connection to server 
10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,463] INFO Socket connection established to 
10.210.48.187/10.210.48.187:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,465] INFO Initiating client connection, 
connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka 
sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 
(org.apache.zookeeper.ZooKeeper)
[2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,466] INFO EventThread shut down for session: 
0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,468] INFO Opening socket connection to server 
10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,468] INFO Socket connection established to 
10.210.43.200/10.210.43.200:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO Session establishment complete on server 
10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated 
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 2005 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 
with addresses: 
EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT)
 (kafka.utils.ZkUtils)
[2018-06-30 09:34:55,476] INFO done re-registering broker 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for 
new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
{noformat}

By 9:35:02 partitions had returned to the broker.

It appears this is the broker that the subtask was consuming from, as outgoing 
network traffic from it spiked after the broker recovered leadership of its 

[jira] [Commented] (FLINK-9289) Parallelism of generated operators should have max parallelism of input

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531843#comment-16531843
 ] 

ASF GitHub Bot commented on FLINK-9289:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/6241


> Parallelism of generated operators should have max parallelism of input
> -
>
> Key: FLINK-9289
> URL: https://issues.apache.org/jira/browse/FLINK-9289
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> The DataSet API aims to chain generated operators such as key extraction 
> mappers to their predecessor. This is done by assigning the same parallelism 
> as the input operator.
> If a generated operator has more than two inputs, the operator cannot be 
> chained anymore and the operator is generated with default parallelism. This 
> can lead to a {code}NoResourceAvailableException: Not enough free slots 
> available to run the job.{code} as reported by a user on the mailing list: 
> https://lists.apache.org/thread.html/60a8bffcce54717b6273bf3de0f43f1940fbb711590f4b90cd666c9a@%3Cuser.flink.apache.org%3E
> I suggest setting the parallelism of a generated operator to the max 
> parallelism of all of its inputs to fix this problem.
> Until the problem is fixed, a workaround is to set the default parallelism at 
> the {{ExecutionEnvironment}}:
> {code}
> ExecutionEnvironment env = ...
> env.setParallelism(2);
> {code}
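
A small sketch of the proposed rule (the {{Operator}} interface and accessor below are illustrative names, not the actual optimizer API):

{code}
import java.util.List;

interface Operator {
    int getParallelism();
}

class ParallelismRule {
    // parallelism of a generated operator = max over all of its inputs,
    // falling back to the environment default when there are no inputs
    static int generatedOperatorParallelism(List<Operator> inputs, int defaultParallelism) {
        return inputs.stream()
                .mapToInt(Operator::getParallelism)
                .max()
                .orElse(defaultParallelism);
    }
}
{code}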



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531842#comment-16531842
 ] 

ASF GitHub Bot commented on FLINK-8785:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/6222


> JobSubmitHandler does not handle JobSubmissionExceptions
> 
>
> Key: FLINK-8785
> URL: https://issues.apache.org/jira/browse/FLINK-8785
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, JobManager, REST
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6, pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> If the job submission, i.e. {{DispatcherGateway#submitJob}}, fails with a 
> {{JobSubmissionException}}, the {{JobSubmissionHandler}} returns "Internal 
> server error" instead of signaling the failed job submission.
> This can for example occur if the transmitted execution graph is faulty, as 
> tested by the {{JobSubmissionFailsITCase}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9733) Make location for job graph files configurable

2018-07-03 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9733:
---

 Summary: Make location for job graph files configurable
 Key: FLINK-9733
 URL: https://issues.apache.org/jira/browse/FLINK-9733
 Project: Flink
  Issue Type: Improvement
  Components: Client, Job-Submission
Affects Versions: 1.6.0, 1.5.1
Reporter: Chesnay Schepler


During the job submission by the {{RestClusterClient}}, the {{JobGraph}} is 
serialized and written to a file.
Currently we just use {{Files.createTempFile}} for this purpose.
This location should be made configurable.
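
A minimal sketch of what a configurable location could look like (the key {{client.jobgraph.tmpdir}} is hypothetical, not an existing Flink option):

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.flink.configuration.Configuration;

class JobGraphFileDemo {
    static Path createJobGraphFile(Configuration config) throws IOException {
        // fall back to the JVM temp dir when the (hypothetical) option is not set
        String dir = config.getString("client.jobgraph.tmpdir",
                                      System.getProperty("java.io.tmpdir"));
        return Files.createTempFile(Paths.get(dir), "flink-jobgraph", ".bin");
    }
}
{code}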



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6222: [FLINK-8785][rest] Handle JobSubmissionExceptions

2018-07-03 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/6222


---


[GitHub] flink pull request #6241: [backport][FLINK-9289][rest] Rework JobSubmitHandl...

2018-07-03 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/6241


---


[jira] [Created] (FLINK-9732) Report more detailed error message on JobSubmissionFailure

2018-07-03 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9732:
---

 Summary: Report more detailed error message on JobSubmissionFailure
 Key: FLINK-9732
 URL: https://issues.apache.org/jira/browse/FLINK-9732
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.5.0, 1.6.0
Reporter: Chesnay Schepler


Currently, if the job submission through the {{JobSubmitHandler}} fails, the 
error message returned to the client only says "Job submission failed.".

As outlined in the discussion in this 
[PR|https://github.com/apache/flink/pull/6222] we should try to include more 
information about the actual failure cause.

The proposed solution is to encode the cause for the failure in the 
{{Acknowledge}} that is returned by {{DispatcherGateway#submitJob}}.
{code}
public class AckOrException {
    // holds the exception, could also be a series of nullable fields
    private final SuperEither<ExceptionA, ExceptionB, ExceptionC> exception;
    ...
    public void throwIfError() throws ExceptionA, ExceptionB, ExceptionC;
}
{code}
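
A sketch of how a caller could consume the proposed type (illustrative only; today {{DispatcherGateway#submitJob}} returns a plain {{Acknowledge}}):

{code}
// illustrative only: the handler side under the proposal
AckOrException ack = dispatcherGateway.submitJob(jobGraph, timeout).get();
ack.throwIfError();  // rethrows ExceptionA/B/C so the REST layer can map the cause to a response
{code}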



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-03 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9280.
---
Resolution: Fixed

master: a25cd3feddd19e75456db32a704ee5509e85dd47

1.5: 797709cb2466610b1d5b05c12e43d3f7d4f70183

> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an 
> optional list of jar files that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions

2018-07-03 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8785.
---
   Resolution: Fixed
Fix Version/s: 1.5.1
   1.6.0

master: 81d135578c026842e6d8bc95391da60886612166

1.5: 06b9bf13197712f3c6e81b380386b78dc5979de0

> JobSubmitHandler does not handle JobSubmissionExceptions
> 
>
> Key: FLINK-8785
> URL: https://issues.apache.org/jira/browse/FLINK-8785
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, JobManager, REST
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6, pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> If the job submission, i.e. {{DispatcherGateway#submitJob}}, fails with a 
> {{JobSubmissionException}}, the {{JobSubmissionHandler}} returns "Internal 
> server error" instead of signaling the failed job submission.
> This can for example occur if the transmitted execution graph is faulty, as 
> tested by the {{JobSubmissionFailsITCase}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9301) NotSoMiniClusterIterations job fails on travis

2018-07-03 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9301.
---
   Resolution: Fixed
Fix Version/s: (was: 1.5.1)
   1.6.0

master: 885640f781aa66359d929eb387f27a6024d75025

> NotSoMiniClusterIterations job fails on travis
> --
>
> Key: FLINK-9301
> URL: https://issues.apache.org/jira/browse/FLINK-9301
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The high-parallelism-iterations-test fails on travis. After starting ~55 
> taskmanagers, all memory is used and further memory allocations fail.
> I'm currently letting it run another time; if it fails again I will disable 
> the test temporarily.
> https://travis-ci.org/zentol/flink-ci/builds/375189790



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9731) Kafka source subtask begins to consume from earliest offset

2018-07-03 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9731:
-

 Summary: Kafka source subtask begins to consume from earliest 
offset
 Key: FLINK-9731
 URL: https://issues.apache.org/jira/browse/FLINK-9731
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.2
Reporter: Elias Levy


On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job 
instance began consuming records from the earliest offsets available in Kafka 
for the partitions assigned to it. Other subtasks did not exhibit this behavior 
and continued operating normally.

Prior to the event the job exhibited no Kafka lag. The job showed no failed 
checkpoints and the job did not restore or restart. The Flink logs show no 
indication of anything amiss. There were no errors or Kafka-related 
messages in the Flink logs.

The job is configured with checkpoints at 1 minute intervals. The Kafka 
connector consumer is configured to start from group offsets if it is not 
started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka 
consumer is configured to fall back to the earliest offsets if no group offsets 
are committed, by setting `auto.offset.reset` to `earliest` in the Kafka 
consumer config.

Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of 
its partitions for around 30 seconds as a result of losing its connection to 
ZooKeeper.

 
{noformat}
[2018-06-30 09:34:54,799] INFO Unable to read additional data from server 
sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for 
partition [cloud_ioc_events,32] to broker 
1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
{noformat}

The broker immediately reconnected to ZK after a few tries:

{noformat}
[2018-06-30 09:34:55,462] INFO Opening socket connection to server 
10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,463] INFO Socket connection established to 
10.210.48.187/10.210.48.187:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,465] INFO Initiating client connection, 
connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka 
sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 
(org.apache.zookeeper.ZooKeeper)
[2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,466] INFO EventThread shut down for session: 
0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,468] INFO Opening socket connection to server 
10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,468] INFO Socket connection established to 
10.210.43.200/10.210.43.200:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO Session establishment complete on server 
10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated 
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 2005 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 
with addresses: 
EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT)
 (kafka.utils.ZkUtils)
[2018-06-30 09:34:55,476] INFO done re-registering broker 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for 
new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
{noformat}

By 9:35:02 partitions had returned to the broker.

It appears this 

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531822#comment-16531822
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6203


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an 
> optional list of jar files that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6203


---


[jira] [Commented] (FLINK-9301) NotSoMiniClusterIterations job fails on travis

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531821#comment-16531821
 ] 

ASF GitHub Bot commented on FLINK-9301:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6024


> NotSoMiniClusterIterations job fails on travis
> --
>
> Key: FLINK-9301
> URL: https://issues.apache.org/jira/browse/FLINK-9301
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>
> The high-parallelism-iterations-test fails on travis. After starting ~55 
> taskmanagers, all memory is used and further memory allocations fail.
> I'm currently letting it run another time; if it fails again I will disable 
> the test temporarily.
> https://travis-ci.org/zentol/flink-ci/builds/375189790



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9301) NotSoMiniClusterIterations job fails on travis

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9301:
--
Labels: pull-request-available  (was: )

> NotSoMiniClusterIterations job fails on travis
> --
>
> Key: FLINK-9301
> URL: https://issues.apache.org/jira/browse/FLINK-9301
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>
> The high-parallelism-iterations-test fails on travis. After starting ~55 
> taskmanagers, all memory is used and further memory allocations fail.
> I'm currently letting it run another time; if it fails again I will disable 
> the test temporarily.
> https://travis-ci.org/zentol/flink-ci/builds/375189790



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6024: [FLINK-9301] [e2e test] Add back "not so mini clus...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6024


---


[jira] [Commented] (FLINK-9666) short-circuit logic should be used in boolean contexts

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531816#comment-16531816
 ] 

ASF GitHub Bot commented on FLINK-9666:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6230
  
wait.

Is this a _backport_ of #6212 for 1.5, or an _extension_ that fixes another 
occurrence?


> short-circuit logic should be used in boolean contexts
> --
>
> Key: FLINK-9666
> URL: https://issues.apache.org/jira/browse/FLINK-9666
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> short-circuit logic should be used in boolean contexts



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
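
For context, a minimal example of the cleanup the issue title describes (hypothetical code, not taken from the patch):

{code:java}
public class ShortCircuitExample {
    public static void main(String[] args) {
        String s = null;
        // The eager '&' operator evaluates both operands, so this line
        // would throw a NullPointerException:
        //   boolean bad = (s != null) & s.isEmpty();
        // The short-circuit '&&' skips the right-hand side as soon as the
        // left-hand side is false:
        boolean ok = (s != null) && s.isEmpty();
        System.out.println(ok); // prints: false
    }
}
{code}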


[GitHub] flink issue #6230: [FLINK-9666] short-circuit logic should be used in boolea...

2018-07-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6230
  
wait.

Is this a _backport_ of #6212 for 1.5, or an _extension_ that fixes another 
occurrence?


---


[jira] [Commented] (FLINK-9730) avoid access static via class reference

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531794#comment-16531794
 ] 

ASF GitHub Bot commented on FLINK-9730:
---

GitHub user lamber-ken opened a pull request:

https://github.com/apache/flink/pull/6247

[FLINK-9730] [code refactor] fix access static via class reference

## What is the purpose of the change
  - fix access static via class reference

## Brief change log

  - fix access static via class reference

## Verifying this change

  - fix access static via class reference

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lamber-ken/flink FLINK-9730

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6247.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6247


commit d8278659cf6373aaa4d71320ad09d540955d396f
Author: lamber-ken 
Date:   2018-07-03T18:36:43Z

[code refactore] access static via class reference




> avoid access static via class reference
> ---
>
> Key: FLINK-9730
> URL: https://issues.apache.org/jira/browse/FLINK-9730
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> [code refactor] access static via class reference



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
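
For context, a minimal hypothetical example of the pattern this refactor targets: static members should be referenced through the declaring class rather than through an instance.

{code:java}
public class StaticAccessExample {
    static final int LIMIT = 10;

    public static void main(String[] args) {
        StaticAccessExample instance = new StaticAccessExample();
        // Discouraged: accessing a static field through an object reference
        // suggests the field belongs to the instance.
        int viaInstance = instance.LIMIT;
        // Preferred: accessing it through the class reference.
        int viaClass = StaticAccessExample.LIMIT;
        System.out.println(viaInstance == viaClass); // prints: true
    }
}
{code}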


[jira] [Updated] (FLINK-9730) avoid access static via class reference

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9730:
--
Labels: pull-request-available  (was: )

> avoid access static via class reference
> ---
>
> Key: FLINK-9730
> URL: https://issues.apache.org/jira/browse/FLINK-9730
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> [code refactor] access static via class reference



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9730) avoid access static via class reference

2018-07-03 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-9730:
--
Description: [code refactor] access static via class reference  (was: [code 
refactore] access static via class reference)

> avoid access static via class reference
> ---
>
> Key: FLINK-9730
> URL: https://issues.apache.org/jira/browse/FLINK-9730
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> [code refactor] access static via class reference



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6247: [FLINK-9730] [code refactor] fix access static via...

2018-07-03 Thread lamber-ken
GitHub user lamber-ken opened a pull request:

https://github.com/apache/flink/pull/6247

[FLINK-9730] [code refactor] fix access static via class reference

## What is the purpose of the change
  - fix access static via class reference

## Brief change log

  - fix access static via class reference

## Verifying this change

  - fix access static via class reference

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lamber-ken/flink FLINK-9730

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6247.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6247


commit d8278659cf6373aaa4d71320ad09d540955d396f
Author: lamber-ken 
Date:   2018-07-03T18:36:43Z

[code refactore] access static via class reference




---


[jira] [Commented] (FLINK-9641) Pulsar Source Connector

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531777#comment-16531777
 ] 

ASF GitHub Bot commented on FLINK-9641:
---

Github user cckellogg commented on a diff in the pull request:

https://github.com/apache/flink/pull/6200#discussion_r199909775
  
--- Diff: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import 
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+
+/**
+ * Base class for pulsar sources.
+ * @param <T>
--- End diff --

will add a comment


> Pulsar Source Connector
> ---
>
> Key: FLINK-9641
> URL: https://issues.apache.org/jira/browse/FLINK-9641
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Chris Kellogg
>Priority: Minor
>  Labels: pull-request-available
>
> Pulsar (https://github.com/apache/incubator-pulsar) is a distributed pub-sub 
> messaging system currently in apache incubation. It is a very active project 
> and there are committers from various companies and good adoption. This pr 
> will add a source function to allow Flink jobs to process messages from 
> Pulsar topics.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9641) Pulsar Source Connector

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531778#comment-16531778
 ] 

ASF GitHub Bot commented on FLINK-9641:
---

Github user cckellogg commented on a diff in the pull request:

https://github.com/apache/flink/pull/6200#discussion_r199909793
  
--- Diff: 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
 ---
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Matchers.any;
+
+/**
+ * Tests for the PulsarConsumerSource. The source supports two operation modes.
+ * 1) At-least-once (when checkpointed) with Pulsar message acknowledgements and the
+ *    deduplication mechanism in
+ *    {@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}.
+ * 2) No strong delivery guarantees (without checkpointing) with Pulsar acknowledging
+ *    messages after it receives x number of messages.
+ *
+ * This test assumes that the MessageIds are increasing monotonically. That doesn't
+ * have to be the case. The MessageId is used to uniquely identify messages.
+ */
+public class PulsarConsumerSourceTests {
+
+   private PulsarConsumerSource source;
+
+   private TestConsumer consumer;
+
+   private TestSourceContext context;
+
+   private Thread sourceThread;
+
+   private Exception exception;
--- End diff --

will fix


> Pulsar Source Connector
> ---
>
> Key: FLINK-9641
> URL: https://issues.apache.org/jira/browse/FLINK-9641
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Chris Kellogg
>Priority: Minor
>  Labels: pull-request-available
>
> Pulsar (https://github.com/apache/incubator-pulsar) is a distributed pub-sub 
> messaging system currently in apache incubation. It is a very active project 
> and there are committers from various companies and good adoption. This pr 
> will add a source function to allow Flink jobs to process messages from 
> Pulsar topics.

[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

2018-07-03 Thread cckellogg
Github user cckellogg commented on a diff in the pull request:

https://github.com/apache/flink/pull/6200#discussion_r199909793
  
--- Diff: 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
 ---
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Matchers.any;
+
+/**
+ * Tests for the PulsarConsumerSource. The source supports two operation modes.
+ * 1) At-least-once (when checkpointed) with Pulsar message acknowledgements and the
+ *    deduplication mechanism in
+ *    {@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}.
+ * 2) No strong delivery guarantees (without checkpointing) with Pulsar acknowledging
+ *    messages after it receives x number of messages.
+ *
+ * This test assumes that the MessageIds are increasing monotonically. That doesn't
+ * have to be the case. The MessageId is used to uniquely identify messages.
+ */
+public class PulsarConsumerSourceTests {
+
+   private PulsarConsumerSource source;
+
+   private TestConsumer consumer;
+
+   private TestSourceContext context;
+
+   private Thread sourceThread;
+
+   private Exception exception;
--- End diff --

will fix


---


[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

2018-07-03 Thread cckellogg
Github user cckellogg commented on a diff in the pull request:

https://github.com/apache/flink/pull/6200#discussion_r199909775
  
--- Diff: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import 
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+
+/**
+ * Base class for pulsar sources.
+ * @param <T>
--- End diff --

will add a comment


---


[jira] [Commented] (FLINK-9641) Pulsar Source Connector

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531774#comment-16531774
 ] 

ASF GitHub Bot commented on FLINK-9641:
---

Github user cckellogg commented on a diff in the pull request:

https://github.com/apache/flink/pull/6200#discussion_r199909617
  
--- Diff: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
 ---
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pulsar source (consumer) which receives messages from a topic and 
acknowledges messages.
+ * When checkpointing is enabled, it guarantees at least once processing 
semantics.
+ *
+ * When checkpointing is disabled, it auto acknowledges messages based 
on the number of messages it has
+ * received. In this mode messages may be dropped.
+ */
+class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageId> implements PulsarSourceBase<T> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(PulsarConsumerSource.class);
+
+   private final int messageReceiveTimeoutMs = 100;
+   private final String serviceUrl;
+   private final String topic;
+   private final String subscriptionName;
+   private final DeserializationSchema<T> deserializer;
+
+   private PulsarClient client;
+   private Consumer consumer;
+
+   private boolean isCheckpointingEnabled;
+
+   private final long acknowledgementBatchSize;
+   private long batchCount;
+   private long totalMessageCount;
+
+   private transient volatile boolean isRunning;
+
+   PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
+   super(MessageId.class);
+   this.serviceUrl = builder.serviceUrl;
+   this.topic = builder.topic;
+   this.deserializer = builder.deserializationSchema;
+   this.subscriptionName = builder.subscriptionName;
+   this.acknowledgementBatchSize = 
builder.acknowledgementBatchSize;
+   }
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+
+   final RuntimeContext context = getRuntimeContext();
+   if (context instanceof StreamingRuntimeContext) {
+   isCheckpointingEnabled = ((StreamingRuntimeContext) 
context).isCheckpointingEnabled();
+   }
+
+   client = createClient();
+   consumer = createConsumer(client);
+
+   isRunning = true;
+   }
+
+   @Override
+   protected void acknowledgeIDs(long checkpointId, Set<MessageId> messageIds) {
+   if (consumer == null) {
+   LOG.error("null consumer unable to acknowledge 
messages");
+   

[jira] [Commented] (FLINK-9641) Pulsar Source Connector

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531775#comment-16531775
 ] 

ASF GitHub Bot commented on FLINK-9641:
---

Github user cckellogg commented on a diff in the pull request:

https://github.com/apache/flink/pull/6200#discussion_r199909645
  
--- Diff: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Defaults.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+/**
+ * Default values for Pulsar connectors.
+ */
+public class Defaults {
--- End diff --

will remove.


> Pulsar Source Connector
> ---
>
> Key: FLINK-9641
> URL: https://issues.apache.org/jira/browse/FLINK-9641
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Chris Kellogg
>Priority: Minor
>  Labels: pull-request-available
>
> Pulsar (https://github.com/apache/incubator-pulsar) is a distributed pub-sub 
> messaging system currently in apache incubation. It is a very active project 
> and there are committers from various companies and good adoption. This pr 
> will add a source function to allow Flink jobs to process messages from 
> Pulsar topics.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

2018-07-03 Thread cckellogg
Github user cckellogg commented on a diff in the pull request:

https://github.com/apache/flink/pull/6200#discussion_r199909645
  
--- Diff: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Defaults.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+/**
+ * Default values for Pulsar connectors.
+ */
+public class Defaults {
--- End diff --

will remove.


---


[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

2018-07-03 Thread cckellogg
Github user cckellogg commented on a diff in the pull request:

https://github.com/apache/flink/pull/6200#discussion_r199909617
  
--- Diff: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
 ---
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pulsar source (consumer) which receives messages from a topic and 
acknowledges messages.
+ * When checkpointing is enabled, it guarantees at least once processing 
semantics.
+ *
+ * When checkpointing is disabled, it auto acknowledges messages based 
on the number of messages it has
+ * received. In this mode messages may be dropped.
+ */
+class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageId> implements PulsarSourceBase<T> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(PulsarConsumerSource.class);
+
+   private final int messageReceiveTimeoutMs = 100;
+   private final String serviceUrl;
+   private final String topic;
+   private final String subscriptionName;
+   private final DeserializationSchema<T> deserializer;
+
+   private PulsarClient client;
+   private Consumer consumer;
+
+   private boolean isCheckpointingEnabled;
+
+   private final long acknowledgementBatchSize;
+   private long batchCount;
+   private long totalMessageCount;
+
+   private transient volatile boolean isRunning;
+
+   PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
+   super(MessageId.class);
+   this.serviceUrl = builder.serviceUrl;
+   this.topic = builder.topic;
+   this.deserializer = builder.deserializationSchema;
+   this.subscriptionName = builder.subscriptionName;
+   this.acknowledgementBatchSize = 
builder.acknowledgementBatchSize;
+   }
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+
+   final RuntimeContext context = getRuntimeContext();
+   if (context instanceof StreamingRuntimeContext) {
+   isCheckpointingEnabled = ((StreamingRuntimeContext) 
context).isCheckpointingEnabled();
+   }
+
+   client = createClient();
+   consumer = createConsumer(client);
+
+   isRunning = true;
+   }
+
+   @Override
+   protected void acknowledgeIDs(long checkpointId, Set<MessageId> messageIds) {
+   if (consumer == null) {
+   LOG.error("null consumer unable to acknowledge 
messages");
+   throw new RuntimeException("null pulsar consumer unable 
to acknowledge messages");
+   }
+
+   if (messageIds.isEmpty()) {
+   LOG.info("no message ids to acknowledge");
+   return;
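
The quoted diff is truncated here. To illustrate the "auto acknowledge after x messages" mode described in the javadoc above, here is a toy model; all names are illustrative, and the real connector acknowledges through the Pulsar Consumer API:

{code:java}
import java.util.ArrayList;
import java.util.List;

public class BatchAckModel {
    public static void main(String[] args) {
        final long acknowledgementBatchSize = 3;
        long batchCount = 0;
        List<Integer> pending = new ArrayList<>();

        for (int messageId = 1; messageId <= 10; messageId++) {
            pending.add(messageId);
            if (++batchCount >= acknowledgementBatchSize) {
                // Acknowledgement is decoupled from any checkpoint, so a
                // failure right after this point can lose records that were
                // acknowledged but not yet fully processed downstream.
                System.out.println("acknowledging " + pending);
                pending.clear();
                batchCount = 0;
            }
        }
        System.out.println("still unacknowledged: " + pending);
    }
}
{code}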
  

[jira] [Created] (FLINK-9730) avoid access static via class reference

2018-07-03 Thread lamber-ken (JIRA)
lamber-ken created FLINK-9730:
-

 Summary: avoid access static via class reference
 Key: FLINK-9730
 URL: https://issues.apache.org/jira/browse/FLINK-9730
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.5.0
Reporter: lamber-ken
Assignee: lamber-ken
 Fix For: 1.6.0


[code refactore] access static via class reference



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9666) short-circuit logic should be used in boolean contexts

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531734#comment-16531734
 ] 

ASF GitHub Bot commented on FLINK-9666:
---

Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6230
  
@zentol, thanks for the review. By the way, what's the best way to backport 
refactorings and code-cleanups?


> short-circuit logic should be used in boolean contexts
> --
>
> Key: FLINK-9666
> URL: https://issues.apache.org/jira/browse/FLINK-9666
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> short-circuit logic should be used in boolean contexts



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6230: [FLINK-9666] short-circuit logic should be used in boolea...

2018-07-03 Thread lamber-ken
Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6230
  
@zentol, thanks for the review. By the way, what's the best way to backport 
refactorings and code-cleanups?


---


[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531726#comment-16531726
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199894398
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -272,4 +254,60 @@ public int getVersion() {
previousSerializersAndConfigs.get(index).f0, 
UnloadableDummyTypeSerializer.class,
previousSerializersAndConfigs.get(index).f1, 
fieldSerializers[index]);
}
+
+   /** This class holds composite serializer parameters which can be 
precomputed in advance for better performance. */
+   protected static class PrecomputedParameters implements Serializable {
+   /** Whether target type is immutable. */
+   final boolean immutableTargetType;
+
+   /** Whether target type and its fields are immutable. */
+   final boolean immutable;
+
+   /** Byte length of target object in serialized form. */
+   private final int length;
+
+   /** Whether any field serializer is stateful. */
+   final boolean stateful;
+
+   final int hashCode;
--- End diff --

I wonder if this should be `transient` in a serializable class, the hash 
code could be based on object identity.


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap user state value with a class holding the value and 
> the expiration timestamp (maybe meta data in future) and use the new object 
> as a value in the existing implementations:
> {code:java}
> class TtlValue<V> {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is 
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>  bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL 
> logic wrappers and adds TtlValue serialisation logic:
> {code:java}
> TtlStateBinder {
> StateBinder binder;
> ProcessingTimeProvider timeProvider; // System.currentTimeMillis()
>  TtlValueState createValueState(valueDesc) {
>  serializer = new TtlValueSerializer(valueDesc.getSerializer);
>  ttlValueDesc = new ValueDesc(serializer, ...);
>  // or implement custom TypeInfo
>  originalStateWithTtl = binder.createValueState(valueDesc);
>      return new TtlValueState(originalStateWithTtl, timeProvider);
> }
>   // List, Map, ...
> }
> {code}
> TTL serializer should add expiration timestamp



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
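
A compilable toy version of the wrapping idea sketched in the description above; this is a simplification under assumed interfaces, not the actual Flink implementation, which also has to cover list/map state and serializer compatibility:

{code:java}
import java.io.Serializable;
import java.util.function.LongSupplier;

// Wraps a user value together with its expiration timestamp.
final class TtlValue<V> implements Serializable {
    final V value;
    final long expirationTimestamp;

    TtlValue(V value, long expirationTimestamp) {
        this.value = value;
        this.expirationTimestamp = expirationTimestamp;
    }
}

// Stand-in for a TTL-decorated value state: expired entries read as absent.
final class TtlValueState<V> {
    private final LongSupplier timeProvider; // e.g. System::currentTimeMillis
    private final long ttlMillis;
    private TtlValue<V> stored;              // stands in for the wrapped backend state

    TtlValueState(LongSupplier timeProvider, long ttlMillis) {
        this.timeProvider = timeProvider;
        this.ttlMillis = ttlMillis;
    }

    void update(V value) {
        stored = new TtlValue<>(value, timeProvider.getAsLong() + ttlMillis);
    }

    V value() {
        if (stored == null || stored.expirationTimestamp <= timeProvider.getAsLong()) {
            return null; // expired or never set
        }
        return stored.value;
    }
}
{code}

For example, new TtlValueState<String>(System::currentTimeMillis, 60_000L) keeps a value readable for one minute after each update.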


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199894398
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -272,4 +254,60 @@ public int getVersion() {
previousSerializersAndConfigs.get(index).f0, 
UnloadableDummyTypeSerializer.class,
previousSerializersAndConfigs.get(index).f1, 
fieldSerializers[index]);
}
+
+   /** This class holds composite serializer parameters which can be 
precomputed in advance for better performance. */
+   protected static class PrecomputedParameters implements Serializable {
+   /** Whether target type is immutable. */
+   final boolean immutableTargetType;
+
+   /** Whether target type and its fields are immutable. */
+   final boolean immutable;
+
+   /** Byte length of target object in serialized form. */
+   private final int length;
+
+   /** Whether any field serializer is stateful. */
+   final boolean stateful;
+
+   final int hashCode;
--- End diff --

I wonder if this should be `transient` in a serializable class, the hash 
code could be based on object identity.


---
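
One way to act on this suggestion, sketched under the assumption that the cached hash can be recomputed from the final fields (the reviewer's identity-based alternative would instead use System.identityHashCode):

{code:java}
import java.io.Serializable;
import java.util.Arrays;

class PrecomputedParametersSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    final boolean immutable;
    private final int length;

    // transient: the cached hash is not serialized and reads as 0 after
    // deserialization, so it is recomputed lazily on the first hashCode() call.
    private transient int hashCode;

    PrecomputedParametersSketch(boolean immutable, int length) {
        this.immutable = immutable;
        this.length = length;
    }

    @Override
    public int hashCode() {
        int h = hashCode;
        if (h == 0) { // rare, harmless recompute if the real hash is 0
            h = Arrays.hashCode(new Object[] {immutable, length});
            hashCode = h;
        }
        return h;
    }
}
{code}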

