[jira] [Created] (FLINK-9736) Potential null reference in KeyGroupPartitionedPriorityQueue#poll()
Ted Yu created FLINK-9736: - Summary: Potential null reference in KeyGroupPartitionedPriorityQueue#poll() Key: FLINK-9736 URL: https://issues.apache.org/jira/browse/FLINK-9736 Project: Flink Issue Type: Bug Reporter: Ted Yu {code} final PQ headList = heapOfkeyGroupedHeaps.peek(); final T head = headList.poll(); {code} The {{peek}} call may return null, so the return value should be checked before dereferencing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
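The guard the report asks for can be sketched with plain `java.util.PriorityQueue` standing in for the key-grouped heap of heaps; the class and method names below are illustrative, not Flink's actual implementation or fix:

```java
import java.util.PriorityQueue;

public class NullSafePoll {

    // Stand-in for heapOfkeyGroupedHeaps: a heap of per-key-group heaps,
    // ordered by each sub-heap's current head element.
    private final PriorityQueue<PriorityQueue<Integer>> heapOfKeyGroupedHeaps =
            new PriorityQueue<>((a, b) -> Integer.compare(
                    a.isEmpty() ? Integer.MAX_VALUE : a.peek(),
                    b.isEmpty() ? Integer.MAX_VALUE : b.peek()));

    void addKeyGroupHeap(PriorityQueue<Integer> heap) {
        heapOfKeyGroupedHeaps.add(heap);
    }

    Integer poll() {
        final PriorityQueue<Integer> headList = heapOfKeyGroupedHeaps.peek();
        // peek() returns null when the outer heap is empty; guard before
        // dereferencing instead of calling headList.poll() unconditionally.
        if (headList == null) {
            return null;
        }
        return headList.poll();
    }
}
```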
[jira] [Created] (FLINK-9735) Potential resource leak in RocksDBStateBackend#getDbOptions
Ted Yu created FLINK-9735: - Summary: Potential resource leak in RocksDBStateBackend#getDbOptions Key: FLINK-9735 URL: https://issues.apache.org/jira/browse/FLINK-9735 Project: Flink Issue Type: Bug Reporter: Ted Yu Here is the related code: {code} if (optionsFactory != null) { opt = optionsFactory.createDBOptions(opt); } {code} {{opt}}, a {{DBOptions}} instance, should be closed before being overwritten. {{getColumnOptions}} has a similar issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
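The close-before-overwrite pattern can be sketched with a dummy `AutoCloseable` standing in for RocksDB's native `DBOptions` handle. All names here are illustrative stand-ins, and this is a sketch of the reported concern, not the patch that was merged:

```java
// Stand-in for RocksDB's DBOptions: a native resource that must be close()d.
class FakeDBOptions implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

// Stand-in for the optionsFactory in the quoted snippet.
interface DbOptionsFactory {
    FakeDBOptions createDBOptions(FakeDBOptions current);
}

public class OptionsLeakFix {

    static FakeDBOptions getDbOptions(FakeDBOptions opt, DbOptionsFactory optionsFactory) {
        if (optionsFactory != null) {
            FakeDBOptions previous = opt;
            opt = optionsFactory.createDBOptions(previous);
            // If the factory handed back a *new* instance, close the old one
            // before its reference is lost, so the native handle is not leaked.
            if (opt != previous) {
                previous.close();
            }
        }
        return opt;
    }
}
```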
[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532346#comment-16532346 ] ASF GitHub Bot commented on FLINK-6846: --- Github user xueyumusic commented on the issue: https://github.com/apache/flink/pull/6188 @twalthr I looked around and realized that the current `+` expression already supports TimeInterval (TimePoint) addition, so a separate timestampAdd API would be redundant and is not necessary. So I only made these changes: 1. add the `1.week` expression 2. modify `quarter` to `1.quarter` 3. modify Extract to support extracting QUARTER and WEEK, as suggested and verified by @walterddr. Please review, @twalthr @fhueske @walterddr @hequn8128. Thanks to all of you. > Add TIMESTAMPADD supported in TableAPI > -- > > Key: FLINK-6846 > URL: https://issues.apache.org/jira/browse/FLINK-6846 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: pull-request-available, starter > > See FLINK-6811 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user xueyumusic commented on the issue: https://github.com/apache/flink/pull/6188 @twalthr I looked around and realized that the current `+` expression already supports TimeInterval (TimePoint) addition, so a separate timestampAdd API would be redundant and is not necessary. So I only made these changes: 1. add the `1.week` expression 2. modify `quarter` to `1.quarter` 3. modify Extract to support extracting QUARTER and WEEK, as suggested and verified by @walterddr. Please review, @twalthr @fhueske @walterddr @hequn8128. Thanks to all of you. ---
[jira] [Commented] (FLINK-6469) Configure Memory Sizes with units
[ https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532344#comment-16532344 ] ASF GitHub Bot commented on FLINK-6469: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5448 cc @dawidwys > Configure Memory Sizes with units > - > > Key: FLINK-6469 > URL: https://issues.apache.org/jira/browse/FLINK-6469 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Currently, memory sizes are configured by pure numbers, the interpretation is > different from configuration parameter to parameter. > For example, heap sizes are configured in megabytes, network buffer memory is > configured in bytes, alignment thresholds are configured in bytes. > I propose to configure all memory parameters the same way, with units similar > to time. The JVM itself configured heap size similarly: {{Xmx5g}} or > {{Xmx2000m}}. > {code} > 1 -> bytes > 10 kb > 64 mb > 1 gb > ... > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
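The proposed unit-aware configuration can be sketched with a minimal parser. Binary multiples are assumed here (1 kb = 1024 bytes, as with the JVM's {{Xmx}} flags), and the class name is illustrative; this is not Flink's actual implementation for this issue:

```java
public class MemorySizeSketch {

    // Parses values like "1" (plain bytes), "10 kb", "64 mb", "1 gb" into
    // bytes, so all memory parameters can share one interpretation.
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase();
        long multiplier = 1L;
        if (s.endsWith("kb")) {
            multiplier = 1024L;
            s = s.substring(0, s.length() - 2);
        } else if (s.endsWith("mb")) {
            multiplier = 1024L * 1024L;
            s = s.substring(0, s.length() - 2);
        } else if (s.endsWith("gb")) {
            multiplier = 1024L * 1024L * 1024L;
            s = s.substring(0, s.length() - 2);
        } else if (s.endsWith("b")) {
            s = s.substring(0, s.length() - 1);
        }
        return Long.parseLong(s.trim()) * multiplier;
    }
}
```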
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5448 cc @dawidwys ---
[jira] [Commented] (FLINK-9654) Internal error while deserializing custom Scala TypeSerializer instances
[ https://issues.apache.org/jira/browse/FLINK-9654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532309#comment-16532309 ] ASF GitHub Bot commented on FLINK-9654: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/6206 Glad to hear. Then I will merge your PR with this test. Thanks for fixing this problem @zsolt-donca. > Internal error while deserializing custom Scala TypeSerializer instances > > > Key: FLINK-9654 > URL: https://issues.apache.org/jira/browse/FLINK-9654 > Project: Flink > Issue Type: Bug >Reporter: Zsolt Donca >Assignee: Zsolt Donca >Priority: Major > Labels: pull-request-available > > When you are using custom `TypeSerializer` instances implemented in Scala, > the Scala issue [SI-2034|https://issues.scala-lang.org/browse/SI-2034] can > manifest itself when a Flink job is restored from checkpoint or started with > a savepoint. > The reason is that in such a restore from checkpoint or savepoint, Flink uses > `InstantiationUtil.FailureTolerantObjectInputStream` to deserialize the type > serializers and their configurations. The deserialization walks through the > entire corresponding object graph, and for each class it calls > `isAnonymousClass`, which, in turn, calls `getSimpleName` (a mechanism in place > for FLINK-6869). If there is an internal class defined in a Scala object for > which `getSimpleName` fails (see the Scala issue), then a > `java.lang.InternalError` is thrown which causes the task manager to restart. > In this case, Flink tries to restart the job on another task manager, causing > all the task managers to restart, wreaking havoc on the entire Flink cluster. > There are some alternative type information derivation mechanisms that rely > on anonymous classes and, most importantly, classes generated by macros, that > can easily trigger the above problem. 
I am personally working on > [https://github.com/zsolt-donca/flink-alt], and there is also > [https://github.com/joroKr21/flink-shapeless] > I prepared a pull request that fixes the issue. > > Edit: added a stack trace to help demonstrate the issue. > 2018-06-21 13:08:07.829 [today-stats (2/2)] ERROR > org.apache.flink.runtime.taskmanager.Task - Encountered fatal error > java.lang.InternalError - terminating the JVM > java.lang.InternalError: Malformed class name > at java.lang.Class.getSimpleName(Class.java:1330) ~[na:1.8.0_171] > at java.lang.Class.isAnonymousClass(Class.java:1411) ~[na:1.8.0_171] > at > org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:206) > ~[flink-dist_2.11-1.4.2.jar:1.4.2] > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1855) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) > ~[na:1.8.0_171] > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) > 
~[na:1.8.0_171] > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) > ~[flink-dist_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) > ~[flink-dist_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) > ~[flink-dist_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.re
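One possible mitigation of the failure mode in the stack trace above is to catch the `InternalError` and treat the class as non-anonymous, instead of letting the Error escape and terminate the JVM. This is a sketch of that idea, not necessarily the fix that was merged in the PR:

```java
public class SafeClassCheck {

    // Class#isAnonymousClass() calls getSimpleName(), which can throw
    // java.lang.InternalError ("Malformed class name") for some
    // Scala-generated classes (SI-2034). Fall back to "not anonymous"
    // rather than letting the Error kill the task manager.
    static boolean isAnonymousClassSafe(Class<?> clazz) {
        try {
            return clazz.isAnonymousClass();
        } catch (InternalError e) {
            return false;
        }
    }
}
```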
[GitHub] flink issue #6206: [FLINK-9654] [core] Changed the check for anonymous class...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/6206 Glad to hear. Then I will merge your PR with this test. Thanks for fixing this problem @zsolt-donca. ---
[jira] [Commented] (FLINK-9686) Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole
[ https://issues.apache.org/jira/browse/FLINK-9686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532293#comment-16532293 ] ASF GitHub Bot commented on FLINK-9686: --- Github user fmthoma commented on the issue: https://github.com/apache/flink/pull/6221 @tillrohrmann @tzulitai Thank you! > Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole > > > Key: FLINK-9686 > URL: https://issues.apache.org/jira/browse/FLINK-9686 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Franz Thoma >Assignee: Franz Thoma >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > h2. Current situation: > FlinkKinesisProducer can authenticate with Kinesis by retrieving credentials > via one of the following mechanisms: > * Environment variables > * System properties > * An AWS profile > * Directly provided credentials (\{{BASIC}}) > * AWS's own default heuristic (\{{AUTO}}) > For streaming across AWS accounts, it is considered good practice to enable > access to the remote Kinesis stream via a role, rather than passing > credentials for the remote account. > h2. Proposed change: > Add a new credentials provider specifying a role ARN, session name, and an > additional credentials provider supplying the credentials for assuming the > role. > Config example for assuming role {{}} with auto-detected > credentials:{{}} > {code:java} > aws.credentials.provider: ASSUME_ROLE > aws.credentials.provider.role.arn: > aws.credentials.provider.role.sessionName: my-session-name > aws.credentials.provider.role.provider: AUTO > {code} > {{ASSUME_ROLE}} credentials providers can be nested, i.e. 
it is possible to > assume a role which in turn is allowed to assume another role: > {code:java} > aws.credentials.provider: ASSUME_ROLE > aws.credentials.provider.role.arn: > aws.credentials.provider.role.sessionName: my-session-name > aws.credentials.provider.role.provider: ASSUME_ROLE > aws.credentials.provider.role.provider.role.arn: > aws.credentials.provider.role.provider.role.sessionName: > my-nested-session-name > aws.credentials.provider.role.provider.role.provider: AUTO > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6221: [FLINK-9686] [kinesis] Enable Kinesis authentication via ...
Github user fmthoma commented on the issue: https://github.com/apache/flink/pull/6221 @tillrohrmann @tzulitai Thank you! ---
[jira] [Commented] (FLINK-9707) LocalFileSystem does not support concurrent directory creations
[ https://issues.apache.org/jira/browse/FLINK-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532289#comment-16532289 ] ASF GitHub Bot commented on FLINK-9707: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6243#discussion_r200015421 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java --- @@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) { } else { File parent = file.getParentFile(); - return (parent == null || mkdirsInternal(parent)) && file.mkdir(); + return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory()); --- End diff -- The case we want to guard against is that another process created the directory `file` just before calling `file.mkdir`. In this case, `file.mkdir` would return `false`. Therefore, we need to check whether the created `file` is really a directory. If this is the case, then we should return `true` because the directory was created. > LocalFileSystem does not support concurrent directory creations > --- > > Key: FLINK-9707 > URL: https://issues.apache.org/jira/browse/FLINK-9707 > Project: Flink > Issue Type: Improvement > Components: FileSystem >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > The {{LocalFileSystem}} does not support concurrent directory creations. The > consequence is that file system operations fail. > I think the culprit is the following line: > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6243: [FLINK-9707] Support concurrent directory creation...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6243#discussion_r200015421 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java --- @@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) { } else { File parent = file.getParentFile(); - return (parent == null || mkdirsInternal(parent)) && file.mkdir(); + return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory()); --- End diff -- The case we want to guard against is that another process created the directory `file` just before calling `file.mkdir`. In this case, `file.mkdir` would return `false`. Therefore, we need to check whether the created `file` is really a directory. If this is the case, then we should return `true` because the directory was created. ---
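The patched check can be exercised in isolation. The sketch below mirrors the quoted diff, assuming the same recursive `mkdirsInternal` structure as in `LocalFileSystem`:

```java
import java.io.File;

public class RaceTolerantMkdirs {

    // Mirrors the quoted diff: if mkdir() returns false because another
    // process created the directory in the meantime, still succeed as
    // long as the path now is a directory.
    static boolean mkdirsInternal(File file) {
        if (file.isDirectory()) {
            return true;
        } else if (file.exists() && !file.isDirectory()) {
            // path exists but is a regular file: cannot create a directory
            return false;
        } else {
            File parent = file.getParentFile();
            return (parent == null || mkdirsInternal(parent))
                    && (file.mkdir() || file.isDirectory());
        }
    }
}
```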
[jira] [Resolved] (FLINK-9695) Add option for Mesos executor to forcefully pull Docker images
[ https://issues.apache.org/jira/browse/FLINK-9695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9695. -- Resolution: Fixed Fixed via b230bf0e3883ee2dba47e22e781009aa9d0f000e > Add option for Mesos executor to forcefully pull Docker images > -- > > Key: FLINK-9695 > URL: https://issues.apache.org/jira/browse/FLINK-9695 > Project: Flink > Issue Type: Improvement > Components: Mesos >Reporter: Leonid Ishimnikov >Assignee: Leonid Ishimnikov >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > It would be useful to have an option to forcefully pull Docker images for > tasks, rather than reusing a previously cached image. Such an option exists in > many Mesos frameworks, and it significantly simplifies debugging. I propose > adding a new > {{mesos.resourcemanager.tasks.container.docker.}}{{force-pull-image}} option. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9686) Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole
[ https://issues.apache.org/jira/browse/FLINK-9686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9686. -- Resolution: Fixed Fix Version/s: 1.6.0 Added via 229ed7755c5bddd9856233e019ff3fa8ddef29a7 > Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole > > > Key: FLINK-9686 > URL: https://issues.apache.org/jira/browse/FLINK-9686 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Franz Thoma >Assignee: Franz Thoma >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > h2. Current situation: > FlinkKinesisProducer can authenticate with Kinesis by retrieving credentials > via one of the following mechanisms: > * Environment variables > * System properties > * An AWS profile > * Directly provided credentials (\{{BASIC}}) > * AWS's own default heuristic (\{{AUTO}}) > For streaming across AWS accounts, it is considered good practice to enable > access to the remote Kinesis stream via a role, rather than passing > credentials for the remote account. > h2. Proposed change: > Add a new credentials provider specifying a role ARN, session name, and an > additional credentials provider supplying the credentials for assuming the > role. > Config example for assuming role {{}} with auto-detected > credentials:{{}} > {code:java} > aws.credentials.provider: ASSUME_ROLE > aws.credentials.provider.role.arn: > aws.credentials.provider.role.sessionName: my-session-name > aws.credentials.provider.role.provider: AUTO > {code} > {{ASSUME_ROLE}} credentials providers can be nested, i.e. 
it is possible to > assume a role which in turn is allowed to assume another role: > {code:java} > aws.credentials.provider: ASSUME_ROLE > aws.credentials.provider.role.arn: > aws.credentials.provider.role.sessionName: my-session-name > aws.credentials.provider.role.provider: ASSUME_ROLE > aws.credentials.provider.role.provider.role.arn: > aws.credentials.provider.role.provider.role.sessionName: > my-nested-session-name > aws.credentials.provider.role.provider.role.provider: AUTO > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9636) Network buffer leaks in requesting a batch of segments during canceling
[ https://issues.apache.org/jira/browse/FLINK-9636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9636. -- Resolution: Fixed Fixed via 1.6.0: efc87083e371eb00e801ef29c65ff49dfb170a4d 1.5.1: e58a8e4a0946c636652428a898ad53f30d4d4583 > Network buffer leaks in requesting a batch of segments during canceling > --- > > Key: FLINK-9636 > URL: https://issues.apache.org/jira/browse/FLINK-9636 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: zhijiang >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > In {{NetworkBufferPool#requestMemorySegments}}, {{numTotalRequiredBuffers}} > is increased by {{numRequiredBuffers}} first. > If {{InterruptedException}} is thrown during polling segments from the > available queue, the requested segments will be recycled back to > {{NetworkBufferPool}}, {{numTotalRequiredBuffers}} is decreased by the number > of polled segments which is now inconsistent with {{numRequiredBuffers}}. So > {{numTotalRequiredBuffers}} in {{NetworkBufferPool}} leaks in this case, and > we can also decrease {{numRequiredBuffers}} to fix this bug. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
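The accounting invariant described above (on failure, the counter must be decreased by the same amount the reservation increased it, not by the number of segments actually polled) can be illustrated with a simplified model. This is a sketch, not Flink's `NetworkBufferPool`:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class BufferAccountingSketch {

    private final ArrayDeque<byte[]> availableSegments = new ArrayDeque<>();
    private int numTotalRequiredBuffers;

    BufferAccountingSketch(int capacity) {
        for (int i = 0; i < capacity; i++) {
            availableSegments.add(new byte[4]);
        }
    }

    int totalRequired() {
        return numTotalRequiredBuffers;
    }

    // Reserve up front; on failure, recycle what was polled AND undo the
    // full reservation, so the counter stays consistent with reality.
    List<byte[]> requestMemorySegments(int numRequiredBuffers) throws Exception {
        numTotalRequiredBuffers += numRequiredBuffers;
        List<byte[]> segments = new ArrayList<>();
        try {
            for (int i = 0; i < numRequiredBuffers; i++) {
                byte[] segment = availableSegments.poll();
                if (segment == null) {
                    throw new Exception("no segment available");
                }
                segments.add(segment);
            }
            return segments;
        } catch (Exception e) {
            availableSegments.addAll(segments);
            // The leak: decrementing by segments.size() here instead of by
            // numRequiredBuffers would leave the counter inflated forever.
            numTotalRequiredBuffers -= numRequiredBuffers;
            throw e;
        }
    }
}
```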
[jira] [Resolved] (FLINK-9708) Network buffer leaks when buffer request fails during buffer redistribution
[ https://issues.apache.org/jira/browse/FLINK-9708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9708. -- Resolution: Fixed Fix Version/s: 1.6.0 Fixed via 1.6.0: 390e451f77d874b3255b20e0ea164d6743190aa2 63730b61de3342d3ee4c0d0e3c543d55ab966773 1.5.1: 90d5b40e2f832e52f366bd0d4e96823ad091f22a 07afe1d8cdeddb3a3d7ed96a2d2055715abcc6f2 > Network buffer leaks when buffer request fails during buffer redistribution > --- > > Key: FLINK-9708 > URL: https://issues.apache.org/jira/browse/FLINK-9708 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Fix For: 1.6.0, 1.5.1 > > > If an exception is thrown in {{NetworkBufferPool#requestMemorySegments()}}'s > first call to {{redistributeBuffers()}}, the accounting for > {{numTotalRequiredBuffers}} is wrong for future uses of this buffer pool. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OuptutStream on Task.
[ https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9633. -- Resolution: Fixed Fixed via 1.6.0: 64bc4b348ddd1f0c63e806442ffd3aad5c367a28 1.5.1: 9198adc655895818eb1659f8a613c6893042e42b > Flink doesn't use the Savepoint path's filesystem to create the OuptutStream > on Task. > - > > Key: FLINK-9633 > URL: https://issues.apache.org/jira/browse/FLINK-9633 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Currently, Flink uses the Savepoint's filesystem to create the meta output > stream in the CheckpointCoordinator (JM side), but in StreamTask (TM side) it uses > the Checkpoint's filesystem to create the checkpoint data output stream. When > the Savepoint & Checkpoint are in different filesystems, this is > problematic. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6221: [FLINK-9686] [kinesis] Enable Kinesis authenticati...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6221 ---
[jira] [Commented] (FLINK-9686) Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole
[ https://issues.apache.org/jira/browse/FLINK-9686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532280#comment-16532280 ] ASF GitHub Bot commented on FLINK-9686: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6221 > Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole > > > Key: FLINK-9686 > URL: https://issues.apache.org/jira/browse/FLINK-9686 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Franz Thoma >Assignee: Franz Thoma >Priority: Major > Labels: pull-request-available > > h2. Current situation: > FlinkKinesisProducer can authenticate with Kinesis by retrieving credentials > via one of the following mechanisms: > * Environment variables > * System properties > * An AWS profile > * Directly provided credentials (\{{BASIC}}) > * AWS's own default heuristic (\{{AUTO}}) > For streaming across AWS accounts, it is considered good practice to enable > access to the remote Kinesis stream via a role, rather than passing > credentials for the remote account. > h2. Proposed change: > Add a new credentials provider specifying a role ARN, session name, and an > additional credentials provider supplying the credentials for assuming the > role. > Config example for assuming role {{}} with auto-detected > credentials:{{}} > {code:java} > aws.credentials.provider: ASSUME_ROLE > aws.credentials.provider.role.arn: > aws.credentials.provider.role.sessionName: my-session-name > aws.credentials.provider.role.provider: AUTO > {code} > {{ASSUME_ROLE}} credentials providers can be nested, i.e. 
it is possible to > assume a role which in turn is allowed to assume another role: > {code:java} > aws.credentials.provider: ASSUME_ROLE > aws.credentials.provider.role.arn: > aws.credentials.provider.role.sessionName: my-session-name > aws.credentials.provider.role.provider: ASSUME_ROLE > aws.credentials.provider.role.provider.role.arn: > aws.credentials.provider.role.provider.role.sessionName: > my-nested-session-name > aws.credentials.provider.role.provider.role.provider: AUTO > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6238: [FLINK-9636][network] fix inconsistency with faile...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6238 ---
[jira] [Commented] (FLINK-9636) Network buffer leaks in requesting a batch of segments during canceling
[ https://issues.apache.org/jira/browse/FLINK-9636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532283#comment-16532283 ] ASF GitHub Bot commented on FLINK-9636: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6238 > Network buffer leaks in requesting a batch of segments during canceling > --- > > Key: FLINK-9636 > URL: https://issues.apache.org/jira/browse/FLINK-9636 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: zhijiang >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > In {{NetworkBufferPool#requestMemorySegments}}, {{numTotalRequiredBuffers}} > is increased by {{numRequiredBuffers}} first. > If {{InterruptedException}} is thrown during polling segments from the > available queue, the requested segments will be recycled back to > {{NetworkBufferPool}}, {{numTotalRequiredBuffers}} is decreased by the number > of polled segments which is now inconsistent with {{numRequiredBuffers}}. So > {{numTotalRequiredBuffers}} in {{NetworkBufferPool}} leaks in this case, and > we can also decrease {{numRequiredBuffers}} to fix this bug. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9695) Add option for Mesos executor to forcefully pull Docker images
[ https://issues.apache.org/jira/browse/FLINK-9695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532281#comment-16532281 ] ASF GitHub Bot commented on FLINK-9695: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6232 > Add option for Mesos executor to forcefully pull Docker images > -- > > Key: FLINK-9695 > URL: https://issues.apache.org/jira/browse/FLINK-9695 > Project: Flink > Issue Type: Improvement > Components: Mesos >Reporter: Leonid Ishimnikov >Assignee: Leonid Ishimnikov >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > It would be useful to have an option to forcefully pull Docker images for > tasks, rather than reusing a previously cached image. Such an option exists in > many Mesos frameworks, and it significantly simplifies debugging. I propose > adding a new > {{mesos.resourcemanager.tasks.container.docker.}}{{force-pull-image}} option. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OuptutStream on Task.
[ https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532282#comment-16532282 ] ASF GitHub Bot commented on FLINK-9633: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6194 > Flink doesn't use the Savepoint path's filesystem to create the OuptutStream > on Task. > - > > Key: FLINK-9633 > URL: https://issues.apache.org/jira/browse/FLINK-9633 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Currently, Flink uses the Savepoint's filesystem to create the meta output > stream in the CheckpointCoordinator (JM side), but in StreamTask (TM side) it uses > the Checkpoint's filesystem to create the checkpoint data output stream. When > the Savepoint & Checkpoint are in different filesystems, this is > problematic. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6194 ---
[GitHub] flink pull request #6232: [FLINK-9695] [mesos] Add option for Mesos executor...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6232 ---
[jira] [Commented] (FLINK-8336) YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3 test instability
[ https://issues.apache.org/jira/browse/FLINK-8336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532274#comment-16532274 ] Till Rohrmann commented on FLINK-8336: -- Another instance https://api.travis-ci.org/v3/job/399602067/log.txt > YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3 test instability > --- > > Key: FLINK-8336 > URL: https://issues.apache.org/jira/browse/FLINK-8336 > Project: Flink > Issue Type: Bug > Components: FileSystem, Tests, YARN >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Nico Kruber >Priority: Critical > Labels: test-stability > Fix For: 1.4.3, 1.5.1 > > > The {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3}} fails on > Travis. I suspect that this has something to do with the consistency > guarantees S3 gives us. > https://travis-ci.org/tillrohrmann/flink/jobs/323930297 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mingdao Yang reassigned FLINK-4582: --- Assignee: Mingdao Yang > Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams > > > Key: FLINK-4582 > URL: https://issues.apache.org/jira/browse/FLINK-4582 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Mingdao Yang >Priority: Major > > AWS DynamoDB is a NoSQL database service that has a CDC-like (change data > capture) feature called DynamoDB Streams > (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html), > which is a stream feed of item-level table activities. > The DynamoDB Streams shard abstraction follows that of Kinesis Streams with > only a slight difference in resharding behaviours, so it is possible to build > on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB > Streams source. > I propose an API something like this: > {code} > DataStream dynamoItemsCdc = > FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config) > {code} > The feature adds more connectivity to popular AWS services for Flink, and > combining what Flink has for exactly-once semantics, out-of-core state > backends, and queryable state with CDC can have very strong use cases. For > this feature there should only be an extra dependency to the AWS Java SDK for > DynamoDB, which has Apache License 2.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase
[ https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307281#comment-16307281 ] Ted Yu edited comment on FLINK-6105 at 7/4/18 4:09 AM: --- In flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java : {code} try { Thread.sleep(500); } catch (InterruptedException e1) { // ignore it } {code} Interrupt status should be restored, or throw InterruptedIOException . was (Author: yuzhih...@gmail.com): In flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java: {code} try { Thread.sleep(500); } catch (InterruptedException e1) { // ignore it } {code} Interrupt status should be restored, or throw InterruptedIOException . > Properly handle InterruptedException in HadoopInputFormatBase > - > > Key: FLINK-6105 > URL: https://issues.apache.org/jira/browse/FLINK-6105 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Major > > When catching InterruptedException, we should throw InterruptedIOException > instead of IOException. > The following example is from HadoopInputFormatBase : > {code} > try { > splits = this.mapreduceInputFormat.getSplits(jobContext); > } catch (InterruptedException e) { > throw new IOException("Could not get Splits.", e); > } > {code} > There may be other places where IOE is thrown. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
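The recommended handling can be sketched as follows; restoring the interrupt status instead of swallowing the exception lets callers further up the stack observe the interruption:

```java
public class InterruptHandling {

    // Instead of ignoring InterruptedException, restore the thread's
    // interrupt status so the interruption is not silently lost.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status
        }
    }
}
```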
[jira] [Commented] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532220#comment-16532220 ] Ted Yu commented on FLINK-4534: --- The synchronization should be added - if there is no concurrent call(s), there is no contention either. > Lack of synchronization in BucketingSink#restoreState() > --- > > Key: FLINK-4534 > URL: https://issues.apache.org/jira/browse/FLINK-4534 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Major > > Iteration over state.bucketStates is protected by synchronization in other > methods, except for the following in restoreState(): > {code} > for (BucketState bucketState : state.bucketStates.values()) { > {code} > and following in close(): > {code} > for (Map.Entry> entry : > state.bucketStates.entrySet()) { > closeCurrentPartFile(entry.getValue()); > {code} > w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue > starting line 752: > {code} > Set pastCheckpointIds = > bucketState.pendingFilesPerCheckpoint.keySet(); > LOG.debug("Moving pending files to final location on restore."); > for (Long pastCheckpointId : pastCheckpointIds) { > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
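The issue is that `restoreState()` and `close()` iterate a map that other methods mutate under a lock. A minimal sketch of the proposed fix — taking the same lock around the traversal — using a plain `HashMap` stand-in (class and field names here are hypothetical, not Flink's actual `BucketingSink`):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the fix: iterate the shared map only while holding
// the same lock the writing methods use, so a concurrent put() cannot
// trigger a ConcurrentModificationException mid-iteration.
public class GuardedIteration {
    private final Object lock = new Object();
    private final Map<String, Long> bucketStates = new HashMap<>();

    public void put(String bucket, long size) {
        synchronized (lock) {
            bucketStates.put(bucket, size);
        }
    }

    // restoreState()-style traversal, now under the lock.
    public long totalSize() {
        synchronized (lock) {
            long total = 0;
            for (long size : bucketStates.values()) {
                total += size;
            }
            return total;
        }
    }
}
```

As the comment notes, if only one thread ever calls these methods the lock is uncontended and effectively free, so adding it is cheap insurance.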
[jira] [Commented] (FLINK-9636) Network buffer leaks in requesting a batch of segments during canceling
[ https://issues.apache.org/jira/browse/FLINK-9636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532207#comment-16532207 ] ASF GitHub Bot commented on FLINK-9636: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/6238 👍 > Network buffer leaks in requesting a batch of segments during canceling > --- > > Key: FLINK-9636 > URL: https://issues.apache.org/jira/browse/FLINK-9636 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: zhijiang >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > In {{NetworkBufferPool#requestMemorySegments}}, {{numTotalRequiredBuffers}} > is increased by {{numRequiredBuffers}} first. > If {{InterruptedException}} is thrown during polling segments from the > available queue, the requested segments will be recycled back to > {{NetworkBufferPool}}, {{numTotalRequiredBuffers}} is decreased by the number > of polled segments which is now inconsistent with {{numRequiredBuffers}}. So > {{numTotalRequiredBuffers}} in {{NetworkBufferPool}} leaks in this case, and > we can also decrease {{numRequiredBuffers}} to fix this bug. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6238: [FLINK-9636][network] fix inconsistency with failed buffe...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/6238 👍 ---
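The leak described in FLINK-9636 is an asymmetric counter update: the bookkeeping field is increased by the full request size but, on interruption, decreased only by the number of segments actually polled. A simplified standalone model of the fix (all names illustrative, not Flink's actual `NetworkBufferPool`):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Model of the fix: on failure, decrement the counter by the same amount
// it was incremented by (numRequiredBuffers), regardless of how many
// segments were actually polled before the interruption.
public class BufferPoolModel {
    private final ArrayDeque<Object> available = new ArrayDeque<>();
    private int numTotalRequiredBuffers = 0;

    public BufferPoolModel(int capacity) {
        for (int i = 0; i < capacity; i++) {
            available.add(new Object());
        }
    }

    public List<Object> requestSegments(int numRequiredBuffers) {
        numTotalRequiredBuffers += numRequiredBuffers;
        List<Object> polled = new ArrayList<>();
        try {
            for (int i = 0; i < numRequiredBuffers; i++) {
                Object segment = available.poll();
                if (segment == null) {
                    // Stand-in for the InterruptedException thrown during canceling.
                    throw new InterruptedException("simulated cancellation");
                }
                polled.add(segment);
            }
            return polled;
        } catch (InterruptedException e) {
            available.addAll(polled);                      // recycle what was obtained
            numTotalRequiredBuffers -= numRequiredBuffers; // symmetric decrement (the fix)
            return Collections.emptyList();
        }
    }

    public int bookkeeping() {
        return numTotalRequiredBuffers;
    }
}
```

Decrementing by `polled.size()` instead — the pre-fix behavior — would leave the counter permanently inflated after every interrupted request.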
[jira] [Commented] (FLINK-9707) LocalFileSystem does not support concurrent directory creations
[ https://issues.apache.org/jira/browse/FLINK-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532186#comment-16532186 ] ASF GitHub Bot commented on FLINK-9707: --- Github user lamber-ken commented on a diff in the pull request: https://github.com/apache/flink/pull/6243#discussion_r17861 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java --- @@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) { } else { File parent = file.getParentFile(); - return (parent == null || mkdirsInternal(parent)) && file.mkdir(); + return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory()); --- End diff -- emm, why not use `synchronized` to prevent? > LocalFileSystem does not support concurrent directory creations > --- > > Key: FLINK-9707 > URL: https://issues.apache.org/jira/browse/FLINK-9707 > Project: Flink > Issue Type: Improvement > Components: FileSystem >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > The {{LocalFileSystem}} does not support concurrent directory creations. The > consequence is that file system operations fail. > I think the culprit is the following line: > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6243: [FLINK-9707] Support concurrent directory creation...
Github user lamber-ken commented on a diff in the pull request: https://github.com/apache/flink/pull/6243#discussion_r17861 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java --- @@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) { } else { File parent = file.getParentFile(); - return (parent == null || mkdirsInternal(parent)) && file.mkdir(); + return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory()); --- End diff -- emm, why not use `synchronized` to prevent? ---
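To the reviewer's question: `file.isDirectory()` is not always false at that point — another thread or process may create the directory between the earlier existence check and the `mkdir()` call, which is exactly the race the patch tolerates without needing `synchronized` (a JVM-local lock could not protect against a separate process anyway). A standalone sketch of the pattern under discussion, using plain `java.io` with a hypothetical class name:

```java
import java.io.File;

// Race-tolerant directory creation: if mkdir() returns false because a
// concurrent creator won the race, re-checking isDirectory() treats that
// as success instead of reporting a spurious failure.
public class ConcurrentMkdirs {

    public static boolean mkdirsTolerant(File file) {
        if (file.isDirectory()) {
            return true;
        }
        File parent = file.getParentFile();
        return (parent == null || mkdirsTolerant(parent))
            && (file.mkdir() || file.isDirectory());
    }
}
```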
[jira] [Assigned] (FLINK-9733) Make location for job graph files configurable
[ https://issues.apache.org/jira/browse/FLINK-9733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9733: --- Assignee: vinoyang > Make location for job graph files configurable > -- > > Key: FLINK-9733 > URL: https://issues.apache.org/jira/browse/FLINK-9733 > Project: Flink > Issue Type: Improvement > Components: Client, Job-Submission >Affects Versions: 1.6.0, 1.5.1 >Reporter: Chesnay Schepler >Assignee: vinoyang >Priority: Major > > During the job-submission by the {{RestClusterClient}} the {{JobGraph}} is > serialized and written to a file. > Currently we just use {{Files.createTempFile}} for this purposes. > This location should be made configurable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9732) Report more detailed error message on SobSubmissionFailure
[ https://issues.apache.org/jira/browse/FLINK-9732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9732: --- Assignee: vinoyang > Report more detailed error message on SobSubmissionFailure > -- > > Key: FLINK-9732 > URL: https://issues.apache.org/jira/browse/FLINK-9732 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: vinoyang >Priority: Major > > Currently, if the job submission through the {{JobSubmitHandler}} fails the > error message returned to the client only says "Job submission failed.". > As outlined in the discussion in this > [PR|https://github.com/apache/flink/pull/6222] we should try to include more > information about the actual failure cause. > The proposed solution is to encode the cause for the failure in the > {{Acknowledge}} that is returned by {{DispatcherGateway#submitJob}}. > {code} > public class AckOrException { > // holds exception, could also be a series of nullable fields > private final SuperEither > exception; > ... > public void throwIfError() throws ExceptionA, ExceptionB, ExceptionC; > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9328) RocksDBStateBackend might use PlaceholderStreamStateHandle to restor due to StateBackendTestBase class not register snapshots in some UTs
[ https://issues.apache.org/jira/browse/FLINK-9328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532167#comment-16532167 ] ASF GitHub Bot commented on FLINK-9328: --- Github user Myasuka commented on the issue: https://github.com/apache/flink/pull/5984 The failing UT JobManagerFailsITCase in [#25170.9-build](https://travis-ci.org/apache/flink/jobs/393258719) is not related with this PR, shall I re-trigger the CI process? > RocksDBStateBackend might use PlaceholderStreamStateHandle to restor due to > StateBackendTestBase class not register snapshots in some UTs > - > > Key: FLINK-9328 > URL: https://issues.apache.org/jira/browse/FLINK-9328 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Yun Tang >Priority: Minor > Labels: pull-request-available > Fix For: 1.5.1 > > > Currently, StateBackendTestBase class does not register snapshots to > SharedStateRegistry in testValueState, testListState, testReducingState, > testFoldingState and testMapState UTs, which may cause RocksDBStateBackend to > restore from PlaceholderStreamStateHandle during the 2nd restore procedure if > one specific sst file both existed in the 1st snapshot and the 2nd snapshot > handle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9328) RocksDBStateBackend might use PlaceholderStreamStateHandle to restor due to StateBackendTestBase class not register snapshots in some UTs
[ https://issues.apache.org/jira/browse/FLINK-9328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9328: -- Labels: pull-request-available (was: ) > RocksDBStateBackend might use PlaceholderStreamStateHandle to restor due to > StateBackendTestBase class not register snapshots in some UTs > - > > Key: FLINK-9328 > URL: https://issues.apache.org/jira/browse/FLINK-9328 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Yun Tang >Priority: Minor > Labels: pull-request-available > Fix For: 1.5.1 > > > Currently, StateBackendTestBase class does not register snapshots to > SharedStateRegistry in testValueState, testListState, testReducingState, > testFoldingState and testMapState UTs, which may cause RocksDBStateBackend to > restore from PlaceholderStreamStateHandle during the 2nd restore procedure if > one specific sst file both existed in the 1st snapshot and the 2nd snapshot > handle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5984: [FLINK-9328][state] Fix RocksDBStateBackend restore probl...
Github user Myasuka commented on the issue: https://github.com/apache/flink/pull/5984 The failing UT JobManagerFailsITCase in [#25170.9-build](https://travis-ci.org/apache/flink/jobs/393258719) is not related with this PR, shall I re-trigger the CI process? ---
[jira] [Commented] (FLINK-9707) LocalFileSystem does not support concurrent directory creations
[ https://issues.apache.org/jira/browse/FLINK-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532152#comment-16532152 ] ASF GitHub Bot commented on FLINK-9707: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6243#discussion_r12317 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java --- @@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) { } else { File parent = file.getParentFile(); - return (parent == null || mkdirsInternal(parent)) && file.mkdir(); + return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory()); --- End diff -- seems you are right, maybe Till want to prevent concurrent problem? like the above code comment : check file exist first. cc @tillrohrmann > LocalFileSystem does not support concurrent directory creations > --- > > Key: FLINK-9707 > URL: https://issues.apache.org/jira/browse/FLINK-9707 > Project: Flink > Issue Type: Improvement > Components: FileSystem >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > The {{LocalFileSystem}} does not support concurrent directory creations. The > consequence is that file system operations fail. > I think the culprit is the following line: > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6243: [FLINK-9707] Support concurrent directory creation...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6243#discussion_r12317 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java --- @@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) { } else { File parent = file.getParentFile(); - return (parent == null || mkdirsInternal(parent)) && file.mkdir(); + return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory()); --- End diff -- seems you are right, maybe Till want to prevent concurrent problem? like the above code comment : check file exist first. cc @tillrohrmann ---
[jira] [Commented] (FLINK-9707) LocalFileSystem does not support concurrent directory creations
[ https://issues.apache.org/jira/browse/FLINK-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532132#comment-16532132 ] ASF GitHub Bot commented on FLINK-9707: --- Github user lamber-ken commented on a diff in the pull request: https://github.com/apache/flink/pull/6243#discussion_r199989644 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java --- @@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) { } else { File parent = file.getParentFile(); - return (parent == null || mkdirsInternal(parent)) && file.mkdir(); + return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory()); --- End diff -- the `file.isDirectory()` always return false, isn't it? > LocalFileSystem does not support concurrent directory creations > --- > > Key: FLINK-9707 > URL: https://issues.apache.org/jira/browse/FLINK-9707 > Project: Flink > Issue Type: Improvement > Components: FileSystem >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > The {{LocalFileSystem}} does not support concurrent directory creations. The > consequence is that file system operations fail. > I think the culprit is the following line: > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L257 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6243: [FLINK-9707] Support concurrent directory creation...
Github user lamber-ken commented on a diff in the pull request: https://github.com/apache/flink/pull/6243#discussion_r199989644 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java --- @@ -254,7 +254,7 @@ else if (file.exists() && !file.isDirectory()) { } else { File parent = file.getParentFile(); - return (parent == null || mkdirsInternal(parent)) && file.mkdir(); + return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory()); --- End diff -- the `file.isDirectory()` always return false, isn't it? ---
[jira] [Commented] (FLINK-9730) avoid access static via class reference
[ https://issues.apache.org/jira/browse/FLINK-9730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532109#comment-16532109 ] ASF GitHub Bot commented on FLINK-9730: --- Github user lamber-ken commented on the issue: https://github.com/apache/flink/pull/6247 @zentol, @tillrohrmann hi, cc. I fixed some cases access static via class reference > avoid access static via class reference > --- > > Key: FLINK-9730 > URL: https://issues.apache.org/jira/browse/FLINK-9730 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > [code refactor] access static via class reference -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6247: [FLINK-9730] [code refactor] fix access static via class ...
Github user lamber-ken commented on the issue: https://github.com/apache/flink/pull/6247 @zentol, @tillrohrmann hi, cc. I fixed some cases access static via class reference ---
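The ticket's wording is terse, so the exact cases fixed are unclear; a common reading (shown here as an assumption, not a description of the PR's diff) is the standard cleanup of static members accessed through an instance reference, which compile but are resolved at compile time and misleadingly look like instance calls:

```java
// Hedged illustration: both calls below invoke the same static method,
// but the instance-qualified form hides that no instance state is involved.
// Qualifying with the declaring class makes the static nature explicit.
public class StaticAccess {
    static int invocations = 0;

    static int bump() {
        return ++invocations;
    }
}
```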
[jira] [Commented] (FLINK-9666) short-circuit logic should be used in boolean contexts
[ https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532104#comment-16532104 ] ASF GitHub Bot commented on FLINK-9666: --- Github user lamber-ken commented on the issue: https://github.com/apache/flink/pull/6230 @zentol, hi, I found another occurrence just after the #6212 closed. but I can't reopen that pr, so I start a new pr refered to #6212. > short-circuit logic should be used in boolean contexts > -- > > Key: FLINK-9666 > URL: https://issues.apache.org/jira/browse/FLINK-9666 > Project: Flink > Issue Type: Improvement > Components: Core, DataStream API >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.6.0 > > > short-circuit logic should be used in boolean contexts -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6230: [FLINK-9666] short-circuit logic should be used in boolea...
Github user lamber-ken commented on the issue: https://github.com/apache/flink/pull/6230 @zentol, hi, I found another occurrence just after the #6212 closed. but I can't reopen that pr, so I start a new pr refered to #6212. ---
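The refactoring FLINK-9666 describes can be shown in a few lines: in boolean contexts the non-short-circuit operators `&` and `|` evaluate both operands, so a null guard on the left side does not protect the right side. A minimal standalone illustration (hypothetical helper names):

```java
// Short-circuit vs. non-short-circuit boolean operators.
public class ShortCircuit {

    // || stops after "s == null" is true, so trim() is never reached on null.
    public static boolean isBlankSafe(String s) {
        return s == null || s.trim().isEmpty();
    }

    // | evaluates both sides unconditionally: NullPointerException on null input.
    public static boolean isBlankUnsafe(String s) {
        return s == null | s.trim().isEmpty();
    }
}
```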
[jira] [Updated] (FLINK-8864) Add CLI query history in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-8864: -- Labels: pull-request-available (was: ) > Add CLI query history in SQL Client > --- > > Key: FLINK-8864 > URL: https://issues.apache.org/jira/browse/FLINK-8864 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > It would be great to have the possibility of persisting the CLI's query > history. Such that queries can be reused when the CLI Client is started > again. Also a search feature as it is offered by terminals would be good. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8864) Add CLI query history in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532093#comment-16532093 ] ASF GitHub Bot commented on FLINK-8864: --- GitHub user snuyanzin opened a pull request: https://github.com/apache/flink/pull/6250 [FLINK-8864] added command history ## What is the purpose of the change *This PR adds history for sql queries/sqlclient commands* ## Brief change log - *Added history file for commands* ## Verifying this change - *Manually verification* 1. run sql-sclient, perform whatever queries, close close client 2. run sql-client again, check history (up, down, forward and backward searches via ctrl+s, ctrl+r) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/snuyanzin/flink FLINK_8864 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6250.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6250 commit 08ee97e1c4b0274b8508e6cfb6f47db1e33212eb Author: snuyanzin Date: 2018-07-04T00:30:14Z [FLINK-8864] added command history > Add CLI query history in SQL Client > --- > > Key: FLINK-8864 > URL: https://issues.apache.org/jira/browse/FLINK-8864 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > It would be great to have the possibility of persisting the CLI's query > history. Such that queries can be reused when the CLI Client is started > again. Also a search feature as it is offered by terminals would be good. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6250: [FLINK-8864] added command history
GitHub user snuyanzin opened a pull request: https://github.com/apache/flink/pull/6250 [FLINK-8864] added command history ## What is the purpose of the change *This PR adds history for sql queries/sqlclient commands* ## Brief change log - *Added history file for commands* ## Verifying this change - *Manually verification* 1. run sql-sclient, perform whatever queries, close close client 2. run sql-client again, check history (up, down, forward and backward searches via ctrl+s, ctrl+r) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/snuyanzin/flink FLINK_8864 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6250.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6250 commit 08ee97e1c4b0274b8508e6cfb6f47db1e33212eb Author: snuyanzin Date: 2018-07-04T00:30:14Z [FLINK-8864] added command history ---
[jira] [Assigned] (FLINK-8864) Add CLI query history in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reassigned FLINK-8864: -- Assignee: Sergey Nuyanzin > Add CLI query history in SQL Client > --- > > Key: FLINK-8864 > URL: https://issues.apache.org/jira/browse/FLINK-8864 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > It would be great to have the possibility of persisting the CLI's query > history. Such that queries can be reused when the CLI Client is started > again. Also a search feature as it is offered by terminals would be good. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9731) Kafka source subtask begins to consume from earliest offset
[ https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy closed FLINK-9731. - Resolution: Invalid > Kafka source subtask begins to consume from earliest offset > --- > > Key: FLINK-9731 > URL: https://issues.apache.org/jira/browse/FLINK-9731 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.2 >Reporter: Elias Levy >Priority: Critical > > On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink > job instance began consuming records from the earliest offsets available in > Kafka for the partitions assigned to it. Other subtasks did not exhibit this > behavior and continued operating normally. > Previous to the event the job exhibited no Kafka lag. The job showed no > failed checkpoints and the job did not restore or restart. Flink logs only > shoed the following message: > {noformat} > June 30th 2018, 02:35:01.711 Fetch offset 2340400514 is out of range for > partition topic-124, resetting offset > {noformat} > The job is configured with checkpoints at 1 minute intervals. The Kafka > connector consumer is configured to start from group offsets if it is not > started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka > consumer is configured to fallback to the earliest offsets is no group > offsets are committed by setting `auto.offset.reset` to `earliest` in the > Kafka consumer config. > Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership > of its partitions for around 30 seconds as a result of losing its connection > to ZooKeeper. 
> > {noformat} > [2018-06-30 09:34:54,799] INFO Unable to read additional data from server > sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket > connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for > partition [cloud_ioc_events,32] to broker > 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This > server is not the leader for that topic-partition. > (kafka.server.ReplicaFetcherThread) > {noformat} > The broker immediately reconnected to after a few tries ZK: > {noformat} > [2018-06-30 09:34:55,462] INFO Opening socket connection to server > 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,463] INFO Socket connection established to > 10.210.48.187/10.210.48.187:2181, initiating session > (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, > session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,465] INFO Initiating client connection, > connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka > sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 > (org.apache.zookeeper.ZooKeeper) > [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, > session 0x161305b7bd81a09 has expired, closing socket connection > (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,466] INFO EventThread shut down for session: > 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) > 
(org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,468] INFO Opening socket connection to server > 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,468] INFO Socket connection established to > 10.210.43.200/10.210.43.200:2181, initiating session > (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,471] INFO Session establishment complete on server > 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated > timeout = 6000 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker > 2005 (kafka.server.KafkaHealthcheck$SessionExpireListener) > [2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? > false) (kafka.utils.ZKCheckedEphemeral) > [2018-06-30 09:34:55,476] INFO Result of znode creation is: OK > (kafka.utils.ZKCheckedEphemeral) > [2018-06-30 09:34:55,476] INFO Registered broker 2005 at path > /brokers/ids/2005 with addresses: > EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINT
[jira] [Commented] (FLINK-9731) Kafka source subtask begins to consume from earliest offset
[ https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532040#comment-16532040 ] Elias Levy commented on FLINK-9731: --- Closing as I suspect the error is on the Kafka side. Logs indicate Kafka truncated the partition in the process of the broker catching up with a replica and regaining leadership of the partition. But that would imply that somehow Kafka allowed Flink to read an uncommitted message, as we are publishing with acks=all, and the topic has min.insync.replicas=2, which breaks the consistency guarantees of Kafka. The truncation led to the Flink fetch being considered out-of-range, causing the auto.offset.reset logic in the Kafka consumer to kick in, leading to consumption from the earliest offset. > Kafka source subtask begins to consume from earliest offset > --- > > Key: FLINK-9731 > URL: https://issues.apache.org/jira/browse/FLINK-9731 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.2 >Reporter: Elias Levy >Priority: Critical > > On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink > job instance began consuming records from the earliest offsets available in > Kafka for the partitions assigned to it. Other subtasks did not exhibit this > behavior and continued operating normally. > Prior to the event the job exhibited no Kafka lag. The job showed no > failed checkpoints and the job did not restore or restart. Flink logs only > showed the following message: > {noformat} > June 30th 2018, 02:35:01.711 Fetch offset 2340400514 is out of range for > partition topic-124, resetting offset > {noformat} > The job is configured with checkpoints at 1 minute intervals. 
The Kafka > connector consumer is configured to start from group offsets if it is not > started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka > consumer is configured to fall back to the earliest offsets if no group > offsets are committed by setting `auto.offset.reset` to `earliest` in the > Kafka consumer config. > Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership > of its partitions for around 30 seconds as a result of losing its connection > to ZooKeeper. > > {noformat} > [2018-06-30 09:34:54,799] INFO Unable to read additional data from server > sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket > connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for > partition [cloud_ioc_events,32] to broker > 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This > server is not the leader for that topic-partition. 
> (kafka.server.ReplicaFetcherThread) > {noformat} > The broker reconnected to ZK after a few tries: > {noformat} > [2018-06-30 09:34:55,462] INFO Opening socket connection to server > 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,463] INFO Socket connection established to > 10.210.48.187/10.210.48.187:2181, initiating session > (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, > session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,465] INFO Initiating client connection, > connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka > sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 > (org.apache.zookeeper.ZooKeeper) > [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, > session 0x161305b7bd81a09 has expired, closing socket connection > (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,466] INFO EventThread shut down for session: > 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,468] INFO Opening socket connection to server > 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,468] INFO Socket connection established to > 10.210.43.200/10.210.43.200:2181, initiating session > (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,471] INFO Session establishment complete on server > 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated > timeout = 6000 (org.apache.zookeeper.C
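The failure mode described above — a committed fetch offset landing outside the partition's post-truncation range, after which the `auto.offset.reset` policy decides where consumption resumes — can be sketched in plain Java. This is illustrative only: it is not the Kafka client's actual code, and the class and method names are made up.

```java
// Sketch of the out-of-range reset decision described in FLINK-9731
// (assumed logic; not the real Kafka consumer implementation).
public class OffsetResetSketch {
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /** Returns the offset to continue from after a fetch at fetchOffset. */
    static long resolveFetchOffset(long fetchOffset, long earliest, long latest,
                                   ResetPolicy policy) {
        if (fetchOffset >= earliest && fetchOffset <= latest) {
            return fetchOffset; // still in the valid range, no reset needed
        }
        switch (policy) {
            case EARLIEST: return earliest; // re-consume from the start (observed here)
            case LATEST:   return latest;   // skip ahead to the log end
            default: throw new IllegalStateException("Offset out of range: " + fetchOffset);
        }
    }

    public static void main(String[] args) {
        // Truncation left the valid range below the committed fetch offset.
        long resumed = resolveFetchOffset(2_340_400_514L, 1_000L, 2_000_000_000L,
                                          ResetPolicy.EARLIEST);
        System.out.println(resumed); // resets to the earliest offset
    }
}
```

With `earliest`, any truncation below the committed offset silently rewinds the consumer to the start of the partition, which matches the observed re-consumption.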
[jira] [Updated] (FLINK-9734) Typo in ' field-deleimiter'
[ https://issues.apache.org/jira/browse/FLINK-9734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9734: -- Labels: pull-request-available (was: ) > Typo in ' field-deleimiter' > --- > > Key: FLINK-9734 > URL: https://issues.apache.org/jira/browse/FLINK-9734 > Project: Flink > Issue Type: Bug > Components: Documentation, Table API & SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Trivial > Labels: pull-request-available > > typo at > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#csv-format > {quote} field-deleimiter: "," # optional: string delimiter "," by > default {quote} > should be _delimiter_ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9734) Typo in ' field-deleimiter'
[ https://issues.apache.org/jira/browse/FLINK-9734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532026#comment-16532026 ] ASF GitHub Bot commented on FLINK-9734: --- GitHub user snuyanzin opened a pull request: https://github.com/apache/flink/pull/6249 [hotfix] [docs] [FLINK-9734] Fix 'deleimiter' typo ## What is the purpose of the change *This PR fixes the typo in the 'deleimiter' word at https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#csv-format* ## Brief change log - *typo fix at docs/dev/table/sqlClient.md* You can merge this pull request into a Git repository by running: $ git pull https://github.com/snuyanzin/flink FIELD_DELIMETER_TYPO Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6249.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6249 commit c538c8585ee66585bdf61cfa326668f82bb0e532 Author: snuyanzin Date: 2018-07-03T17:40:56Z [FLINK-9734] Fix 'deleimiter' typo > Typo in ' field-deleimiter' > --- > > Key: FLINK-9734 > URL: https://issues.apache.org/jira/browse/FLINK-9734 > Project: Flink > Issue Type: Bug > Components: Documentation, Table API & SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Trivial > Labels: pull-request-available > > typo at > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#csv-format > {quote} field-deleimiter: "," # optional: string delimiter "," by > default {quote} > should be _delimiter_ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6249: [hotfix] [docs] [FLINK-9734] Fix 'deleimiter' typo
GitHub user snuyanzin opened a pull request: https://github.com/apache/flink/pull/6249 [hotfix] [docs] [FLINK-9734] Fix 'deleimiter' typo ## What is the purpose of the change *This PR fixes the typo in the 'deleimiter' word at https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#csv-format* ## Brief change log - *typo fix at docs/dev/table/sqlClient.md* You can merge this pull request into a Git repository by running: $ git pull https://github.com/snuyanzin/flink FIELD_DELIMETER_TYPO Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6249.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6249 commit c538c8585ee66585bdf61cfa326668f82bb0e532 Author: snuyanzin Date: 2018-07-03T17:40:56Z [FLINK-9734] Fix 'deleimiter' typo ---
[jira] [Created] (FLINK-9734) Typo in ' field-deleimiter'
Sergey Nuyanzin created FLINK-9734: -- Summary: Typo in ' field-deleimiter' Key: FLINK-9734 URL: https://issues.apache.org/jira/browse/FLINK-9734 Project: Flink Issue Type: Bug Components: Documentation, Table API & SQL Reporter: Sergey Nuyanzin Assignee: Sergey Nuyanzin typo at https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#csv-format {quote} field-deleimiter: "," # optional: string delimiter "," by default {quote} should be _delimiter_ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531930#comment-16531930 ] ASF GitHub Bot commented on FLINK-8866: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r199951101 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala --- @@ -16,21 +16,18 @@ * limitations under the License. */ -package org.apache.flink.table.sources +package org.apache.flink.table.connector import java.util -/** - * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider - * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that - * describe the desired table source. The factory allows for matching to the given set of - * properties and creating a configured [[TableSource]] accordingly. - * - * Classes that implement this interface need to be added to the - * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in - * the current classpath to be found. - */ -trait TableSourceFactory[T] { +trait TableConnectorFactory[T] { --- End diff -- Sounds good, also, I've updated the DDL design doc to call it TABLE CONNECTOR, which I think is clearer. > Create unified interfaces to configure and instantiate TableSinks > > > Key: FLINK-8866 > URL: https://issues.apache.org/jira/browse/FLINK-8866 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > > Similar to the efforts done in FLINK-8240. We need unified ways to configure > and instantiate TableSinks. Among other applications, this is necessary in > order to declare table sinks in an environment file of the SQL client, such > that the sink can be used for {{INSERT INTO}} statements. 
> Below are a few major changes in mind. > 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "type" with values (source, sink and both) > for both TableSource and TableSink. > 3) In the YAML file, replace "sources" with "tables", and use tableType to > identify whether it's source or sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r199951101 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala --- @@ -16,21 +16,18 @@ * limitations under the License. */ -package org.apache.flink.table.sources +package org.apache.flink.table.connector import java.util -/** - * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider - * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that - * describe the desired table source. The factory allows for matching to the given set of - * properties and creating a configured [[TableSource]] accordingly. - * - * Classes that implement this interface need to be added to the - * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in - * the current classpath to be found. - */ -trait TableSourceFactory[T] { +trait TableConnectorFactory[T] { --- End diff -- Sounds good, also, I've updated the DDL design doc to call it TABLE CONNECTOR, which I think is clearer. ---
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531920#comment-16531920 ] ASF GitHub Bot commented on FLINK-8866: --- Github user suez1224 commented on the issue: https://github.com/apache/flink/pull/6201 @twalthr @fhueske sounds good to me. We can do that in a follow-up issue for `from-source`, and we will not support `from-source` in this PR. > Create unified interfaces to configure and instantiate TableSinks > > > Key: FLINK-8866 > URL: https://issues.apache.org/jira/browse/FLINK-8866 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > > Similar to the efforts done in FLINK-8240. We need unified ways to configure > and instantiate TableSinks. Among other applications, this is necessary in > order to declare table sinks in an environment file of the SQL client, such > that the sink can be used for {{INSERT INTO}} statements. > Below are a few major changes in mind. > 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "type" with values (source, sink and both) > for both TableSource and TableSink. > 3) In the YAML file, replace "sources" with "tables", and use tableType to > identify whether it's source or sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6201: [FLINK-8866][Table API & SQL] Add support for unified tab...
Github user suez1224 commented on the issue: https://github.com/apache/flink/pull/6201 @twalthr @fhueske sounds good to me. We can do that in a follow-up issue for `from-source`, and we will not support `from-source` in this PR. ---
[jira] [Updated] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9311: -- Labels: pull-request-available (was: ) > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Priority: Minor > Labels: pull-request-available > > I would like to start adding some Google Cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the Google Cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531854#comment-16531854 ] ASF GitHub Bot commented on FLINK-9311: --- GitHub user Xeli opened a pull request: https://github.com/apache/flink/pull/6248 [FLINK-9311] [pubsub] Added PubSub connector with support for checkpointing ## What is the purpose of the change Adding a PubSub connector with support for Checkpointing ## Verifying this change This change added tests and can be verified as follows: - *Added unit tests* - *Manually verified the connector (without Checkpointing) on an actual PubSub topic and subscription.* **Is there a need for integration tests? I did not see any for the other connectors. What is a good way of testing the checkpointing / exactly-once behavior?** ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes, Google Cloud Sdk for PubSub (**Does this need to be shaded?**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): don't know, don't think so - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes, checkpointing - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? 
JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/Xeli/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6248.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6248 commit dddbe671a0d663045110b89ad9bb85ce9a7e7051 Author: Richard Deurwaarder Date: 2018-05-26T12:59:32Z [FLINK-9311] [pubsub] Add PubSubSource without checkpointing commit 30fab0fd6810691f22ff583ce3f942e247d9fe45 Author: Richard Deurwaarder Date: 2018-07-03T17:34:02Z [FLINK-9311] [pubsub] Add checkpointing to PubSubSource > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Priority: Minor > Labels: pull-request-available > > I would like to start adding some Google Cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the Google Cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6248: [FLINK-9311] [pubsub] Added PubSub connector with ...
GitHub user Xeli opened a pull request: https://github.com/apache/flink/pull/6248 [FLINK-9311] [pubsub] Added PubSub connector with support for checkpointing ## What is the purpose of the change Adding a PubSub connector with support for Checkpointing ## Verifying this change This change added tests and can be verified as follows: - *Added unit tests* - *Manually verified the connector (without Checkpointing) on an actual PubSub topic and subscription.* **Is there a need for integration tests? I did not see any for the other connectors. What is a good way of testing the checkpointing / exactly-once behavior?** ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes, Google Cloud Sdk for PubSub (**Does this need to be shaded?**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): don't know, don't think so - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes, checkpointing - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/Xeli/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6248.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6248 commit dddbe671a0d663045110b89ad9bb85ce9a7e7051 Author: Richard Deurwaarder Date: 2018-05-26T12:59:32Z [FLINK-9311] [pubsub] Add PubSubSource without checkpointing commit 30fab0fd6810691f22ff583ce3f942e247d9fe45 Author: Richard Deurwaarder Date: 2018-07-03T17:34:02Z [FLINK-9311] [pubsub] Add checkpointing to PubSubSource ---
[jira] [Updated] (FLINK-9731) Kafka source subtask begins to consume from earliest offset
[ https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy updated FLINK-9731: -- Description: On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job instance began consuming records from the earliest offsets available in Kafka for the partitions assigned to it. Other subtasks did not exhibit this behavior and continued operating normally. Prior to the event the job exhibited no Kafka lag. The job showed no failed checkpoints and the job did not restore or restart. Flink logs only showed the following message: {noformat} June 30th 2018, 02:35:01.711 Fetch offset 2340400514 is out of range for partition topic-124, resetting offset {noformat} The job is configured with checkpoints at 1 minute intervals. The Kafka connector consumer is configured to start from group offsets if it is not started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka consumer is configured to fall back to the earliest offsets if no group offsets are committed by setting `auto.offset.reset` to `earliest` in the Kafka consumer config. Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of its partitions for around 30 seconds as a result of losing its connection to ZooKeeper. {noformat} [2018-06-30 09:34:54,799] INFO Unable to read additional data from server sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for partition [cloud_ioc_events,32] to broker 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 
(kafka.server.ReplicaFetcherThread) {noformat} The broker reconnected to ZK after a few tries: {noformat} [2018-06-30 09:34:55,462] INFO Opening socket connection to server 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,463] INFO Socket connection established to 10.210.48.187/10.210.48.187:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,465] INFO Initiating client connection, connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 (org.apache.zookeeper.ZooKeeper) [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,466] INFO EventThread shut down for session: 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,468] INFO Opening socket connection to server 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,468] INFO Socket connection established to 10.210.43.200/10.210.43.200:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,471] INFO Session establishment complete on server 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,472] INFO re-registering 
broker info in ZK for broker 2005 (kafka.server.KafkaHealthcheck$SessionExpireListener) [2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2018-06-30 09:34:55,476] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 with addresses: EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT) (kafka.utils.ZkUtils) [2018-06-30 09:34:55,476] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener) [2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener) {noformat} By 9:35:02 partitions had returned to the broker. It appears this is the broker that the subtask was consuming from, as outgoing network traffic from it spiked after the broker recovered leadership of its
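The consumer settings described in the report boil down to an ordinary Kafka `Properties` object. The sketch below is illustrative: the property keys are standard Kafka consumer settings, while the group id is a made-up placeholder.

```java
import java.util.Properties;

// Sketch of the consumer configuration described in FLINK-9731
// (the group id "flink-job-consumer" is hypothetical).
public class ConsumerConfigSketch {
    static Properties kafkaProps() {
        Properties props = new Properties();
        props.setProperty("group.id", "flink-job-consumer");
        // Fall back to the earliest offset when no committed group offset is
        // usable -- and also when a fetch offset is found out of range.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(kafkaProps().getProperty("auto.offset.reset")); // earliest
    }
}
```

Note that `auto.offset.reset` governs both the "no committed offset" case and the out-of-range case, which is why truncation alone was enough to rewind the subtask.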
[jira] [Commented] (FLINK-9289) Parallelism of generated operators should have max parallelism of input
[ https://issues.apache.org/jira/browse/FLINK-9289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531843#comment-16531843 ] ASF GitHub Bot commented on FLINK-9289: --- Github user zentol closed the pull request at: https://github.com/apache/flink/pull/6241 > Parallelism of generated operators should have max parallelism of input > - > > Key: FLINK-9289 > URL: https://issues.apache.org/jira/browse/FLINK-9289 > Project: Flink > Issue Type: Bug > Components: DataSet API >Affects Versions: 1.5.0, 1.4.2, 1.6.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui >Priority: Major > Labels: pull-request-available > > The DataSet API aims to chain generated operators such as key extraction > mappers to their predecessor. This is done by assigning the same parallelism > as the input operator. > If a generated operator has more than two inputs, the operator cannot be > chained anymore and the operator is generated with default parallelism. This > can lead to a {code}NoResourceAvailableException: Not enough free slots > available to run the job.{code} as reported by a user on the mailing list: > https://lists.apache.org/thread.html/60a8bffcce54717b6273bf3de0f43f1940fbb711590f4b90cd666c9a@%3Cuser.flink.apache.org%3E > I suggest to set the parallelism of a generated operator to the max > parallelism of all of its inputs to fix this problem. > Until the problem is fixed, a workaround is to set the default parallelism at > the {{ExecutionEnvironment}}: > {code} > ExecutionEnvironment env = ... > env.setParallelism(2); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
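The fix proposed in the issue — give a generated operator the maximum parallelism of its inputs instead of the default — can be sketched as a standalone helper. This is illustrative only, not Flink's actual optimizer code; the class and method names are made up.

```java
import java.util.Arrays;

// Sketch of the parallelism derivation proposed in FLINK-9289
// (assumed logic; not Flink's real DataSet optimizer code).
public class GeneratedOperatorParallelism {
    /** Derives the parallelism for a generated (non-chainable) operator. */
    static int deriveParallelism(int[] inputParallelisms, int defaultParallelism) {
        if (inputParallelisms.length == 0) {
            return defaultParallelism; // no inputs: nothing better than the default
        }
        // With a single input the operator can simply chain (same parallelism);
        // with several inputs, taking the max avoids the default-parallelism jump.
        return Arrays.stream(inputParallelisms).max().getAsInt();
    }

    public static void main(String[] args) {
        // Two inputs with parallelism 4 and 2; default parallelism is 8.
        System.out.println(deriveParallelism(new int[]{4, 2}, 8)); // 4
    }
}
```

Capping at the max of the inputs keeps the generated operator from requesting more slots than the job's real operators already use, which is exactly the failure the mailing-list report hit.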
[jira] [Commented] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions
[ https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531842#comment-16531842 ] ASF GitHub Bot commented on FLINK-8785: --- Github user zentol closed the pull request at: https://github.com/apache/flink/pull/6222 > JobSubmitHandler does not handle JobSubmissionExceptions > > > Key: FLINK-8785 > URL: https://issues.apache.org/jira/browse/FLINK-8785 > Project: Flink > Issue Type: Bug > Components: Job-Submission, JobManager, REST >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Labels: flip-6, pull-request-available > Fix For: 1.6.0, 1.5.1 > > > If the job submission, i.e. {{DispatcherGateway#submitJob}} fails with a > {{JobSubmissionException}} the {{JobSubmissionHandler}} returns "Internal > server error" instead of signaling the failed job submission. > This can for example occur if the transmitted execution graph is faulty, as > tested by the {{JobSubmissionFailsITCase}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9733) Make location for job graph files configurable
Chesnay Schepler created FLINK-9733: --- Summary: Make location for job graph files configurable Key: FLINK-9733 URL: https://issues.apache.org/jira/browse/FLINK-9733 Project: Flink Issue Type: Improvement Components: Client, Job-Submission Affects Versions: 1.6.0, 1.5.1 Reporter: Chesnay Schepler During job submission by the {{RestClusterClient}}, the {{JobGraph}} is serialized and written to a file. Currently we just use {{Files.createTempFile}} for this purpose. This location should be made configurable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
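A possible shape for the proposed change is sketched below. This is an assumption about how the improvement could look, not the actual Flink patch: the fallback branch mirrors the current `Files.createTempFile` behavior, and the "flink-jobgraph" file-name prefix is made up.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch for FLINK-9733: write the serialized JobGraph into a configurable
// directory, falling back to the system temp dir when none is configured.
public class JobGraphFileLocation {
    static Path createJobGraphFile(String configuredDir) throws IOException {
        if (configuredDir == null) {
            // Current behavior: plain Files.createTempFile in the system temp dir.
            return Files.createTempFile("flink-jobgraph", ".bin");
        }
        Path dir = Paths.get(configuredDir);
        Files.createDirectories(dir); // make sure the configured directory exists
        return Files.createTempFile(dir, "flink-jobgraph", ".bin");
    }

    public static void main(String[] args) throws IOException {
        Path p = createJobGraphFile(System.getProperty("java.io.tmpdir"));
        System.out.println(Files.exists(p)); // the file was created in the given dir
        Files.deleteIfExists(p); // clean up the demo file
    }
}
```

The overload `Files.createTempFile(Path, String, String)` already accepts a target directory, so the change is essentially plumbing a config option through to this call.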
[GitHub] flink pull request #6222: [FLINK-8785][rest] Handle JobSubmissionExceptions
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/6222 ---
[GitHub] flink pull request #6241: [backport][FLINK-9289][rest] Rework JobSubmitHandl...
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/6241 ---
[jira] [Created] (FLINK-9732) Report more detailed error message on JobSubmissionFailure
Chesnay Schepler created FLINK-9732: --- Summary: Report more detailed error message on JobSubmissionFailure Key: FLINK-9732 URL: https://issues.apache.org/jira/browse/FLINK-9732 Project: Flink Issue Type: Improvement Components: REST Affects Versions: 1.5.0, 1.6.0 Reporter: Chesnay Schepler Currently, if the job submission through the {{JobSubmitHandler}} fails, the error message returned to the client only says "Job submission failed.". As outlined in the discussion in this [PR|https://github.com/apache/flink/pull/6222] we should try to include more information about the actual failure cause. The proposed solution is to encode the cause for the failure in the {{Acknowledge}} that is returned by {{DispatcherGateway#submitJob}}. {code} public class AckOrException { // holds exception, could also be a series of nullable fields private final SuperEither exception; ... public void throwIfError() throws ExceptionA, ExceptionB, ExceptionC; } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
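The {{AckOrException}} proposal above can be reduced to a compilable sketch with a single nullable cause. The multi-typed `SuperEither` in the proposal is hypothetical API design, so this sketch collapses it to one `Exception` field; it is illustrative, not the actual Flink implementation.

```java
// Minimal sketch of the AckOrException idea from FLINK-9732: an
// acknowledgement that optionally carries the submission failure cause,
// rethrown on the client side via throwIfError().
public class AckOrError {
    private final Exception cause; // null means the submission was acknowledged

    private AckOrError(Exception cause) { this.cause = cause; }

    static AckOrError ack() { return new AckOrError(null); }
    static AckOrError error(Exception cause) { return new AckOrError(cause); }

    /** Rethrows the failure cause, if any; a no-op for a successful ack. */
    void throwIfError() throws Exception {
        if (cause != null) {
            throw cause;
        }
    }

    public static void main(String[] args) throws Exception {
        AckOrError.ack().throwIfError(); // successful submission: nothing happens
        try {
            AckOrError.error(new IllegalStateException("faulty execution graph")).throwIfError();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // the client sees the real cause
        }
    }
}
```

Compared with a bare "Internal server error", the client can now branch on (or at least log) the concrete failure type.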
[jira] [Closed] (FLINK-9280) Extend JobSubmitHandler to accept jar files
[ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9280. --- Resolution: Fixed master: a25cd3feddd19e75456db32a704ee5509e85dd47 1.5: 797709cb2466610b1d5b05c12e43d3f7d4f70183 > Extend JobSubmitHandler to accept jar files > --- > > Key: FLINK-9280 > URL: https://issues.apache.org/jira/browse/FLINK-9280 > Project: Flink > Issue Type: New Feature > Components: Job-Submission, REST >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > The job submission through the CLI first uploads all required jars to the blob > server, sets the blob keys in the jobgraph, and then uploads this graph to > the {{JobSubmitHandler}} which submits it to the Dispatcher. > This process has the downside that it requires jars to be uploaded to the > blobserver before submitting the job graph, which does not happen via REST. > I propose an extension to the {{JobSubmitHandler}} to also accept an > optional list of jar files, that were previously uploaded through the > {{JarUploadHandler}}. If present, the handler would upload these jars to the > blobserver and set the blob keys. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions
[ https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8785. --- Resolution: Fixed Fix Version/s: 1.5.1 1.6.0 master: 81d135578c026842e6d8bc95391da60886612166 1.5: 06b9bf13197712f3c6e81b380386b78dc5979de0 > JobSubmitHandler does not handle JobSubmissionExceptions > > > Key: FLINK-8785 > URL: https://issues.apache.org/jira/browse/FLINK-8785 > Project: Flink > Issue Type: Bug > Components: Job-Submission, JobManager, REST >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Labels: flip-6, pull-request-available > Fix For: 1.6.0, 1.5.1 > > > If the job submission, i.e. {{DispatcherGateway#submitJob}}, fails with a > {{JobSubmissionException}}, the {{JobSubmitHandler}} returns "Internal > server error" instead of signaling the failed job submission. > This can, for example, occur if the transmitted execution graph is faulty, as > tested by the {{JobSubmissionFailsITCase}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
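The idea behind the fix can be sketched as follows. This is a hedged illustration using only the JDK, not Flink's actual handler code; the class and method names are placeholders.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch of the fix's idea: inspect the cause of a failed
// submission future instead of collapsing every failure into a generic
// "Internal server error" response.
public class SubmissionErrorMappingSketch {
    public static class JobSubmissionException extends RuntimeException {
        public JobSubmissionException(String message) {
            super(message);
        }
    }

    public static String describeResult(CompletableFuture<Void> submission) {
        try {
            submission.join();
            return "Job submitted.";
        } catch (CompletionException e) {
            if (e.getCause() instanceof JobSubmissionException) {
                // Signal the failed submission with its actual cause.
                return "Job submission failed: " + e.getCause().getMessage();
            }
            return "Internal server error";
        }
    }
}
```

A faulty execution graph would then produce a message naming the cause rather than an opaque 500-style response.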
[jira] [Closed] (FLINK-9301) NotSoMiniClusterIterations job fails on travis
[ https://issues.apache.org/jira/browse/FLINK-9301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9301. --- Resolution: Fixed Fix Version/s: (was: 1.5.1) 1.6.0 master: 885640f781aa66359d929eb387f27a6024d75025 > NotSoMiniClusterIterations job fails on travis > -- > > Key: FLINK-9301 > URL: https://issues.apache.org/jira/browse/FLINK-9301 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Andrey Zagrebin >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0 > > > The high-parallelism-iterations-test fails on travis. After starting ~55 > taskmanagers all memory is used and further memory allocations fail. > I'm currently letting it run another time, if it fails again I will disable > the test temporarily. > https://travis-ci.org/zentol/flink-ci/builds/375189790 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9731) Kafka source subtask begins to consume from earliest offset
Elias Levy created FLINK-9731: - Summary: Kafka source subtask begins to consume from earliest offset Key: FLINK-9731 URL: https://issues.apache.org/jira/browse/FLINK-9731 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.4.2 Reporter: Elias Levy On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job instance began consuming records from the earliest offsets available in Kafka for the partitions assigned to it. Other subtasks did not exhibit this behavior and continued operating normally. Prior to the event, the job exhibited no Kafka lag. The job showed no failed checkpoints and did not restore or restart. Flink logs show no indication of anything amiss. There were no errors or Kafka-related messages in the Flink logs. The job is configured with checkpoints at 1 minute intervals. The Kafka connector consumer is configured to start from group offsets via `setStartFromGroupOffsets()` if it is not started from a savepoint, and the Kafka consumer is configured to fall back to the earliest offsets if no group offsets are committed, by setting `auto.offset.reset` to `earliest` in the Kafka consumer config. Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of its partitions for around 30 seconds as a result of losing its connection to ZooKeeper. {noformat} [2018-06-30 09:34:54,799] INFO Unable to read additional data from server sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for partition [cloud_ioc_events,32] to broker 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 
(kafka.server.ReplicaFetcherThread) {noformat} The broker reconnected to ZK after a few tries: {noformat} [2018-06-30 09:34:55,462] INFO Opening socket connection to server 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,463] INFO Socket connection established to 10.210.48.187/10.210.48.187:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,465] INFO Initiating client connection, connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 (org.apache.zookeeper.ZooKeeper) [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,466] INFO EventThread shut down for session: 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,468] INFO Opening socket connection to server 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,468] INFO Socket connection established to 10.210.43.200/10.210.43.200:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,471] INFO Session establishment complete on server 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,472] INFO re-registering 
broker info in ZK for broker 2005 (kafka.server.KafkaHealthcheck$SessionExpireListener) [2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2018-06-30 09:34:55,476] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 with addresses: EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT) (kafka.utils.ZkUtils) [2018-06-30 09:34:55,476] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener) [2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener) {noformat} By 9:35:02 partitions had returned to the broker. It appears this
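For reference, the consumer configuration described in the report can be sketched as below. The broker address and group id are placeholders, not values from the original issue; in the actual job these properties are handed to the Flink Kafka consumer, on which `setStartFromGroupOffsets()` is then called so the source starts from committed group offsets unless restored from a savepoint.

```java
import java.util.Properties;

// Sketch of the Kafka consumer properties the report describes. Only the
// auto.offset.reset setting is taken from the report; the rest are
// illustrative placeholders.
public class KafkaSourcePropsSketch {
    public static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker-b5-int:9092"); // placeholder
        props.setProperty("group.id", "flink-consumer-group");              // placeholder
        // Fall back to the earliest offsets when no group offsets are committed.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```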
[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files
[ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531822#comment-16531822 ] ASF GitHub Bot commented on FLINK-9280: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6203 > Extend JobSubmitHandler to accept jar files > --- > > Key: FLINK-9280 > URL: https://issues.apache.org/jira/browse/FLINK-9280 > Project: Flink > Issue Type: New Feature > Components: Job-Submission, REST >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > The job submission through the CLI first uploads all required jars to the blob > server, sets the blob keys in the jobgraph, and then uploads this graph to > the {{JobSubmitHandler}}, which submits it to the Dispatcher. > This process has the downside that it requires jars to be uploaded to the > blobserver before submitting the job graph, which does not happen via REST. > I propose an extension to the {{JobSubmitHandler}} to also accept an > optional list of jar files that were previously uploaded through the > {{JarUploadHandler}}. If present, the handler would upload these jars to the > blobserver and set the blob keys. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6203 ---
[jira] [Commented] (FLINK-9301) NotSoMiniClusterIterations job fails on travis
[ https://issues.apache.org/jira/browse/FLINK-9301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531821#comment-16531821 ] ASF GitHub Bot commented on FLINK-9301: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6024 > NotSoMiniClusterIterations job fails on travis > -- > > Key: FLINK-9301 > URL: https://issues.apache.org/jira/browse/FLINK-9301 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Andrey Zagrebin >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.1 > > > The high-parallelism-iterations-test fails on travis. After starting ~55 > taskmanagers all memory is used and further memory allocations fail. > I'm currently letting it run another time, if it fails again I will disable > the test temporarily. > https://travis-ci.org/zentol/flink-ci/builds/375189790 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9301) NotSoMiniClusterIterations job fails on travis
[ https://issues.apache.org/jira/browse/FLINK-9301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9301: -- Labels: pull-request-available (was: ) > NotSoMiniClusterIterations job fails on travis > -- > > Key: FLINK-9301 > URL: https://issues.apache.org/jira/browse/FLINK-9301 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Andrey Zagrebin >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.1 > > > The high-parallelism-iterations-test fails on travis. After starting ~55 > taskmanagers all memory is used and further memory allocations fail. > I'm currently letting it run another time, if it fails again I will disable > the test temporarily. > https://travis-ci.org/zentol/flink-ci/builds/375189790 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6024: [FLINK-9301] [e2e test] Add back "not so mini clus...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6024 ---
[jira] [Commented] (FLINK-9666) short-circuit logic should be used in boolean contexts
[ https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531816#comment-16531816 ] ASF GitHub Bot commented on FLINK-9666: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6230 wait. Is this a _backport_ of #6212 for 1.5, or an _extension_ that fixes another occurrence? > short-circuit logic should be used in boolean contexts > -- > > Key: FLINK-9666 > URL: https://issues.apache.org/jira/browse/FLINK-9666 > Project: Flink > Issue Type: Improvement > Components: Core, DataStream API >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.6.0 > > > short-circuit logic should be used in boolean contexts -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6230: [FLINK-9666] short-circuit logic should be used in boolea...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6230 wait. Is this a _backport_ of #6212 for 1.5, or an _extension_ that fixes another occurrence? ---
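For context, the distinction the FLINK-9666 title refers to: in Java, `&&` and `||` short-circuit (the right-hand operand is skipped once the result is decided), while `&` and `|` always evaluate both operands. A minimal illustration:

```java
// Demonstrates why short-circuit operators matter in boolean contexts:
// with '&' the right-hand side runs even when the result is already decided.
public class ShortCircuitDemo {
    static int calls = 0;

    static boolean touch() {
        calls++;
        return true;
    }

    public static int callsWithEagerAnd() {
        calls = 0;
        boolean unused = false & touch(); // touch() still runs
        return calls;
    }

    public static int callsWithShortCircuitAnd() {
        calls = 0;
        boolean unused = false && touch(); // touch() is skipped
        return calls;
    }
}
```

Beyond skipping needless work, short-circuiting also protects guarded expressions such as `x != null && x.isValid()`.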
[jira] [Commented] (FLINK-9730) avoid access static via class reference
[ https://issues.apache.org/jira/browse/FLINK-9730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531794#comment-16531794 ] ASF GitHub Bot commented on FLINK-9730: --- GitHub user lamber-ken opened a pull request: https://github.com/apache/flink/pull/6247 [FLINK-9730] [code refactor] fix access static via class reference ## What is the purpose of the change - fix access static via class reference ## Brief change log - fix access static via class reference ## Verifying this change - fix access static via class reference ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/lamber-ken/flink FLINK-9730 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6247.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6247 commit d8278659cf6373aaa4d71320ad09d540955d396f Author: lamber-ken Date: 2018-07-03T18:36:43Z [code refactore] access static via class reference > avoid access static via class reference > --- > > Key: FLINK-9730 > URL: https://issues.apache.org/jira/browse/FLINK-9730 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > [code refactor] access static via class reference -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9730) avoid access static via class reference
[ https://issues.apache.org/jira/browse/FLINK-9730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9730: -- Labels: pull-request-available (was: ) > avoid access static via class reference > --- > > Key: FLINK-9730 > URL: https://issues.apache.org/jira/browse/FLINK-9730 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > [code refactor] access static via class reference -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9730) avoid access static via class reference
[ https://issues.apache.org/jira/browse/FLINK-9730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-9730: -- Description: [code refactor] access static via class reference (was: [code refactore] access static via class reference) > avoid access static via class reference > --- > > Key: FLINK-9730 > URL: https://issues.apache.org/jira/browse/FLINK-9730 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > [code refactor] access static via class reference -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6247: [FLINK-9730] [code refactor] fix access static via...
GitHub user lamber-ken opened a pull request: https://github.com/apache/flink/pull/6247 [FLINK-9730] [code refactor] fix access static via class reference ## What is the purpose of the change - fix access static via class reference ## Brief change log - fix access static via class reference ## Verifying this change - fix access static via class reference ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/lamber-ken/flink FLINK-9730 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6247.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6247 commit d8278659cf6373aaa4d71320ad09d540955d396f Author: lamber-ken Date: 2018-07-03T18:36:43Z [code refactore] access static via class reference ---
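The refactor in question is the standard style fix illustrated below; the class and field names are illustrative, not taken from the Flink codebase.

```java
// Illustrates the style issue FLINK-9730 addresses: accessing a static
// member through an instance reference obscures that no instance state
// is involved.
public class StaticAccessDemo {
    static final String VERSION = "1.6.0";

    public static boolean sameValue() {
        StaticAccessDemo instance = new StaticAccessDemo();
        String viaInstance = instance.VERSION;      // discouraged: static via instance reference
        String viaClass = StaticAccessDemo.VERSION; // preferred: static via class reference
        return viaInstance.equals(viaClass);
    }
}
```

Both accesses resolve to the same field, so the change is purely cosmetic, which is why the PR touches no runtime behavior.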
[jira] [Commented] (FLINK-9641) Pulsar Source Connector
[ https://issues.apache.org/jira/browse/FLINK-9641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531777#comment-16531777 ] ASF GitHub Bot commented on FLINK-9641: --- Github user cckellogg commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199909775 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; + +/** + * Base class for pulsar sources. 
+ * @param --- End diff -- will add a comment > Pulsar Source Connector > --- > > Key: FLINK-9641 > URL: https://issues.apache.org/jira/browse/FLINK-9641 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Chris Kellogg >Priority: Minor > Labels: pull-request-available > > Pulsar (https://github.com/apache/incubator-pulsar) is a distributed pub-sub > messaging system currently in apache incubation. It is a very active project > and there are committers from various companies and good adoption. This pr > will add a source function to allow Flink jobs to process messages from > Pulsar topics. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9641) Pulsar Source Connector
[ https://issues.apache.org/jira/browse/FLINK-9641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531778#comment-16531778 ] ASF GitHub Bot commented on FLINK-9641: --- Github user cckellogg commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199909793 --- Diff: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java --- @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerStats; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Matchers.any; + +/** + * Tests for the PulsarConsumerSource. The source supports two operation modes. 
+ * 1) At-least-once (when checkpointed) with Pulsar message acknowledgements and the deduplication mechanism in + *{@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}.. + * 3) No strong delivery guarantees (without checkpointing) with Pulsar acknowledging messages after + * after it receives x number of messages. + * + * This tests assumes that the MessageIds are increasing monotonously. That doesn't have to be the + * case. The MessageId is used to uniquely identify messages. + */ +public class PulsarConsumerSourceTests { + + private PulsarConsumerSource source; + + private TestConsumer consumer; + + private TestSourceContext context; + + private Thread sourceThread; + + private Exception exception; --- End diff -- will fix > Pulsar Source Connector > --- > > Key: FLINK-9641 > URL: https://issues.apache.org/jira/browse/FLINK-9641 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Chris Kellogg >Priority: Minor > Labels: pull-request-available > > Pulsar (https://github.com/apache/incubator-pulsar) is a distributed pub-sub > messaging system currently in apache incubation. It is a very active project > and there are committers from various companies and good adoption. This
[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...
Github user cckellogg commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199909793 --- Diff: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java --- @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerStats; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Matchers.any; + +/** + * Tests for the PulsarConsumerSource. The source supports two operation modes. 
+ * 1) At-least-once (when checkpointed) with Pulsar message acknowledgements and the deduplication mechanism in + *{@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}.. + * 3) No strong delivery guarantees (without checkpointing) with Pulsar acknowledging messages after + * after it receives x number of messages. + * + * This tests assumes that the MessageIds are increasing monotonously. That doesn't have to be the + * case. The MessageId is used to uniquely identify messages. + */ +public class PulsarConsumerSourceTests { + + private PulsarConsumerSource source; + + private TestConsumer consumer; + + private TestSourceContext context; + + private Thread sourceThread; + + private Exception exception; --- End diff -- will fix ---
[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...
Github user cckellogg commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199909775 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; + +/** + * Base class for pulsar sources. + * @param --- End diff -- will add a comment ---
[jira] [Commented] (FLINK-9641) Pulsar Source Connector
[ https://issues.apache.org/jira/browse/FLINK-9641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531774#comment-16531774 ] ASF GitHub Bot commented on FLINK-9641: --- Github user cckellogg commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199909617 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.IOUtils; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Pulsar source (consumer) which receives messages from a topic and acknowledges messages. + * When checkpointing is enabled, it guarantees at least once processing semantics. + * + * When checkpointing is disabled, it auto acknowledges messages based on the number of messages it has + * received. In this mode messages may be dropped. 
+ */ +class PulsarConsumerSource extends MessageAcknowledgingSourceBase implements PulsarSourceBase { + + private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerSource.class); + + private final int messageReceiveTimeoutMs = 100; + private final String serviceUrl; + private final String topic; + private final String subscriptionName; + private final DeserializationSchema deserializer; + + private PulsarClient client; + private Consumer consumer; + + private boolean isCheckpointingEnabled; + + private final long acknowledgementBatchSize; + private long batchCount; + private long totalMessageCount; + + private transient volatile boolean isRunning; + + PulsarConsumerSource(PulsarSourceBuilder builder) { + super(MessageId.class); + this.serviceUrl = builder.serviceUrl; + this.topic = builder.topic; + this.deserializer = builder.deserializationSchema; + this.subscriptionName = builder.subscriptionName; + this.acknowledgementBatchSize = builder.acknowledgementBatchSize; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + final RuntimeContext context = getRuntimeContext(); + if (context instanceof StreamingRuntimeContext) { + isCheckpointingEnabled = ((StreamingRuntimeContext) context).isCheckpointingEnabled(); + } + + client = createClient(); + consumer = createConsumer(client); + + isRunning = true; + } + + @Override + protected void acknowledgeIDs(long checkpointId, Set messageIds) { + if (consumer == null) { + LOG.error("null consumer unable to acknowledge messages"); +
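A simplified, self-contained sketch of the batch-count acknowledgement pattern that the class javadoc and the `acknowledgementBatchSize`/`batchCount` fields in the diff above suggest. The class and method names here are hypothetical illustrations, not the connector's actual code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification: tracks received message ids and "acknowledges"
// a batch once the configured threshold is reached, mirroring the idea of
// auto-acknowledging based on the number of messages received.
class BatchAcknowledger {
    private final long acknowledgementBatchSize;
    private long batchCount;
    private final List<Long> pending = new ArrayList<>();

    BatchAcknowledger(long acknowledgementBatchSize) {
        this.acknowledgementBatchSize = acknowledgementBatchSize;
    }

    /** Records a received message id; returns true when a batch was acknowledged. */
    boolean onMessage(long messageId) {
        pending.add(messageId);
        batchCount++;
        if (batchCount >= acknowledgementBatchSize) {
            // The real connector would acknowledge via the Pulsar Consumer here.
            pending.clear();
            batchCount = 0;
            return true;
        }
        return false;
    }
}
```

Note the trade-off stated in the javadoc: without checkpointing, anything acknowledged this way but not yet processed can be dropped on failure.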
[jira] [Commented] (FLINK-9641) Pulsar Source Connector
[ https://issues.apache.org/jira/browse/FLINK-9641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531775#comment-16531775 ] ASF GitHub Bot commented on FLINK-9641: --- Github user cckellogg commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r199909645 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Defaults.java --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +/** + * Default values for Pulsar connectors. + */ +public class Defaults { --- End diff -- will remove. > Pulsar Source Connector > --- > > Key: FLINK-9641 > URL: https://issues.apache.org/jira/browse/FLINK-9641 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Chris Kellogg >Priority: Minor > Labels: pull-request-available > > Pulsar (https://github.com/apache/incubator-pulsar) is a distributed pub-sub > messaging system currently in apache incubation. It is a very active project > and there are committers from various companies and good adoption. 
This pr > will add a source function to allow Flink jobs to process messages from > Pulsar topics. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9730) avoid access static via class reference
lamber-ken created FLINK-9730: - Summary: avoid access static via class reference Key: FLINK-9730 URL: https://issues.apache.org/jira/browse/FLINK-9730 Project: Flink Issue Type: Improvement Affects Versions: 1.5.0 Reporter: lamber-ken Assignee: lamber-ken Fix For: 1.6.0 [code refactor] access statics via the class reference -- This message was sent by Atlassian JIRA (v7.6.3#76005)
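For context, one common form of the cleanup FLINK-9730 describes is the standard checkstyle/IDE recommendation to reference static members through the class name rather than through an instance. A minimal illustration (hypothetical code, not from the Flink patch):

```java
// Accessing a static member through an instance compiles, because the call is
// resolved statically anyway, but it obscures that no instance state is involved.
class Counter {
    static int total = 0;

    static int increment() {
        return ++total;
    }
}

class Demo {
    static int good() {
        return Counter.increment();   // preferred: access the static via the class
    }

    static int bad(Counter c) {
        return c.increment();         // legal but misleading; same call as above
    }
}
```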
[jira] [Commented] (FLINK-9666) short-circuit logic should be used in boolean contexts
[ https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531734#comment-16531734 ] ASF GitHub Bot commented on FLINK-9666: --- Github user lamber-ken commented on the issue: https://github.com/apache/flink/pull/6230 @zentol, thanks for review. by the way, what's the best way to backport refactorings and code-cleanups? > short-circuit logic should be used in boolean contexts > -- > > Key: FLINK-9666 > URL: https://issues.apache.org/jira/browse/FLINK-9666 > Project: Flink > Issue Type: Improvement > Components: Core, DataStream API >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.6.0 > > > short-circuit logic should be used in boolean contexts -- This message was sent by Atlassian JIRA (v7.6.3#76005)
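The cleanup behind FLINK-9666 swaps the non-short-circuit operators `&`/`|` for `&&`/`||` in boolean expressions. A minimal illustration of why this matters (hypothetical code, not from the Flink patch):

```java
// '&' always evaluates both operands, so the right-hand side can throw;
// '&&' short-circuits and skips the right side when the left is already false.
class ShortCircuitDemo {
    static boolean nonEmptyEager(String s) {
        return s != null & !s.isEmpty();   // NullPointerException when s == null
    }

    static boolean nonEmptySafe(String s) {
        return s != null && !s.isEmpty();  // safe: right side skipped for null
    }
}
```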
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531726#comment-16531726 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199894398 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -272,4 +254,60 @@ public int getVersion() { previousSerializersAndConfigs.get(index).f0, UnloadableDummyTypeSerializer.class, previousSerializersAndConfigs.get(index).f1, fieldSerializers[index]); } + + /** This class holds composite serializer parameters which can be precomputed in advance for better performance. */ + protected static class PrecomputedParameters implements Serializable { + /** Whether target type is immutable. */ + final boolean immutableTargetType; + + /** Whether target type and its fields are immutable. */ + final boolean immutable; + + /** Byte length of target object in serialized form. */ + private final int length; + + /** Whether any field serializer is stateful. */ + final boolean stateful; + + final int hashCode; --- End diff -- I wonder if this should be `transient` in a serializable class, the hash code could be based on object identity. 
> Wrap state binder with TTL logic > > > Key: FLINK-9513 > URL: https://issues.apache.org/jira/browse/FLINK-9513 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The main idea is to wrap user state value with a class holding the value and > the expiration timestamp (maybe meta data in future) and use the new object > as a value in the existing implementations: > {code:java} > class TtlValue<V> { > V value; > long expirationTimestamp; > } > {code} > The original state binder factory is wrapped with TtlStateBinder if TTL is > enabled: > {code:java} > state = ttlConfig.updateType == DISABLED ? > bind(binder) : bind(new TtlStateBinder(binder, timerService)); > {code} > TtlStateBinder decorates the states produced by the original binder with TTL > logic wrappers and adds TtlValue serialisation logic: > {code:java} > TtlStateBinder { > StateBinder binder; > ProcessingTimeProvider timeProvider; // System.currentTimeMillis() > TtlValueState createValueState(valueDesc) { > serializer = new TtlValueSerializer(valueDesc.getSerializer); > ttlValueDesc = new ValueDesc(serializer, ...); > // or implement custom TypeInfo > originalStateWithTtl = binder.createValueState(valueDesc); > return new TtlValueState(originalStateWithTtl, timeProvider); > } > // List, Map, ... > } > {code} > TTL serializer should add expiration timestamp -- This message was sent by Atlassian JIRA (v7.6.3#76005)
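The {{TtlValue}} pseudocode in the issue description can be fleshed out into a compilable sketch. The `wrap` and `isExpired` helpers below are hypothetical additions for illustration, not names from the actual Flink implementation:

```java
import java.util.function.LongSupplier;

// Sketch of wrapping a user state value with its expiration timestamp.
// Expiry is decided against an injectable time provider, matching the
// description's idea of a processing-time provider.
class TtlValue<V> {
    final V value;
    final long expirationTimestamp;

    TtlValue(V value, long expirationTimestamp) {
        this.value = value;
        this.expirationTimestamp = expirationTimestamp;
    }

    static <V> TtlValue<V> wrap(V value, long nowMillis, long ttlMillis) {
        return new TtlValue<>(value, nowMillis + ttlMillis);
    }

    /** Expired once the provided current time has passed the stored timestamp. */
    boolean isExpired(LongSupplier timeProvider) {
        return timeProvider.getAsLong() > expirationTimestamp;
    }
}
```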