[jira] [Assigned] (FLINK-8300) Make JobExecutionResult accessible in per-job cluster mode
[ https://issues.apache.org/jira/browse/FLINK-8300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Yao reassigned FLINK-8300:
-------------------------------
    Assignee: (was: Gary Yao)
     Summary: Make JobExecutionResult accessible in per-job cluster mode  (was: Make JobExecutionResult accessible from JobMaster)

> Make JobExecutionResult accessible in per-job cluster mode
> ----------------------------------------------------------
>
>                 Key: FLINK-8300
>                 URL: https://issues.apache.org/jira/browse/FLINK-8300
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Gary Yao
>            Priority: Blocker
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the
> {{JobMaster}} after the job has finished. The cache should have a
> configurable size and should periodically clean up stale entries in order to
> avoid memory leaks.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
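The cache described in the issue (bounded size plus periodic cleanup of stale entries to avoid leaks) can be illustrated with a plain `LinkedHashMap`. Everything below, including the class and method names, is a hypothetical sketch and not Flink's actual implementation:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a size-bounded result cache with TTL-based cleanup. */
public class ResultCacheSketch {

    static final class CachedResult {
        final String result;
        final long insertedAtMillis;

        CachedResult(String result, long insertedAtMillis) {
            this.result = result;
            this.insertedAtMillis = insertedAtMillis;
        }
    }

    private final int maxSize;
    private final long ttlMillis;
    private final LinkedHashMap<String, CachedResult> cache;

    public ResultCacheSketch(int maxSize, long ttlMillis) {
        this.maxSize = maxSize;
        this.ttlMillis = ttlMillis;
        // removeEldestEntry enforces the configurable size bound on every put
        this.cache = new LinkedHashMap<String, CachedResult>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, CachedResult> eldest) {
                return size() > ResultCacheSketch.this.maxSize;
            }
        };
    }

    public void put(String jobId, String result, long nowMillis) {
        cache.put(jobId, new CachedResult(result, nowMillis));
    }

    public String get(String jobId) {
        CachedResult r = cache.get(jobId);
        return r == null ? null : r.result;
    }

    /** Periodic cleanup pass: drop entries older than the TTL to avoid memory leaks. */
    public void evictStale(long nowMillis) {
        Iterator<CachedResult> it = cache.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().insertedAtMillis > ttlMillis) {
                it.remove();
            }
        }
    }

    public int size() {
        return cache.size();
    }
}
```

`LinkedHashMap.removeEldestEntry` gives the size bound for free; in a real JobMaster the cleanup pass would run on a scheduled executor rather than being invoked explicitly.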
[jira] [Commented] (FLINK-8226) Dangling reference generated after NFA clean up timed out SharedBufferEntry
[ https://issues.apache.org/jira/browse/FLINK-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299644#comment-16299644 ]

ASF GitHub Bot commented on FLINK-8226:
---------------------------------------

Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/5141

    @dawidwys Could you help to take a look at this PR? This is a bug fix and the issue can be easily reproduced with the test case included in the PR. Thanks a lot.

> Dangling reference generated after NFA clean up timed out SharedBufferEntry
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-8226
>                 URL: https://issues.apache.org/jira/browse/FLINK-8226
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>
[jira] [Commented] (FLINK-8227) Optimize the performance of SharedBufferSerializer
[ https://issues.apache.org/jira/browse/FLINK-8227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299640#comment-16299640 ]

ASF GitHub Bot commented on FLINK-8227:
---------------------------------------

Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/5142

    @dawidwys @StephanEwen Sorry for the late response. For questions 1 and 2, I have the same thought as @dawidwys and have updated the PR accordingly. For question 3, I think `int` is enough, as we currently store `SharedBufferEntry` in a `HashMap` for each `SharedBufferPage`, and the size of a `HashMap` is an `int`. If we want to support `long`, we would also have to change `HashMap` to something else. What's your thought?

> Optimize the performance of SharedBufferSerializer
> --------------------------------------------------
>
>                 Key: FLINK-8227
>                 URL: https://issues.apache.org/jira/browse/FLINK-8227
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>
> Currently {{SharedBufferSerializer.serialize()}} will create a HashMap and
> put all the {{SharedBufferEntry}} into it. Usually this is not a problem, but
> we observe that in some cases the calculation of hashCode may become the
> bottleneck. The performance will decrease as the number of
> {{SharedBufferEdge}} increases. For a looping pattern {{A*}}, if the number of
> {{SharedBufferEntry}} is {{N}}, the number of {{SharedBufferEdge}} is about
> {{N * N}}.
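The hashCode bottleneck described in the issue can be sidestepped when entries only need stable ids within a single `serialize()` call: an `IdentityHashMap` hashes by object identity and never invokes a user-defined (and potentially expensive) `hashCode()`. The sketch below only illustrates that effect; it is not the change actually made in PR #5142, and `CostlyEntry` is a made-up stand-in for `SharedBufferEntry`:

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

/** Stand-in for an entry whose hashCode is expensive; counts invocations. */
class CostlyEntry {
    static int hashCodeCalls = 0;
    final int payload;

    CostlyEntry(int payload) {
        this.payload = payload;
    }

    @Override
    public int hashCode() {
        hashCodeCalls++;              // expose the cost: count every invocation
        return Integer.hashCode(payload);
    }
}

public class IdAssignment {
    /** Assign sequential serialization ids by object identity; hashCode() is never called. */
    static Map<CostlyEntry, Integer> assignIdsByIdentity(CostlyEntry[] entries) {
        Map<CostlyEntry, Integer> ids = new IdentityHashMap<>();
        for (CostlyEntry e : entries) {
            ids.put(e, ids.size());
        }
        return ids;
    }
}
```

`IdentityHashMap` uses `System.identityHashCode` internally, so the per-entry cost is constant regardless of how expensive the entry's own `hashCode()` is; a regular `HashMap` calls `hashCode()` on every `put` and `get`.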
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299527#comment-16299527 ]

ASF GitHub Bot commented on FLINK-8162:
---------------------------------------

Github user casidiablo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r158194237

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -96,6 +99,15 @@
     	protected ShardConsumer(KinesisDataFetcher fetcherRef,
     				SequenceNumber lastSequenceNum,
     				KinesisProxyInterface kinesis) {
     		this.fetcherRef = checkNotNull(fetcherRef);
    +		MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
    --- End diff --

    I think we can't register the metric in `FlinkKinesisConsumer`, since we need it to be associated with a particular shard id. But I could do it from the `KinesisDataFetcher` instead, which already has access to the runtime context.

> Kinesis Connector to report millisBehindLatest metric
> -----------------------------------------------------
>
>                 Key: FLINK-8162
>                 URL: https://issues.apache.org/jira/browse/FLINK-8162
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>            Reporter: Cristian
>            Priority: Minor
>              Labels: kinesis
>             Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is
> "MillisBehindLatest" (see
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge,
> tagging it with the shard id.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299514#comment-16299514 ]

ASF GitHub Bot commented on FLINK-8162:
---------------------------------------

Github user casidiablo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r158192923

    --- Diff: docs/monitoring/metrics.md ---
    @@ -1293,6 +1293,29 @@ Thus, in order to infer the metric identifier:
    (adds a "Kinesis Connectors" docs table with columns Scope / Metrics / Description / Type and the row: Operator / millisBehindLatest / "The number of milliseconds the GetRecords response is from the tip of the stream, ...")
    --- End diff --

    I'm OK changing it. That's actually just a copy&paste from Amazon docs.
[jira] [Updated] (FLINK-8037) Missing cast in integer arithmetic in TransactionalIdsGenerator#generateIdsToAbort
[ https://issues.apache.org/jira/browse/FLINK-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-8037:
--------------------------
    Description (the previous description was identical apart from formatting):

{code}
  public Set<Long> generateIdsToAbort() {
    Set<Long> idsToAbort = new HashSet<>();
    for (int i = 0; i < safeScaleDownFactor; i++) {
      idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
{code}

The operands are integers where generateIdsToUse() expects a long parameter.

> Missing cast in integer arithmetic in
> TransactionalIdsGenerator#generateIdsToAbort
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-8037
>                 URL: https://issues.apache.org/jira/browse/FLINK-8037
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Priority: Minor
>
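The report is the classic overflow-before-widening bug: all three operands are `int`, so the product is computed in 32-bit arithmetic and may wrap before it is widened to the method's `long` parameter. Casting one operand first forces the whole product into 64-bit arithmetic. A minimal self-contained illustration (the numeric values are made up for the demonstration):

```java
public class OverflowDemo {
    /** Buggy: the product is computed as int and may wrap before widening to long. */
    static long buggyId(int i, int poolSize, int totalNumberOfSubtasks) {
        return i * poolSize * totalNumberOfSubtasks;
    }

    /** Fixed: casting the first operand makes the entire product a long multiplication. */
    static long fixedId(int i, int poolSize, int totalNumberOfSubtasks) {
        return (long) i * poolSize * totalNumberOfSubtasks;
    }
}
```

With, say, 3 * 1,000,000 * 1,000 the true product (3,000,000,000) exceeds `Integer.MAX_VALUE`, so the buggy version wraps to a negative value before widening.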
[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis
[ https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299275#comment-16299275 ]

ASF GitHub Bot commented on FLINK-8271:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5171#discussion_r158164454

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
    @@ -30,37 +30,44 @@
     import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
     import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
     import com.amazonaws.auth.profile.ProfileCredentialsProvider;
    -import com.amazonaws.regions.Region;
    +import com.amazonaws.client.builder.AwsClientBuilder;
     import com.amazonaws.regions.Regions;
    -import com.amazonaws.services.kinesis.AmazonKinesisClient;
    +import com.amazonaws.services.kinesis.AmazonKinesis;
    +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;

     import java.util.Properties;

     /**
      * Some utilities specific to Amazon Web Service.
      */
     public class AWSUtil {
    +	/** Used for formatting Flink-specific user agent string when creating Kinesis client. */
    +	private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";

     	/**
    -	 * Creates an Amazon Kinesis Client.
    +	 * Creates an AmazonKinesis client.
    	 * @param configProps configuration properties containing the access key, secret key, and region
    -	 * @return a new Amazon Kinesis Client
    +	 * @return a new AmazonKinesis client
    	 */
    -	public static AmazonKinesisClient createKinesisClient(Properties configProps) {
    +	public static AmazonKinesis createKinesisClient(Properties configProps) {
     		// set a Flink-specific user agent
    -		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
    -		awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
    -			" (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
    +		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
    +			.withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
    --- End diff --

    To my understanding, switching from `setUserAgent` to `withUserAgentPrefix` will result in different user agent strings, right? Though I think internally the `setUserAgent` method is directly forwarding to `withUserAgentPrefix` anyways ... No objection on this, just curious.

> upgrade from deprecated classes to AmazonKinesis
> ------------------------------------------------
>
>                 Key: FLINK-8271
>                 URL: https://issues.apache.org/jira/browse/FLINK-8271
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.4.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.5.0, 1.4.1
>
[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis
[ https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299274#comment-16299274 ]

ASF GitHub Bot commented on FLINK-8271:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5171#discussion_r158166090

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
    @@ -30,37 +30,44 @@
     		// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
    -		AmazonKinesisClient client = new AmazonKinesisClient(
    -			AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
    +		AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
    +			.withCredentials(AWSUtil.getCredentialsProvider(configProps))
    +			.withClientConfiguration(awsClientConfig)
    +			.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));

    -		client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
     		if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
    -			client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
    +			builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
    +				configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
    +				configProps.getProperty(AWSConfigConstants.AWS_REGION)));
    --- End diff --

    Why does the endpoint configuration have a region now? For example, let's say a user wants to test the connector against a local Kinesis mock service at "localhost:". The user also originally was issuing against the regular AWS Kinesis service, at region "us-west-1". The user's properties would be like -

    configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-west-1");
    configProps.setProperty(AWSConfigConstants.AWS_ENDPOINT, "localhost:");

    In the past, this would correctly redirect requests to "localhost:". With this change, is this also the case? Or do we actually need to call `new AwsClientBuilder.EndpointConfiguration(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT), null)` instead (do not provide region in endpoint)?
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299238#comment-16299238 ]

ASF GitHub Bot commented on FLINK-8162:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r158161267

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -96,6 +99,15 @@
     	protected ShardConsumer(KinesisDataFetcher fetcherRef,
     				SequenceNumber lastSequenceNum,
     				KinesisProxyInterface kinesis) {
     		this.fetcherRef = checkNotNull(fetcherRef);
    +		MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
    +			.getMetricGroup()
    +			.addGroup("Kinesis")
    --- End diff --

    Would be best if @zentol also comments on this.
[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r158160866

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -96,6 +99,15 @@
     	protected ShardConsumer(KinesisDataFetcher fetcherRef,
     				SequenceNumber lastSequenceNum,
     				KinesisProxyInterface kinesis) {
     		this.fetcherRef = checkNotNull(fetcherRef);
    +		MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
    +			.getMetricGroup()
    +			.addGroup("Kinesis")
    --- End diff --

    Do we really need this group? The metric is already bounded to the current subtask, which should provide enough context that it is Kinesis-related since we're the Kinesis consumer, no?

---
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299236#comment-16299236 ]

ASF GitHub Bot commented on FLINK-8162:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r158159640

    --- Diff: docs/monitoring/metrics.md ---
    @@ -1293,6 +1293,29 @@ Thus, in order to infer the metric identifier:
    (adds a "Kinesis Connectors" docs table with columns Scope / Metrics / Description / Type and the row: Operator / millisBehindLatest / "The number of milliseconds the GetRecords response is from the tip of the stream, ...")
    --- End diff --

    Just a matter of preference here: I prefer the term "head of the stream" instead of tip. You can ignore this if you disagree.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299240#comment-16299240 ] ASF GitHub Bot commented on FLINK-8162: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5182#discussion_r158161564 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, SequenceNumber lastSequenceNum, KinesisProxyInterface kinesis) { this.fetcherRef = checkNotNull(fetcherRef); + MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext() --- End diff -- I feel that passing the `StreamingRuntimeContext` all the way here just to register metrics, is not a good idea. Is it possible we register the metrics in `FlinkKinesisConsumer` instead? That also makes it more visible what metrics the consumer exposes without having to dig all the way to this internal `ShardConsumer` thread.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299237#comment-16299237 ] ASF GitHub Bot commented on FLINK-8162: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5182#discussion_r158160866 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, SequenceNumber lastSequenceNum, KinesisProxyInterface kinesis) { this.fetcherRef = checkNotNull(fetcherRef); + MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext() + .getMetricGroup() + .addGroup("Kinesis") --- End diff -- Do we really need this group? The metric is already bounded to the current subtask, which should provide enough context that it is Kinesis-related since we're the Kinesis consumer, no?
[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5182#discussion_r158159640 --- Diff: docs/monitoring/metrics.md --- @@ -1293,6 +1293,29 @@ Thus, in order to infer the metric identifier: + Kinesis Connectors + + + + Scope + Metrics + Description + Type + + + + + Operator + millisBehindLatest + The number of milliseconds the GetRecords response is from the tip of the stream, --- End diff -- Just a matter of preference here: I prefer the term "head of the stream" instead of tip. You can ignore this if you disagree. ---
[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5182#discussion_r158161267 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, SequenceNumber lastSequenceNum, KinesisProxyInterface kinesis) { this.fetcherRef = checkNotNull(fetcherRef); + MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext() + .getMetricGroup() + .addGroup("Kinesis") --- End diff -- Would be best if @zentol also comments on this. ---
[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5182#discussion_r158161564 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, SequenceNumber lastSequenceNum, KinesisProxyInterface kinesis) { this.fetcherRef = checkNotNull(fetcherRef); + MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext() --- End diff -- I feel that passing the `StreamingRuntimeContext` all the way here just to register metrics, is not a good idea. Is it possible we register the metrics in `FlinkKinesisConsumer` instead? That also makes it more visible what metrics the consumer exposes without having to dig all the way to this internal `ShardConsumer` thread. ---
[jira] [Commented] (FLINK-7795) Utilize error-prone to discover common coding mistakes
[ https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299208#comment-16299208 ] Ted Yu commented on FLINK-7795: --- https://github.com/google/error-prone/releases/tag/v2.1.3 was the latest release.
> Utilize error-prone to discover common coding mistakes
> --
>
> Key: FLINK-7795
> URL: https://issues.apache.org/jira/browse/FLINK-7795
> Project: Flink
> Issue Type: Improvement
> Components: Build System
>Reporter: Ted Yu
>
> http://errorprone.info/ is a tool which detects common coding mistakes.
> We should incorporate into Flink build process.
> Here are the dependencies:
> {code}
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_annotation</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>com.google.auto.service</groupId>
>   <artifactId>auto-service</artifactId>
>   <version>1.0-rc3</version>
>   <optional>true</optional>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_check_api</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
>   <exclusions>
>     <exclusion>
>       <groupId>com.google.code.findbugs</groupId>
>       <artifactId>jsr305</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>javac</artifactId>
>   <version>9-dev-r4023-3</version>
>   <scope>provided</scope>
> </dependency>
> {code}
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7525) Add config option to disable Cancel functionality on UI
[ https://issues.apache.org/jira/browse/FLINK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299205#comment-16299205 ] Ted Yu commented on FLINK-7525: --- What should be the next step ? > Add config option to disable Cancel functionality on UI > --- > > Key: FLINK-7525 > URL: https://issues.apache.org/jira/browse/FLINK-7525 > Project: Flink > Issue Type: Improvement > Components: Web Client, Webfrontend >Reporter: Ted Yu > > In this email thread > http://search-hadoop.com/m/Flink/VkLeQlf0QOnc7YA?subj=Security+Control+of+running+Flink+Jobs+on+Flink+UI > , Raja was asking for a way to control how users cancel Job(s). > Robert proposed adding a config option which disables the Cancel > functionality. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5191: Release 1.4
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5191 This seems like a mistakenly opened PR. Can you please close this, @czhxmz? Thanks! ---
[GitHub] flink issue #5187: Merge pull request #1 from apache/master
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5187 This seems like a mistake. @laolang113 can you please close this PR? Thanks! ---
[GitHub] flink issue #5195: [hotfix] [build] Always include Kafka 0.11 connector
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5195 +1, LGTM ---
[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299058#comment-16299058 ] Tzu-Li (Gordon) Tai commented on FLINK-8283: I'm suspecting that the Travis infra updates on Dec. 12th is somehow causing this: https://blog.travis-ci.com/2017-12-12-new-trusty-images-q4-launch. It seems like the 10min no-output started since our commits after Dec. 12th (as far as I can tell from our build history). > FlinkKafkaConsumerBase failing on Travis with no output in 10min > > > Key: FLINK-8283 > URL: https://issues.apache.org/jira/browse/FLINK-8283 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > > Since a few days, Travis builds with the {{connectors}} profile keep failing > more often with no new output being received within 10 minutes. It seems to > start with the Travis build for > https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9 > but may have been introduced earlier. The printed offsets look strange > though. 
> {code} > 16:33:12,508 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Setting > restore state in the FlinkKafkaConsumer: > {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, > KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773} > 16:33:12,520 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - > Consumer subtask 2 will start reading 66 partitions with offsets in restored > state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, > 
KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=266}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=11}=-915623761775, > KafkaTopicPartition{topic='
[jira] [Commented] (FLINK-8297) RocksDBListState stores whole list in single byte[]
[ https://issues.apache.org/jira/browse/FLINK-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298961#comment-16298961 ] ASF GitHub Bot commented on FLINK-8297: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5185 This might be a great chance to start discussing how ListState APIs should evolve - specifically, is it time to consider adding `remove()` to ListState? > RocksDBListState stores whole list in single byte[] > --- > > Key: FLINK-8297 > URL: https://issues.apache.org/jira/browse/FLINK-8297 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.4.0, 1.3.2 >Reporter: Jan Lukavský > > RocksDBListState currently keeps whole list of data in single RocksDB > key-value pair, which implies that the list actually must fit into memory. > Larger lists are not supported and end up with OOME or other error. The > RocksDBListState could be modified so that individual items in list are > stored in separate keys in RocksDB and can then be iterated over. A simple > implementation could reuse existing RocksDBMapState, with key as index to the > list and a single RocksDBValueState keeping track of how many items has > already been added to the list. Because this implementation might be less > efficient in come cases, it would be good to make it opt-in by a construct > like > {{new RocksDBStateBackend().enableLargeListsPerKey()}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5185: [FLINK-8297] [flink-rocksdb] optionally use RocksDBMapSta...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5185 This might be a great chance to start discussing how ListState APIs should evolve - specifically, is it time to consider adding `remove()` to ListState? ---
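The improvement proposed in FLINK-8297 is to store each list element under its own key instead of serializing the whole list into one byte[]. A hedged plain-Java sketch of that layout, where a `TreeMap` stands in for `RocksDBMapState` and a `long` counter for the `RocksDBValueState`; all names here are illustrative, not the PR's actual classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of the proposed layout: each appended element is stored under
// its own index key (TreeMap standing in for RocksDBMapState), while a
// separate counter (standing in for a RocksDBValueState) records how many
// elements were appended. get() can then iterate entries one at a time
// instead of deserializing the entire list from a single byte[].
class IndexedListStateSketch<V> {
    private final TreeMap<Long, V> entries = new TreeMap<>();
    private long count = 0L;

    void add(V value) {
        entries.put(count++, value);
    }

    Iterable<V> get() {
        return entries.values(); // ascending index order == insertion order
    }

    public static void main(String[] args) {
        IndexedListStateSketch<String> state = new IndexedListStateSketch<>();
        state.add("a");
        state.add("b");
        state.add("c");
        List<String> out = new ArrayList<>();
        state.get().forEach(out::add);
        System.out.println(out); // [a, b, c]
    }
}
```

A layout like this would also make a per-element `remove()`, as raised in the comment above, cheap to express as a single key deletion.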
[GitHub] flink pull request #5195: [hotfix] [build] Always include Kafka 0.11 connect...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/5195 [hotfix] [build] Always include Kafka 0.11 connector Now that Flink only supports builds for Scala 2.11+ we can unconditionally enable the Kafka 0.11 connector. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 20171220a_always_include_kafka_0.11_connector Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5195.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5195 commit c4bdbbcb95344101a61c6864d7f1c4feca1f0cb3 Author: Greg Hogan Date: 2017-12-20T17:11:00Z [hotfix] [build] Always include Kafka 0.11 connector Now that Flink only supports builds for Scala 2.11+ we can unconditionally enable the Kafka 0.11 connector. ---
[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis
[ https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298835#comment-16298835 ] ASF GitHub Bot commented on FLINK-8271: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5171 You are right. Thank you, Gordon. I updated the PR title > upgrade from deprecated classes to AmazonKinesis > > > Key: FLINK-8271 > URL: https://issues.apache.org/jira/browse/FLINK-8271 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.5.0, 1.4.1 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5171: [FLINK-8271][Kinesis connector] upgrade deprecated method...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5171 You are right. Thank you, Gordon. I updated the PR title ---
[jira] [Comment Edited] (FLINK-7588) Document RocksDB tuning for spinning disks
[ https://issues.apache.org/jira/browse/FLINK-7588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16258309#comment-16258309 ] Ted Yu edited comment on FLINK-7588 at 12/20/17 6:02 PM: - bq. Be careful about whether you have enough memory to keep all bloom filters Other than the above being tricky, the other guidelines are actionable . was (Author: yuzhih...@gmail.com): bq. Be careful about whether you have enough memory to keep all bloom filters Other than the above being tricky, the other guidelines are actionable. > Document RocksDB tuning for spinning disks > -- > > Key: FLINK-7588 > URL: https://issues.apache.org/jira/browse/FLINK-7588 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Ted Yu > > In docs/ops/state/large_state_tuning.md , it was mentioned that: > bq. the default configuration is tailored towards SSDs and performs > suboptimal on spinning disks > We should add recommendation targeting spinning disks: > https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7897) Consider using nio.Files for file deletion in TransientBlobCleanupTask
[ https://issues.apache.org/jira/browse/FLINK-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-7897: -- Description: nio.Files#delete() provides better clue as to why the deletion may fail: https://docs.oracle.com/javase/7/docs/api/java/nio/file/Files.html#delete(java.nio.file.Path) Depending on the potential exception (FileNotFound), the call to localFile.exists() may be skipped. was: nio.Files#delete() provides better clue as to why the deletion may fail: https://docs.oracle.com/javase/7/docs/api/java/nio/file/Files.html#delete(java.nio.file.Path) Depending on the potential exception, the call to localFile.exists() may be skipped. > Consider using nio.Files for file deletion in TransientBlobCleanupTask > -- > > Key: FLINK-7897 > URL: https://issues.apache.org/jira/browse/FLINK-7897 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Ted Yu >Priority: Minor > > nio.Files#delete() provides better clue as to why the deletion may fail: > https://docs.oracle.com/javase/7/docs/api/java/nio/file/Files.html#delete(java.nio.file.Path) > Depending on the potential exception (FileNotFound), the call to > localFile.exists() may be skipped. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
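To make the trade-off concrete: `java.io.File#delete` only returns false on failure, while `java.nio.file.Files#delete` throws a specific exception, so the `NoSuchFileException` case can replace a separate `exists()` check. A small self-contained sketch (`tryDelete` is an illustrative helper, not Flink code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

public class DeleteSketch {
    // Classifies the failure instead of returning a bare boolean; the
    // NoSuchFileException branch covers the "file already gone" case
    // without a prior exists() call.
    static String tryDelete(Path path) {
        try {
            Files.delete(path);
            return "deleted";
        } catch (NoSuchFileException e) {
            return "already gone";
        } catch (IOException e) {
            // e.g. DirectoryNotEmptyException or an access error -- the
            // exception type says why the deletion failed.
            return "failed: " + e;
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("transient-blob", ".bin");
        System.out.println(tryDelete(tmp)); // deleted
        System.out.println(tryDelete(tmp)); // already gone
    }
}
```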
[jira] [Resolved] (FLINK-8295) Netty shading does not work properly
[ https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-8295. - Resolution: Fixed Fix Version/s: 1.4.1 1.5.0 Fixed in 1.5: 1a98e327ea504f1422935c12a3342997145b9292 Fixed in 1.4: 7e497f744a67c8011a8e1f353eddc4f1d514 > Netty shading does not work properly > > > Key: FLINK-8295 > URL: https://issues.apache.org/jira/browse/FLINK-8295 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector, Core >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Nico Kruber > Fix For: 1.5.0, 1.4.1 > > > Multiple users complained that the Cassandra connector is not usable in Flink > 1.4.0 due to wrong/insufficient shading of Netty. > See: > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8295) Netty shading does not work properly
[ https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298771#comment-16298771 ] ASF GitHub Bot commented on FLINK-8295: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5183
[GitHub] flink pull request #5183: [FLINK-8295][cassandra][build] properly shade nett...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5183 ---
[jira] [Commented] (FLINK-8295) Netty shading does not work properly
[ https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298765#comment-16298765 ] ASF GitHub Bot commented on FLINK-8295: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5183 I tested it locally. Will merge this now...
[GitHub] flink issue #5183: [FLINK-8295][cassandra][build] properly shade netty for t...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5183 I tested it locally. Will merge this now... ---
[jira] [Commented] (FLINK-8233) Expose JobExecutionResult via HTTP
[ https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298752#comment-16298752 ] ASF GitHub Bot commented on FLINK-8233: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5194#discussion_r158081176 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JobExecutionResult; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * JSON deserializer for {@link JobExecutionResult}. 
+ * + * @see JobExecutionResultSerializer + */ +public class JobExecutionResultDeserializer extends StdDeserializer<JobExecutionResult> { + + private static final long serialVersionUID = 1L; + + private final JobIDDeserializer jobIdDeserializer = new JobIDDeserializer(); + + private final SerializedThrowableDeserializer serializedThrowableDeserializer = + new SerializedThrowableDeserializer(); + + private final SerializedValueDeserializer serializedValueDeserializer; + + public JobExecutionResultDeserializer() { + super(JobExecutionResult.class); + final JavaType objectSerializedValueType = TypeFactory.defaultInstance() + .constructType(new TypeReference<SerializedValue<Object>>() { + }); + serializedValueDeserializer = new SerializedValueDeserializer(objectSerializedValueType); + } + + @Override + public JobExecutionResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + JobID jobId = null; + long netRuntime = -1; + SerializedThrowable serializedThrowable = null; + Map<String, SerializedValue<Object>> accumulatorResults = null; + + while (true) { + final JsonToken jsonToken = p.nextToken(); + assertNotEndOfInput(p, jsonToken); + if (jsonToken == JsonToken.END_OBJECT) { + break; + } + + final String fieldName = p.getValueAsString(); + switch (fieldName) { + case JobExecutionResultSerializer.FIELD_NAME_JOB_ID: + assertNextToken(p, JsonToken.VALUE_STRING); + jobId = jobIdDeserializer.deserialize(p, ctxt); + break; + case JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME: + assertNextToken(p, JsonToken.VALUE_NUMBER_INT); + netRuntime = p.getLongValue(); + break; + case JobExecutionResultSer
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5194#discussion_r158081176 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JobExecutionResult; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * JSON deserializer for {@link JobExecutionResult}. 
+ * + * @see JobExecutionResultSerializer + */ +public class JobExecutionResultDeserializer extends StdDeserializer<JobExecutionResult> { + + private static final long serialVersionUID = 1L; + + private final JobIDDeserializer jobIdDeserializer = new JobIDDeserializer(); + + private final SerializedThrowableDeserializer serializedThrowableDeserializer = + new SerializedThrowableDeserializer(); + + private final SerializedValueDeserializer serializedValueDeserializer; + + public JobExecutionResultDeserializer() { + super(JobExecutionResult.class); + final JavaType objectSerializedValueType = TypeFactory.defaultInstance() + .constructType(new TypeReference<SerializedValue<Object>>() { + }); + serializedValueDeserializer = new SerializedValueDeserializer(objectSerializedValueType); + } + + @Override + public JobExecutionResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + JobID jobId = null; + long netRuntime = -1; + SerializedThrowable serializedThrowable = null; + Map<String, SerializedValue<Object>> accumulatorResults = null; + + while (true) { + final JsonToken jsonToken = p.nextToken(); + assertNotEndOfInput(p, jsonToken); + if (jsonToken == JsonToken.END_OBJECT) { + break; + } + + final String fieldName = p.getValueAsString(); + switch (fieldName) { + case JobExecutionResultSerializer.FIELD_NAME_JOB_ID: + assertNextToken(p, JsonToken.VALUE_STRING); + jobId = jobIdDeserializer.deserialize(p, ctxt); + break; + case JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME: + assertNextToken(p, JsonToken.VALUE_NUMBER_INT); + netRuntime = p.getLongValue(); + break; + case JobExecutionResultSerializer.FIELD_NAME_ACCUMULATOR_RESULTS: + assertNextToken(p, JsonToken.START_OBJECT); + accumulatorResults = parseAccumulatorResults(p, ctxt); +
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5194#discussion_r158078273 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JobExecutionResult; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * JSON deserializer for {@link JobExecutionResult}. 
+ * + * @see JobExecutionResultSerializer + */ +public class JobExecutionResultDeserializer extends StdDeserializer<JobExecutionResult> { + + private static final long serialVersionUID = 1L; + + private final JobIDDeserializer jobIdDeserializer = new JobIDDeserializer(); + + private final SerializedThrowableDeserializer serializedThrowableDeserializer = + new SerializedThrowableDeserializer(); + + private final SerializedValueDeserializer serializedValueDeserializer; + + public JobExecutionResultDeserializer() { + super(JobExecutionResult.class); + final JavaType objectSerializedValueType = TypeFactory.defaultInstance() + .constructType(new TypeReference<SerializedValue<Object>>() { + }); + serializedValueDeserializer = new SerializedValueDeserializer(objectSerializedValueType); + } + + @Override + public JobExecutionResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + JobID jobId = null; + long netRuntime = -1; + SerializedThrowable serializedThrowable = null; + Map<String, SerializedValue<Object>> accumulatorResults = null; + + while (true) { + final JsonToken jsonToken = p.nextToken(); + assertNotEndOfInput(p, jsonToken); + if (jsonToken == JsonToken.END_OBJECT) { + break; + } + + final String fieldName = p.getValueAsString(); + switch (fieldName) { + case JobExecutionResultSerializer.FIELD_NAME_JOB_ID: + assertNextToken(p, JsonToken.VALUE_STRING); + jobId = jobIdDeserializer.deserialize(p, ctxt); + break; + case JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME: + assertNextToken(p, JsonToken.VALUE_NUMBER_INT); + netRuntime = p.getLongValue(); + break; + case JobExecutionResultSerializer.FIELD_NAME_ACCUMULATOR_RESULTS: + assertNextToken(p, JsonToken.START_OBJECT); + accumulatorResults = parseAccumulatorResults(p, ctxt); --- End diff -- Actual
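The deserializer in the diff above follows Jackson's manual streaming pattern: advance token by token, dispatch on the field name, collect each field into a local, and validate required fields before building the result. The sketch below mirrors that shape using only the JDK, with a plain array of (fieldName, value) pairs standing in for Jackson's JsonParser; the field names echo the serializer's constants, but the class and types here are illustrative, not Flink or Jackson API.

```java
import java.util.HashMap;
import java.util.Map;

public class FieldDispatchSketch {

    /**
     * Walks (fieldName, value) pairs and dispatches on the field name,
     * mirroring the token loop in the quoted deserializer. Unknown fields
     * are skipped; required fields are checked at the end.
     */
    static Map<String, Object> deserialize(String[][] fields) {
        String jobId = null;   // required field
        long netRuntime = -1;  // optional field with a sentinel default
        for (String[] field : fields) {
            switch (field[0]) {
                case "job-id":
                    jobId = field[1];
                    break;
                case "net-runtime":
                    netRuntime = Long.parseLong(field[1]);
                    break;
                default:
                    break; // skip unknown fields for forward compatibility
            }
        }
        if (jobId == null) {
            throw new IllegalStateException(
                "Could not deserialize JobExecutionResult: job-id is missing");
        }
        Map<String, Object> result = new HashMap<>();
        result.put("jobId", jobId);
        result.put("netRuntime", netRuntime);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> result = deserialize(new String[][] {
            {"job-id", "1b5c7e2b"},
            {"net-runtime", "42"},
            {"some-future-field", "ignored"}
        });
        System.out.println(result.get("jobId") + " ran for " + result.get("netRuntime") + " ms");
    }
}
```

The accumulate-then-validate shape is what lets the real deserializer tolerate field reordering and unknown fields while still failing fast on a missing job ID.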
[jira] [Commented] (FLINK-8233) Expose JobExecutionResult via HTTP
[ https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298732#comment-16298732 ] ASF GitHub Bot commented on FLINK-8233: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5194#discussion_r158078273 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JobExecutionResult; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * JSON deserializer for {@link JobExecutionResult}. 
+ * + * @see JobExecutionResultSerializer + */ +public class JobExecutionResultDeserializer extends StdDeserializer<JobExecutionResult> { + + private static final long serialVersionUID = 1L; + + private final JobIDDeserializer jobIdDeserializer = new JobIDDeserializer(); + + private final SerializedThrowableDeserializer serializedThrowableDeserializer = + new SerializedThrowableDeserializer(); + + private final SerializedValueDeserializer serializedValueDeserializer; + + public JobExecutionResultDeserializer() { + super(JobExecutionResult.class); + final JavaType objectSerializedValueType = TypeFactory.defaultInstance() + .constructType(new TypeReference<SerializedValue<Object>>() { + }); + serializedValueDeserializer = new SerializedValueDeserializer(objectSerializedValueType); + } + + @Override + public JobExecutionResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + JobID jobId = null; + long netRuntime = -1; + SerializedThrowable serializedThrowable = null; + Map<String, SerializedValue<Object>> accumulatorResults = null; + + while (true) { + final JsonToken jsonToken = p.nextToken(); + assertNotEndOfInput(p, jsonToken); + if (jsonToken == JsonToken.END_OBJECT) { + break; + } + + final String fieldName = p.getValueAsString(); + switch (fieldName) { + case JobExecutionResultSerializer.FIELD_NAME_JOB_ID: + assertNextToken(p, JsonToken.VALUE_STRING); + jobId = jobIdDeserializer.deserialize(p, ctxt); + break; + case JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME: + assertNextToken(p, JsonToken.VALUE_NUMBER_INT); + netRuntime = p.getLongValue(); + break; + case JobExecutionResultSer
[jira] [Commented] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable
[ https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298723#comment-16298723 ] Timo Walther commented on FLINK-8281: - Sorry, I haven't seen that this issue is already resolved. > org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to > org.apache.flink.core.fs.WrappingProxyCloseable > - > > Key: FLINK-8281 > URL: https://issues.apache.org/jira/browse/FLINK-8281 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.0 > Environment: Linux 4.4.0-104-generic #127-Ubuntu SMP Mon Dec 11 > 12:16:42 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux >Reporter: brucewoo >Priority: Critical > Fix For: 1.4.0 > > > {noformat} > org.apache.flink.streaming.runtime.tasks.AsynchronousException: > java.lang.Exception: Could not materialize checkpoint 1 for operator window: > (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS > api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, > start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, > proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, > api_call_count, total_bytes, total_numbers) -> to: Row (1/1). 
> at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:945) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > ~[na:1.8.0_151] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > ~[na:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > ~[na:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ~[na:1.8.0_151] > at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151] > Caused by: java.lang.Exception: Could not materialize checkpoint 1 for > operator window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: > (COUNT(*) AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS > total_numbers, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS > w$rowtime, proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS > proc_end_time, api_call_count, total_bytes, total_numbers) -> to: Row (1/1). > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:946) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not open output stream for state backend > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > ~[na:1.8.0_151] > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > ~[na:1.8.0_151] > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. 
> at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:92) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Caused by: java.util.concurrent.ExecutionException: > java.io.IOException: Could not open output stream for state backend > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89) > ... 7 common frames omitted > Caused by: java.io.IOException: Could not open output stream for state > backend >
[jira] [Updated] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8234: Description: In order to serve the {{JobExecutionResults}} we have to cache them in the {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should have a configurable size and should periodically clean up stale entries in order to avoid memory exhaustion. (was: In order to serve the {{JobExecutionResults}} we have to cache them in the {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should have a configurable size and should periodically clean up stale entries in order to avoid memory leaks.) > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory exhaustion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
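The description above calls for a size-bounded cache of execution results with periodic cleanup of stale entries. One way such a cache can be sketched is a `LinkedHashMap` with eldest-entry eviction for the size bound plus a timestamp sweep for staleness; this is only an illustration of the requirement, not Flink's actual Dispatcher code, and all names here are made up.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class ResultCache<K, V> {

    private static final class Entry<V> {
        final V value;
        final long insertedAt;
        Entry(V value, long insertedAt) {
            this.value = value;
            this.insertedAt = insertedAt;
        }
    }

    private final long ttlMillis;
    private final LinkedHashMap<K, Entry<V>> entries;

    public ResultCache(final int maxSize, final long ttlMillis) {
        this.ttlMillis = ttlMillis;
        // access-ordered map that drops the eldest entry once maxSize is exceeded
        this.entries = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    public synchronized void put(K key, V value) {
        entries.put(key, new Entry<>(value, System.currentTimeMillis()));
    }

    public synchronized V get(K key) {
        Entry<V> e = entries.get(key);
        return e == null ? null : e.value;
    }

    /** Drops stale entries; intended to run periodically, e.g. from a scheduled executor. */
    public synchronized void cleanUp(long now) {
        for (Iterator<Entry<V>> it = entries.values().iterator(); it.hasNext(); ) {
            if (now - it.next().insertedAt > ttlMillis) {
                it.remove();
            }
        }
    }

    public synchronized int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        ResultCache<String, String> cache = new ResultCache<>(2, 60_000);
        cache.put("job-1", "result-1");
        cache.put("job-2", "result-2");
        cache.put("job-3", "result-3"); // evicts job-1, the eldest entry
        System.out.println(cache.size() + " " + cache.get("job-1"));
    }
}
```

The size bound alone caps peak memory, while the periodic sweep is what prevents long-lived but rarely queried entries from surviving indefinitely, which is the "memory exhaustion" concern in the issue description.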
[jira] [Commented] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable
[ https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298716#comment-16298716 ] Timo Walther commented on FLINK-8281: - [~brucewoo] can you tell us a bit more about your environment? Are you using Flink with bundled Hadoop? What do your dependencies look like? Can you post your project's pom.xml? > org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to > org.apache.flink.core.fs.WrappingProxyCloseable > - > > Key: FLINK-8281 > URL: https://issues.apache.org/jira/browse/FLINK-8281 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.0 > Environment: Linux 4.4.0-104-generic #127-Ubuntu SMP Mon Dec 11 > 12:16:42 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux >Reporter: brucewoo >Priority: Critical > Fix For: 1.4.0 > > > {noformat} > org.apache.flink.streaming.runtime.tasks.AsynchronousException: > java.lang.Exception: Could not materialize checkpoint 1 for operator window: > (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS > api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, > start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, > proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, > api_call_count, total_bytes, total_numbers) -> to: Row (1/1). 
> at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:945) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > ~[na:1.8.0_151] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > ~[na:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > ~[na:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ~[na:1.8.0_151] > at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151] > Caused by: java.lang.Exception: Could not materialize checkpoint 1 for > operator window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: > (COUNT(*) AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS > total_numbers, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS > w$rowtime, proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS > proc_end_time, api_call_count, total_bytes, total_numbers) -> to: Row (1/1). > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:946) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not open output stream for state backend > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > ~[na:1.8.0_151] > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > ~[na:1.8.0_151] > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. 
> at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:92) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Caused by: java.util.concurrent.ExecutionException: > java.io.IOException: Could not open output stream for state backend > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89) > ... 7 co
[jira] [Updated] (FLINK-8300) Make JobExecutionResult accessible from JobMaster
[ https://issues.apache.org/jira/browse/FLINK-8300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8300: Description: In order to serve the {{JobExecutionResults}} we have to cache them in the {{JobMaster}} after the Job has finished. The cache should have a configurable size and should periodically clean up stale entries in order to avoid memory leaks. (was: In order to serve the {{JobExecutionResults}} we have to cache them in the {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should have a configurable size and should periodically clean up stale entries in order to avoid memory leaks.) > Make JobExecutionResult accessible from JobMaster > - > > Key: FLINK-8300 > URL: https://issues.apache.org/jira/browse/FLINK-8300 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{JobMaster}} after the Job has finished. The cache should have a > configurable size and should periodically clean up stale entries in order to > avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8300) Make JobExecutionResult accessible from JobMaster
Gary Yao created FLINK-8300: --- Summary: Make JobExecutionResult accessible from JobMaster Key: FLINK-8300 URL: https://issues.apache.org/jira/browse/FLINK-8300 Project: Flink Issue Type: Sub-task Components: Distributed Coordination Affects Versions: 1.5.0 Reporter: Gary Yao Assignee: Gary Yao Priority: Blocker Fix For: 1.5.0 In order to serve the {{JobExecutionResults}} we have to cache them in the {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should have a configurable size and should periodically clean up stale entries in order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8292) Remove unnecessary force cast in DataStreamSource
[ https://issues.apache.org/jira/browse/FLINK-8292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298658#comment-16298658 ] ASF GitHub Bot commented on FLINK-8292: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5180 +0 > Remove unnecessary force cast in DataStreamSource > - > > Key: FLINK-8292 > URL: https://issues.apache.org/jira/browse/FLINK-8292 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Matrix42 >Priority: Trivial > Fix For: 1.5.0, 1.4.1 > > > In DataStreamSource there is a cast that can be replaced by returning `this` -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5180: [FLINK-8292] Remove unnecessary force cast in DataStreamS...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5180 +0 ---
[jira] [Updated] (FLINK-8233) Expose JobExecutionResult via HTTP
[ https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8233: Description: Expose {{JobExecutionResult}} from a finished Flink job via HTTP: * Add a new AbstractRestHandler that returns the information in {{JobExecutionResult}}. * Register new handler in {{WebMonitorEndpoint}}. was:Retrieve the {{ExecutionResult}} from a finished Flink job via the {{RestClusterClient}}. > Expose JobExecutionResult via HTTP > -- > > Key: FLINK-8233 > URL: https://issues.apache.org/jira/browse/FLINK-8233 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Expose {{JobExecutionResult}} from a finished Flink job via HTTP: > * Add a new AbstractRestHandler that returns the information in > {{JobExecutionResult}}. > * Register new handler in {{WebMonitorEndpoint}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
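The two steps in the description above (a handler that returns the cached result, registered under an HTTP endpoint) can be illustrated without any Flink dependencies using the JDK's built-in `com.sun.net.httpserver` as a stand-in for Flink's REST stack. Everything below — the class name, the `/jobs/<jobId>/execution-result` path, and the in-memory map standing in for the Dispatcher-side cache — is a hypothetical sketch, not the actual `AbstractRestHandler`/`WebMonitorEndpoint` code.

```java
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResultEndpointSketch {

    // jobId -> serialized result; stands in for the server-side result cache
    static final Map<String, String> results = new ConcurrentHashMap<>();

    static HttpServer start() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
        // "register new handler in the endpoint": one context, one handler
        server.createContext("/jobs", exchange -> {
            // expected path shape: /jobs/<jobId>/execution-result
            String[] parts = exchange.getRequestURI().getPath().split("/");
            String jobId = parts.length > 2 ? parts[2] : "";
            String body = results.get(jobId);
            if (body == null) {
                exchange.sendResponseHeaders(404, -1); // unknown job
            } else {
                byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, bytes.length);
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(bytes);
                }
            }
            exchange.close();
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws IOException {
        results.put("abc", "{\"job-execution-result\":{\"id\":\"abc\",\"net-runtime\":42}}");
        HttpServer server = start();
        int port = server.getAddress().getPort();
        URL url = new URL("http://localhost:" + port + "/jobs/abc/execution-result");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        System.out.println(conn.getResponseCode());
        server.stop(0);
    }
}
```

The essential point the sketch preserves is the split of responsibilities: the handler only reads from the cache and maps "missing" to 404, while populating the cache is someone else's job.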
[jira] [Updated] (FLINK-8233) Expose JobExecutionResult via HTTP
[ https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8233: Summary: Expose JobExecutionResult via HTTP (was: Retrieve ExecutionResult by REST polling) > Expose JobExecutionResult via HTTP > -- > > Key: FLINK-8233 > URL: https://issues.apache.org/jira/browse/FLINK-8233 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Retrieve the {{ExecutionResult}} from a finished Flink job via the > {{RestClusterClient}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8299) Retrieve ExecutionResult by REST polling
Gary Yao created FLINK-8299: --- Summary: Retrieve ExecutionResult by REST polling Key: FLINK-8299 URL: https://issues.apache.org/jira/browse/FLINK-8299 Project: Flink Issue Type: Sub-task Components: REST Affects Versions: 1.5.0 Reporter: Gary Yao Assignee: Gary Yao Priority: Blocker Fix For: 1.5.0 Retrieve the {{ExecutionResult}} from a finished Flink job via the {{RestClusterClient}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8280) Enable checkstyle for org.apache.flink.runtime.blob
[ https://issues.apache.org/jira/browse/FLINK-8280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298650#comment-16298650 ] ASF GitHub Bot commented on FLINK-8280: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5175#discussion_r158056059 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java --- @@ -150,8 +150,7 @@ static File initLocalStorageDirectory(String basePath) throws IOException { File storageDir; // NOTE: although we will be using UUIDs, there may be collisions - final int MAX_ATTEMPTS = 10; - for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { + for (int attempt = 0; attempt < 10; attempt++) { --- End diff -- Should we keep and simply rename the constant? > Enable checkstyle for org.apache.flink.runtime.blob > --- > > Key: FLINK-8280 > URL: https://issues.apache.org/jira/browse/FLINK-8280 > Project: Flink > Issue Type: Improvement > Components: Checkstyle >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5175: [FLINK-8280][checkstyle] fix checkstyle in BlobSer...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5175#discussion_r158056059 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java --- @@ -150,8 +150,7 @@ static File initLocalStorageDirectory(String basePath) throws IOException { File storageDir; // NOTE: although we will be using UUIDs, there may be collisions - final int MAX_ATTEMPTS = 10; - for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { + for (int attempt = 0; attempt < 10; attempt++) { --- End diff -- Should we keep and simply rename the constant? ---
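The review comment above suggests keeping the bound as a named (possibly renamed) constant rather than inlining the literal `10`. A hedged sketch of what the loop might then look like, simplified from the quoted `BlobUtils` code — the constant name and the directory-naming scheme here are illustrative:

```java
import java.io.File;
import java.io.IOException;
import java.util.UUID;

public class StorageDirInit {

    // Named constant, as the review suggests, instead of an inline 10.
    private static final int MAX_STORAGE_DIR_CREATION_ATTEMPTS = 10;

    static File initLocalStorageDirectory(String basePath) throws IOException {
        // NOTE: although we use UUIDs, collisions are possible,
        // hence the bounded retry loop.
        for (int attempt = 0; attempt < MAX_STORAGE_DIR_CREATION_ATTEMPTS; attempt++) {
            final File storageDir = new File(basePath, "blobStore-" + UUID.randomUUID());
            if (storageDir.mkdirs()) {
                return storageDir;
            }
        }
        throw new IOException(
            "Could not create storage directory under " + basePath
                + " after " + MAX_STORAGE_DIR_CREATION_ATTEMPTS + " attempts");
    }

    public static void main(String[] args) throws IOException {
        File dir = initLocalStorageDirectory(System.getProperty("java.io.tmpdir"));
        System.out.println(dir.isDirectory());
        dir.delete(); // clean up the example directory
    }
}
```

A named constant also lets the failure message report the bound without repeating the literal, which is one practical argument for keeping it.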
[jira] [Created] (FLINK-8298) Shutdown MockEnvironment
Piotr Nowojski created FLINK-8298: - Summary: Shutdown MockEnvironment Key: FLINK-8298 URL: https://issues.apache.org/jira/browse/FLINK-8298 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.4.0 Reporter: Piotr Nowojski Assignee: Piotr Nowojski Fix For: 1.5.0 IOManager inside MockEnvironment is not being shutdown properly in tests causing a memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
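The leak described above is the classic un-closed test resource: an `IOManager` acquired by the mock environment but never shut down by the tests that use it. One common remedy is to make the test environment `AutoCloseable` so tests can use try-with-resources; the class below is a hypothetical sketch of that pattern, not Flink's actual `MockEnvironment`.

```java
public class CloseableTestEnvironment implements AutoCloseable {

    // Stands in for the IOManager that the real mock environment owns.
    private boolean ioManagerRunning;

    public CloseableTestEnvironment() {
        this.ioManagerRunning = true; // resource acquired on construction
    }

    public boolean isIoManagerRunning() {
        return ioManagerRunning;
    }

    @Override
    public void close() {
        // Shut down owned resources; idempotent, so repeated close() is safe.
        ioManagerRunning = false;
    }

    public static void main(String[] args) {
        CloseableTestEnvironment observed;
        try (CloseableTestEnvironment env = new CloseableTestEnvironment()) {
            observed = env;
            // ... run the test against env ...
        }
        // try-with-resources guarantees shutdown even if the test body throws
        System.out.println(observed.isIoManagerRunning());
    }
}
```

The point of the pattern is that cleanup no longer depends on every test remembering a manual shutdown call in a finally block.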
[jira] [Commented] (FLINK-8079) Skip remaining E2E tests if one failed
[ https://issues.apache.org/jira/browse/FLINK-8079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298598#comment-16298598 ] ASF GitHub Bot commented on FLINK-8079: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5156#discussion_r158037581 --- Diff: tools/travis_mvn_watchdog.sh --- @@ -543,35 +543,45 @@ case $TEST in printf "Running end-to-end tests\n" printf "==\n" - printf "\n==\n" - printf "Running Wordcount end-to-end test\n" - printf "==\n" - FLINK_DIR=build-target CLUSTER_MODE=cluster test-infra/end-to-end-test/test_batch_wordcount.sh - EXIT_CODE=$(($EXIT_CODE+$?)) - - printf "\n==\n" - printf "Running Kafka end-to-end test\n" - printf "==\n" - FLINK_DIR=build-target CLUSTER_MODE=cluster test-infra/end-to-end-test/test_streaming_kafka010.sh - EXIT_CODE=$(($EXIT_CODE+$?)) - - printf "\n==\n" - printf "Running class loading end-to-end test\n" - printf "==\n" - FLINK_DIR=build-target CLUSTER_MODE=cluster test-infra/end-to-end-test/test_streaming_classloader.sh - EXIT_CODE=$(($EXIT_CODE+$?)) - - printf "\n==\n" - printf "Running Shaded Hadoop S3A end-to-end test\n" - printf "==\n" - FLINK_DIR=build-target CLUSTER_MODE=cluster test-infra/end-to-end-test/test_shaded_hadoop_s3a.sh - EXIT_CODE=$(($EXIT_CODE+$?)) - - printf "\n==\n" - printf "Running Shaded Presto S3 end-to-end test\n" - printf "==\n" - FLINK_DIR=build-target CLUSTER_MODE=cluster test-infra/end-to-end-test/test_shaded_presto_s3.sh - EXIT_CODE=$(($EXIT_CODE+$?)) + if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Wordcount end-to-end test\n" + printf "==\n" + FLINK_DIR=build-target CLUSTER_MODE=cluster test-infra/end-to-end-test/test_batch_wordcount.sh + EXIT_CODE=$(($EXIT_CODE+$?)) --- End diff -- If `$EXIT_CODE == 0` then why not simply set `EXIT_CODE=$?`? 
> Skip remaining E2E tests if one failed > -- > > Key: FLINK-8079 > URL: https://issues.apache.org/jira/browse/FLINK-8079 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.5.0, 1.4.1 > > > I propose that if one end-to-end tests fails the remaining tests are skipped. > [~aljoscha] What do you think? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5156: [FLINK-8079][tests] Stop end-to-end test execution...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5156#discussion_r158037581 --- Diff: tools/travis_mvn_watchdog.sh --- @@ -543,35 +543,45 @@ case $TEST in printf "Running end-to-end tests\n" printf "==\n" - printf "\n==\n" - printf "Running Wordcount end-to-end test\n" - printf "==\n" - FLINK_DIR=build-target CLUSTER_MODE=cluster test-infra/end-to-end-test/test_batch_wordcount.sh - EXIT_CODE=$(($EXIT_CODE+$?)) - - printf "\n==\n" - printf "Running Kafka end-to-end test\n" - printf "==\n" - FLINK_DIR=build-target CLUSTER_MODE=cluster test-infra/end-to-end-test/test_streaming_kafka010.sh - EXIT_CODE=$(($EXIT_CODE+$?)) - - printf "\n==\n" - printf "Running class loading end-to-end test\n" - printf "==\n" - FLINK_DIR=build-target CLUSTER_MODE=cluster test-infra/end-to-end-test/test_streaming_classloader.sh - EXIT_CODE=$(($EXIT_CODE+$?)) - - printf "\n==\n" - printf "Running Shaded Hadoop S3A end-to-end test\n" - printf "==\n" - FLINK_DIR=build-target CLUSTER_MODE=cluster test-infra/end-to-end-test/test_shaded_hadoop_s3a.sh - EXIT_CODE=$(($EXIT_CODE+$?)) - - printf "\n==\n" - printf "Running Shaded Presto S3 end-to-end test\n" - printf "==\n" - FLINK_DIR=build-target CLUSTER_MODE=cluster test-infra/end-to-end-test/test_shaded_presto_s3.sh - EXIT_CODE=$(($EXIT_CODE+$?)) + if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Wordcount end-to-end test\n" + printf "==\n" + FLINK_DIR=build-target CLUSTER_MODE=cluster test-infra/end-to-end-test/test_batch_wordcount.sh + EXIT_CODE=$(($EXIT_CODE+$?)) --- End diff -- If `$EXIT_CODE == 0` then why not simply set `EXIT_CODE=$?`? ---
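The review question above is why the guarded branches keep accumulating with `EXIT_CODE=$(($EXIT_CODE+$?))` when the guard already guarantees `EXIT_CODE` is zero, making plain assignment equivalent. A schematic version of the guarded runner, where `true` and `false` stand in for the real end-to-end test scripts:

```shell
#!/usr/bin/env bash
# Schematic version of the guarded runner from the diff above.
EXIT_CODE=0

if [ "$EXIT_CODE" -eq 0 ]; then
  true           # first test: passes
  EXIT_CODE=$?   # plain assignment; equivalent to EXIT_CODE=$(($EXIT_CODE+$?))
                 # because the guard guarantees EXIT_CODE was already 0
fi

if [ "$EXIT_CODE" -eq 0 ]; then
  false          # second test: fails
  EXIT_CODE=$?
fi

if [ "$EXIT_CODE" -eq 0 ]; then
  echo "third test would run"
else
  echo "skipping remaining tests, EXIT_CODE=$EXIT_CODE"
fi
```

Either form works; the accumulation form only matters if tests are allowed to run after an earlier failure, which the new guards rule out.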
[jira] [Commented] (FLINK-8222) Update Scala version
[ https://issues.apache.org/jira/browse/FLINK-8222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298583#comment-16298583 ] ASF GitHub Bot commented on FLINK-8222: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5136 I'll merge this to 1.5 only. > Update Scala version > > > Key: FLINK-8222 > URL: https://issues.apache.org/jira/browse/FLINK-8222 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > Update Scala to version {{2.11.12}}. I don't believe this affects the Flink > distribution but rather anyone who is compiling Flink or a > Flink-quickstart-derived program on a shared system. > "A privilege escalation vulnerability (CVE-2017-15288) has been identified in > the Scala compilation daemon." > https://www.scala-lang.org/news/security-update-nov17.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5136: [FLINK-8222] [build] Update Scala version
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5136 I'll merge this to 1.5 only. ---
[jira] [Updated] (FLINK-5506) Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-5506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-5506: -- Fix Version/s: 1.4.1 1.5.0

> Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
> -
>
> Key: FLINK-5506
> URL: https://issues.apache.org/jira/browse/FLINK-5506
> Project: Flink
> Issue Type: Bug
> Components: Gelly
> Affects Versions: 1.1.4, 1.3.2, 1.4.1
> Reporter: Miguel E. Coimbra
> Assignee: Greg Hogan
> Labels: easyfix, newbie
> Fix For: 1.5.0, 1.4.1
> Original Estimate: 2h
> Remaining Estimate: 2h
>
> Reporting this here as per Vasia's advice.
> I am having the following problem while trying out the
> org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API (Java).
> Specs: JDK 1.8.0_102 x64
> Apache Flink: 1.1.4
> Suppose I have a very small (I tried an example with 38 vertices as well)
> dataset stored in a tab-separated file 3-vertex.tsv:
> {code}
> #id1	id2	score
> 0	1	0
> 0	2	0
> 0	3	0
> {code}
> This is just a central vertex with 3 neighbors (disconnected between themselves).
> I am loading the dataset and executing the algorithm with the following code:
> {code}
> // Load the data from the .tsv file.
> final DataSet<Tuple3<Long, Long, Double>> edgeTuples = env.readCsvFile(inputPath)
>     .fieldDelimiter("\t") // node IDs are separated by spaces
>     .ignoreComments("#") // comments start with "%"
>     .types(Long.class, Long.class, Double.class);
> // Generate a graph and add reverse edges (undirected).
> final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples,
>     new MapFunction<Long, Long>() {
>         private static final long serialVersionUID = 8713516577419451509L;
>         public Long map(Long value) {
>             return value;
>         }
>     },
>     env).getUndirected();
> // CommunityDetection parameters.
> final double hopAttenuationDelta = 0.5d;
> final int iterationCount = 10;
> // Prepare and trigger the execution. 
> DataSet> vs = graph.run(new > org.apache.flink.graph.library.CommunityDetection(iterationCount, > hopAttenuationDelta)).getVertices(); > vs.print(); > {code} > ​Running this code throws the following exception​ (check the bold line): > {code} > ​org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.NullPointerException > at > org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158) > at > org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389) > at > org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486) > at > 
org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146) > at > org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642) > at java.lang.Thread.run(Thread.java:745)​ > {code} > ​After a further look, I set a breakpoint (Eclipse IDE debugging) at the line > in bold: > org.apache.flink.graph.library.CommunityDetection.java (source code accessed > automatically by Maven) > // find the highest score of maxScoreLabel > double highestScore
[jira] [Commented] (FLINK-5506) Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-5506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298563#comment-16298563 ] ASF GitHub Bot commented on FLINK-5506: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5126 @StephanEwen thanks for the tip. I'll remove the added `TypeHint` methods and commit to 1.4 and 1.5. > Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException > - > > Key: FLINK-5506 > URL: https://issues.apache.org/jira/browse/FLINK-5506 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.1.4, 1.3.2, 1.4.1 >Reporter: Miguel E. Coimbra >Assignee: Greg Hogan > Labels: easyfix, newbie > Original Estimate: 2h > Remaining Estimate: 2h > > Reporting this here as per Vasia's advice. > I am having the following problem while trying out the > org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API > (Java). > Specs: JDK 1.8.0_102 x64 > Apache Flink: 1.1.4 > Suppose I have a very small (I tried an example with 38 vertices as well) > dataset stored in a tab-separated file 3-vertex.tsv: > {code} > #id1 id2 score > 010 > 020 > 030 > {code} > This is just a central vertex with 3 neighbors (disconnected between > themselves). > I am loading the dataset and executing the algorithm with the following code: > {code} > // Load the data from the .tsv file. > final DataSet> edgeTuples = > env.readCsvFile(inputPath) > .fieldDelimiter("\t") // node IDs are separated by spaces > .ignoreComments("#") // comments start with "%" > .types(Long.class, Long.class, Double.class); > // Generate a graph and add reverse edges (undirected). > final Graph graph = Graph.fromTupleDataSet(edgeTuples, > new MapFunction() { > private static final long serialVersionUID = 8713516577419451509L; > public Long map(Long value) { > return value; > } > }, > env).getUndirected(); > // CommunityDetection parameters. 
> final double hopAttenuationDelta = 0.5d; > final int iterationCount = 10; > // Prepare and trigger the execution. > DataSet> vs = graph.run(new > org.apache.flink.graph.library.CommunityDetection(iterationCount, > hopAttenuationDelta)).getVertices(); > vs.print(); > {code} > ​Running this code throws the following exception​ (check the bold line): > {code} > ​org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.NullPointerException > at > org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158) > at > org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389) > at > 
org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486) > at > org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146) > at > org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642) > at java.lang.Thread.run(Thread.java:745)​ > {code} > ​After a further look, I set a breakpoint (Eclipse IDE debugging) at the line > i
[GitHub] flink issue #5126: [FLINK-5506] [gelly] Fix CommunityDetection NullPointerEx...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5126 @StephanEwen thanks for the tip. I'll remove the added `TypeHint` methods and commit to 1.4 and 1.5. ---
[jira] [Commented] (FLINK-8233) Retrieve ExecutionResult by REST polling
[ https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298531#comment-16298531 ] ASF GitHub Bot commented on FLINK-8233: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5194 [FLINK-8233][flip6] Add JobExecutionResultHandler ## What is the purpose of the change *Allow retrieval of the JobExecutionResult cached in Dispatcher via HTTP. This will be needed so that accumulator results can be transmitted to the client.* This PR is based on #5184. ## Brief change log - *Add `JobExecutionResultHandler` to enable retrieval of `JobExecutionResult`.* - *Add serializer and deserializer for `JobExecutionResult`* ## Verifying this change This change added tests and can be verified as follows: - *Added unit tests for all new and changed classes.* - *Manually ran the WordCount example job and fetched the `JobExecutionResult` with `curl`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8233-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5194.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5194 commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d Author: gyao Date: 2017-12-19T17:58:53Z [FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher - Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult. - Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally. - Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults commit 748745ac3521a20040cbda4056dfd9c53bc24a82 Author: gyao Date: 2017-12-20T13:44:03Z [FLINK-8233][flip6] Add JobExecutionResultHandler - Allow retrieval of the JobExecutionResult cached in Dispatcher. - Implement serializer and deserializer for JobExecutionResult. commit adf091a2770f42d6f8a0c19ab88cc7a208943a32 Author: gyao Date: 2017-12-20T13:44:26Z [hotfix] Clean up ExecutionGraph - Remove unnecessary throws clause. - Format whitespace. > Retrieve ExecutionResult by REST polling > > > Key: FLINK-8233 > URL: https://issues.apache.org/jira/browse/FLINK-8233 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Retrieve the {{ExecutionResult}} from a finished Flink job via the > {{RestClusterClient}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5194 [FLINK-8233][flip6] Add JobExecutionResultHandler ## What is the purpose of the change *Allow retrieval of the JobExecutionResult cached in Dispatcher via HTTP. This will be needed so that accumulator results can be transmitted to the client.* This PR is based on #5184. ## Brief change log - *Add `JobExecutionResultHandler` to enable retrieval of `JobExecutionResult`.* - *Add serializer and deserializer for `JobExecutionResult`* ## Verifying this change This change added tests and can be verified as follows: - *Added unit tests for all new and changed classes.* - *Manually ran the WordCount example job and fetched the `JobExecutionResult` with `curl`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8233-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5194.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5194 commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d Author: gyao Date: 2017-12-19T17:58:53Z [FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher - Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult. - Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally. - Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults commit 748745ac3521a20040cbda4056dfd9c53bc24a82 Author: gyao Date: 2017-12-20T13:44:03Z [FLINK-8233][flip6] Add JobExecutionResultHandler - Allow retrieval of the JobExecutionResult cached in Dispatcher. - Implement serializer and deserializer for JobExecutionResult. commit adf091a2770f42d6f8a0c19ab88cc7a208943a32 Author: gyao Date: 2017-12-20T13:44:26Z [hotfix] Clean up ExecutionGraph - Remove unnecessary throws clause. - Format whitespace. ---
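The workflow the PR enables — run a job, then fetch the cached `JobExecutionResult` over HTTP with `curl` — amounts to a poll-until-available loop on the client side. A minimal sketch of that loop in shell, where `query_result` is a stand-in for the real `curl` call and both the endpoint path and the response shape are hypothetical, not the PR's actual API:

```shell
#!/usr/bin/env bash
# Poll a hypothetical /jobs/<jobid>/execution-result endpoint until the
# cached result is available. query_result is a stand-in for the real
# curl call; here it reports IN_PROGRESS twice before returning DONE.

ATTEMPT=0
query_result() {
	ATTEMPT=$((ATTEMPT + 1))
	if [ "$ATTEMPT" -ge 3 ]; then
		RESPONSE='{"status":"DONE"}'
	else
		RESPONSE='{"status":"IN_PROGRESS"}'
	fi
}

RESULT=""
for i in 1 2 3 4 5; do
	query_result              # real client: RESPONSE=$(curl -s "$URL")
	case "$RESPONSE" in
		*DONE*)
			RESULT="$RESPONSE"
			break
			;;
		*)
			# A real client would sleep/back off here before retrying.
			;;
	esac
done
echo "polled $ATTEMPT time(s), result: $RESULT"
```

The loop stops at the first response marked done, which is the same contract the `RestClusterClient` needs: repeat the GET until the dispatcher reports the job's execution result as available.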
[jira] [Assigned] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-8268: - Assignee: Piotr Nowojski > Test instability for 'TwoPhaseCommitSinkFunctionTest' > - > > Key: FLINK-8268 > URL: https://issues.apache.org/jira/browse/FLINK-8268 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stephan Ewen >Assignee: Piotr Nowojski >Priority: Critical > Labels: test-stability > > The following exception / failure message occurs. > {code} > Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< > FAILURE! - in > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest > testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest) > Time elapsed: 0.068 sec <<< ERROR! > java.lang.Exception: Could not complete snapshot 0 for operator MockTask > (1/1). > at java.io.FileOutputStream.writeBytes(Native Method) > at java.io.FileOutputStream.write(FileOutputStream.java:326) > at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291) > at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295) > at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141) > at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229) > at java.io.BufferedWriter.flush(BufferedWriter.java:254) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290) > at > 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298389#comment-16298389 ] ASF GitHub Bot commented on FLINK-8268: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/5193 [FLINK-8268][tests] Improve tests stability This is a workaround for an error reported in the issue: https://issues.apache.org/jira/browse/FLINK-8268 . Instead of writing files to disk, this PR creates a simple in-memory "file like" abstraction. The second commit further improves stability by cleaning up the resources from `MockEnvironment`. ## Verifying this change This is a change in tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink f8268 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5193.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5193 commit 129382c7d3a0d87afd8163e57a48f21819e4134c Author: Piotr Nowojski Date: 2017-12-20T10:23:26Z [FLINK-8268][streaming][tests] Improve TwoPhaseCommitSinkFunctionTest stability by using custom in memory storage commit 55d0f4b197a062801c0763b7aeaf5d0bc64eac77 Author: Piotr Nowojski Date: 2017-12-20T11:27:56Z [hotfix][tests] Properly shutdown MockEnvironment to release resources > Test instability for 'TwoPhaseCommitSinkFunctionTest' > - > > Key: FLINK-8268 > URL: https://issues.apache.org/jira/browse/FLINK-8268 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stephan Ewen >Priority: Critical > Labels: test-stability > > The following exception / failure message occurs. > {code} > Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< > FAILURE! 
- in > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest > testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest) > Time elapsed: 0.068 sec <<< ERROR! > java.lang.Exception: Could not complete snapshot 0 for operator MockTask > (1/1). > at java.io.FileOutputStream.writeBytes(Native Method) > at java.io.FileOutputStream.write(FileOutputStream.java:326) > at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291) > at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295) > at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141) > at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229) > at java.io.BufferedWriter.flush(BufferedWriter.java:254) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459) > at > 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/5193 [FLINK-8268][tests] Improve tests stability This is a workaround for an error reported in the issue: https://issues.apache.org/jira/browse/FLINK-8268 . Instead of writing files to disk, this PR creates a simple in-memory "file like" abstraction. The second commit further improves stability by cleaning up the resources from `MockEnvironment`. ## Verifying this change This is a change in tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink f8268 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5193.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5193 commit 129382c7d3a0d87afd8163e57a48f21819e4134c Author: Piotr Nowojski Date: 2017-12-20T10:23:26Z [FLINK-8268][streaming][tests] Improve TwoPhaseCommitSinkFunctionTest stability by using custom in memory storage commit 55d0f4b197a062801c0763b7aeaf5d0bc64eac77 Author: Piotr Nowojski Date: 2017-12-20T11:27:56Z [hotfix][tests] Properly shutdown MockEnvironment to release resources ---
[jira] [Commented] (FLINK-8257) Unify the value checks for setParallelism()
[ https://issues.apache.org/jira/browse/FLINK-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298303#comment-16298303 ] ASF GitHub Bot commented on FLINK-8257: --- GitHub user xccui opened a pull request: https://github.com/apache/flink/pull/5192 [FLINK-8257] [conf] Unify the value checks for setParallelism() ## What is the purpose of the change This PR unifies the value checks for `setParallelism()` methods in different components. ## Brief change log flink-java: Updates the error message in `org.apache.flink.api.java.operators.Operator.setParallelism()`. flink-streaming-java: Refines the check in `StreamTransformation.setParallelism()`. Changes the value check to `parallelism != 1` in `DataStreamSource.setParallelism()`. Removes the value checks in `SingleOutputStreamOperator.setParallelism()` and `StreamExecutionEnvironment.setParallelism()`. flink-gelly: Updates the error messages for `setParallelism()` methods in `GraphAnalyticBase`, `IterationConfiguration` and `GraphAlgorithmWrappingBase`. The methods in flink-runtime package were kept unchanged. Since `org.apache.flink.api.common.operators.Operator.setParallelism()` is only used internally, no extra check was provided for it. ## Verifying this change This change is already covered by existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xccui/flink FLINK-8257 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5192.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5192 commit 44681d1a06373c5e9e723bd202ee053aaf95b3e8 Author: Xingcan Cui Date: 2017-12-20T09:11:12Z [FLINK-8257] [conf] Unify the value checks for setParallelism() > Unify the value checks for setParallelism() > --- > > Key: FLINK-8257 > URL: https://issues.apache.org/jira/browse/FLINK-8257 > Project: Flink > Issue Type: Improvement > Components: Configuration >Reporter: Xingcan Cui >Assignee: Xingcan Cui > > The {{setParallelism()}} method exist in many components from different > levels. Some of the methods require the input value to be greater than {{1}} > (e.g., {{StreamTransformation.setParallelism()}}), while some of them also > allow the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is > {{-1}} by default (e.g., {{DataSink.setParallelism()}}). We need to unify the > value checks for these methods. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5192: [FLINK-8257] [conf] Unify the value checks for set...
GitHub user xccui opened a pull request: https://github.com/apache/flink/pull/5192 [FLINK-8257] [conf] Unify the value checks for setParallelism() ## What is the purpose of the change This PR unifies the value checks for `setParallelism()` methods in different components. ## Brief change log flink-java: Updates the error message in `org.apache.flink.api.java.operators.Operator.setParallelism()`. flink-streaming-java: Refines the check in `StreamTransformation.setParallelism()`. Changes the value check to `parallelism != 1` in `DataStreamSource.setParallelism()`. Removes the value checks in `SingleOutputStreamOperator.setParallelism()` and `StreamExecutionEnvironment.setParallelism()`. flink-gelly: Updates the error messages for `setParallelism()` methods in `GraphAnalyticBase`, `IterationConfiguration` and `GraphAlgorithmWrappingBase`. The methods in flink-runtime package were kept unchanged. Since `org.apache.flink.api.common.operators.Operator.setParallelism()` is only used internally, no extra check was provided for it. ## Verifying this change This change is already covered by existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xccui/flink FLINK-8257 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5192.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5192 commit 44681d1a06373c5e9e723bd202ee053aaf95b3e8 Author: Xingcan Cui Date: 2017-12-20T09:11:12Z [FLINK-8257] [conf] Unify the value checks for setParallelism() ---
[GitHub] flink pull request #5191: Release 1.4
GitHub user czhxmz opened a pull request: https://github.com/apache/flink/pull/5191 Release 1.4 *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). 
- Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/apache/flink release-1.4 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5191.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5191
[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAddress
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298177#comment-16298177 ] ASF GitHub Bot commented on FLINK-8289: --- GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5190 [FLINK-8289] [runtime] set the rest.address to the host of the rest server machine ## What is the purpose of the change This pull request sets rest.address to the host of the Dispatcher or JobMaster, so that the real address of the rest server is returned instead of 0.0.0.0:9067 or 127.0.0.1:9067. ## Verifying this change This change is tested manually. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented?
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-8289 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5190.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5190 commit 0956b94a910df071a61aae90cac7fef2b795ed0c Author: shuai.xus Date: 2017-12-20T09:37:10Z [FLINK-8289] [runtime] set the rest.address to the host of the rest server machine > The RestServerEndpoint should return the address with real ip when > getRestAddress > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu > Labels: flip-6 > > Currently, RestServerEndpoint.getRestAddress returns an address equal to the value of > the config option rest.address (by default 127.0.0.1:9067), but this address cannot be > accessed from another machine. The IPs for the Dispatcher and JobMaster are usually > assigned dynamically, so users configure rest.address to 0.0.0.0, and getRestAddress > then returns 0.0.0.0:9067; this address is registered with YARN or Mesos, but it also > cannot be accessed from another machine. The endpoint therefore needs to return the > real ip:port so that users can access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
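The fix amounts to not advertising a wildcard or loopback address as the REST endpoint. A minimal sketch of that resolution step, assuming a hypothetical `resolve` helper and a "first non-loopback IPv4 interface" fallback strategy (this is illustrative, not Flink's actual implementation):

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Collections;

class RestAddressResolver {
    // Hypothetical helper: if the configured REST address is a wildcard
    // (0.0.0.0) or loopback (127.x.x.x) address, fall back to the first
    // non-loopback IPv4 address of a local network interface.
    static String resolve(String configured) {
        try {
            InetAddress addr = InetAddress.getByName(configured);
            if (!addr.isAnyLocalAddress() && !addr.isLoopbackAddress()) {
                return configured; // already externally reachable, advertise as-is
            }
            for (NetworkInterface nic : Collections.list(NetworkInterface.getNetworkInterfaces())) {
                for (InetAddress a : Collections.list(nic.getInetAddresses())) {
                    if (a instanceof Inet4Address && !a.isLoopbackAddress()) {
                        return a.getHostAddress();
                    }
                }
            }
            return configured; // no routable interface found; keep configured value
        } catch (Exception e) {
            return configured; // resolution failed; keep configured value
        }
    }
}
```

A non-wildcard, non-loopback address passes through unchanged, which matches the PR's intent of only replacing addresses that other machines cannot reach.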
[GitHub] flink pull request #5190: [FLINK-8289] [runtime] set the rest.address to the...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5190 [FLINK-8289] [runtime] set the rest.address to the host of the rest server machine ## What is the purpose of the change This pull request sets rest.address to the host of the Dispatcher or JobMaster, so that the real address of the rest server is returned instead of 0.0.0.0:9067 or 127.0.0.1:9067. ## Verifying this change This change is tested manually. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-8289 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5190.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5190 commit 0956b94a910df071a61aae90cac7fef2b795ed0c Author: shuai.xus Date: 2017-12-20T09:37:10Z [FLINK-8289] [runtime] set the rest.address to the host of the rest server machine ---
[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis
[ https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298162#comment-16298162 ] ASF GitHub Bot commented on FLINK-8271: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5171 Ah, I see. The `AmazonKinesisClient` class is not deprecated, but the constructors are. Alright, thanks for the clarification. I'll find some time to revisit this PR tomorrow. > upgrade from deprecated classes to AmazonKinesis > > > Key: FLINK-8271 > URL: https://issues.apache.org/jira/browse/FLINK-8271 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.5.0, 1.4.1 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5171: [FLINK-8271][Kinesis connector] upgrade from deprecated c...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5171 Ah, I see. The `AmazonKinesisClient` class is not deprecated, but the constructors are. Alright, thanks for the clarification. I'll find some time to revisit this PR tomorrow. ---
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298159#comment-16298159 ] cttestid41 commented on FLINK-8234: --- Test comment > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
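The cache described in the issue (configurable size, stale entries cleaned up) can be sketched with `LinkedHashMap` eviction. The class name, key type, and insertion-order eviction policy here are illustrative assumptions, not the Dispatcher's actual cache, and the periodic time-based cleanup the issue mentions is omitted for brevity:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Size-bounded result cache: once more than maxSize entries are present,
// the oldest-inserted entry is evicted, preventing unbounded memory growth.
class JobResultCache<R> {
    private final Map<String, R> entries;

    JobResultCache(final int maxSize) {
        this.entries = new LinkedHashMap<String, R>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, R> eldest) {
                return size() > maxSize; // evict oldest insertion when over capacity
            }
        };
    }

    void put(String jobId, R result) { entries.put(jobId, result); }
    R get(String jobId) { return entries.get(jobId); }
    int size() { return entries.size(); }
}
```

A production version would additionally timestamp entries and sweep out stale ones on a timer, as the issue requires.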
[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298157#comment-16298157 ] ASF GitHub Bot commented on FLINK-8283: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5181 Also FYI: The stalling tests seem to have been fixed (indirectly?) by fixing the mocking issue. No failures have occurred anymore over 10 local Travis test runs. > FlinkKafkaConsumerBase failing on Travis with no output in 10min > > > Key: FLINK-8283 > URL: https://issues.apache.org/jira/browse/FLINK-8283 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > > For the past few days, Travis builds with the {{connectors}} profile have been failing > more often with no new output being received within 10 minutes. It seems to > start with the Travis build for > https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9 > but may have been introduced earlier. The printed offsets look strange > though.
> {code} > 16:33:12,508 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Setting > restore state in the FlinkKafkaConsumer: > {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, > KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773} > 16:33:12,520 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - > Consumer subtask 2 will start reading 66 partitions with offsets in restored > state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, > 
KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=266}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=11}=-915623761775, > KafkaTopicPartition{topic='test-topic', partitio
[GitHub] flink issue #5181: [FLINK-8283] [kafka] Fix mock verification on final metho...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5181 Also FYI: The stalling tests seem to have been fixed (indirectly?) by fixing the mocking issue. No failures have occurred anymore over 10 local Travis test runs. ---
[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298155#comment-16298155 ] ASF GitHub Bot commented on FLINK-8283: --- Github user tzulitai closed the pull request at: https://github.com/apache/flink/pull/5181 > FlinkKafkaConsumerBase failing on Travis with no output in 10min > > > Key: FLINK-8283 > URL: https://issues.apache.org/jira/browse/FLINK-8283 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > > For the past few days, Travis builds with the {{connectors}} profile have been failing > more often with no new output being received within 10 minutes. It seems to > start with the Travis build for > https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9 > but may have been introduced earlier. The printed offsets look strange > though.
[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298154#comment-16298154 ] ASF GitHub Bot commented on FLINK-8283: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5181 Making the `AbstractFetcher::commitInternalOffsetsToKafka` non-final just for the sake of testing using mocks really is not ideal. I've opened a new PR #5189 that solves this properly by introducing a dedicated abstraction for offset committing. Closing this PR in favor of the new one. > FlinkKafkaConsumerBase failing on Travis with no output in 10min > > > Key: FLINK-8283 > URL: https://issues.apache.org/jira/browse/FLINK-8283 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > > For the past few days, Travis builds with the {{connectors}} profile have been failing > more often with no new output being received within 10 minutes. It seems to > start with the Travis build for > https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9 > but may have been introduced earlier. The printed offsets look strange > though.
[GitHub] flink pull request #5181: [FLINK-8283] [kafka] Fix mock verification on fina...
Github user tzulitai closed the pull request at: https://github.com/apache/flink/pull/5181 ---
[GitHub] flink issue #5181: [FLINK-8283] [kafka] Fix mock verification on final metho...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5181 Making the `AbstractFetcher::commitInternalOffsetsToKafka` non-final just for the sake of testing using mocks really is not ideal. I've opened a new PR #5189 that solves this properly by introducing a dedicated abstraction for offset committing. Closing this PR in favor of the new one. ---
[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298151#comment-16298151 ] ASF GitHub Bot commented on FLINK-8283: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5189 [FLINK-8283] [kafka] Introduce KafkaOffsetCommitter interface ## What is the purpose of the change This PR is built upon the reworked `FlinkKafkaConsumerBaseTest` in #5188. The broken `FlinkKafkaConsumerBaseTest` is properly fixed only when both #5188 and this PR are merged. Prior to this PR, offset committing was coupled tightly with the `AbstractFetcher`, making unit tests for offset committing behaviours hard to compose concisely. For example, we had tests that required mocking the offset commit methods on `AbstractFetcher`, while ideally, it would be best that those methods are made final (thus, unable to be mocked) to prevent accidental overrides. This PR decouples offset committing as a separate service behind a new `KafkaOffsetCommitter` interface. For now, the `AbstractFetcher` is reused as an implementation of this service, so that this PR does not introduce any change other than a new layer of abstraction. Unit tests that verify offset committing behaviour now provide a dummy verifiable implementation of the `KafkaOffsetCommitter` (instead of using mocks on AbstractFetcher) and test against that. ## Brief change log - Migrate the `AbstractFetcher::commitInternalOffsetsToKafka` method to the newly introduced `KafkaOffsetCommitter` interface. - Let `AbstractFetcher` implement `KafkaOffsetCommitter` - In the `FlinkKafkaConsumerBase`, let "offset committing" and "record fetching" be logically separated and handled by two services, namely a `KafkaOffsetCommitter` and an `AbstractFetcher`. Physically, the fetcher instance sits behind both service abstractions.
- In `FlinkKafkaConsumerBaseTest`, remove all mocks on `AbstractFetcher::commitInternalOffsetsToKafka`, and test against a `KafkaOffsetCommitter` instead. ## Verifying this change This PR does not add any new functionality. The reworked tests also do not affect test coverage. `FlinkKafkaConsumerBaseTest` verifies all changes. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? n/a You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8283-KafkaOffsetCommitter Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5189.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5189 commit 620d500b1a14f8f5016e56fdc3d65d853ce8848d Author: Tzu-Li (Gordon) Tai Date: 2017-12-20T00:10:44Z [FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java reflection Reflection was mainly used to inject mocks into private fields of the FlinkKafkaConsumerBase, without the need to fully execute all operator life cycle methods. This, however, caused the unit tests to be too implementation-specific. This commit reworks the FlinkKafkaConsumerBaseTest to remove test consumer instantiation methods that rely on reflection for dependency injection. All tests now instantiate dummy test consumers normally, and let all tests properly execute all operator life cycle methods regardless of the tested logic.
commit 0d19e99d3fb3359f43c2db91611257a5edb2e17f Author: Tzu-Li (Gordon) Tai Date: 2017-12-19T19:55:16Z [FLINK-8283] [kafka] Introduce KafkaOffsetCommitter interface Prior to this commit, offset committing was coupled tightly with the AbstractFetcher, making unit tests for offset committing behaviours hard to compose concisely. For example, we had tests that mock the offset commit methods on AbstractFetcher, while ideally, it would be best that those methods are made final to prevent accidental overrides. This commit decouples offset committing as a separate service behind a new KafkaOffsetCommitter interface. For now, the AbstractFetcher is reused as an implementation of this service. Unit tests that verify offset committing behaviour now provide a dummy verifiable implementation of the KafkaOffsetCommitter, instead of using mocks on AbstractFetcher.
[GitHub] flink pull request #5189: [FLINK-8283] [kafka] Introduce KafkaOffsetCommitte...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5189 [FLINK-8283] [kafka] Introduce KafkaOffsetCommitter interface ## What is the purpose of the change This PR is built upon the reworked `FlinkKafkaConsumerBaseTest` in #5188. The broken `FlinkKafkaConsumerBaseTest` is properly fixed only when both #5188 and this PR are merged. Prior to this PR, offset committing was coupled tightly with the `AbstractFetcher`, making unit tests for offset committing behaviours hard to compose concisely. For example, we had tests that required mocking the offset commit methods on `AbstractFetcher`, while ideally, it would be best that those methods are made final (thus, unable to be mocked) to prevent accidental overrides. This PR decouples offset committing as a separate service behind a new `KafkaOffsetCommitter` interface. For now, the `AbstractFetcher` is reused as an implementation of this service, so that this PR does not introduce any change other than a new layer of abstraction. Unit tests that verify offset committing behaviour now provide a dummy verifiable implementation of the `KafkaOffsetCommitter` (instead of using mocks on AbstractFetcher) and test against that. ## Brief change log - Migrate the `AbstractFetcher::commitInternalOffsetsToKafka` method to the newly introduced `KafkaOffsetCommitter` interface. - Let `AbstractFetcher` implement `KafkaOffsetCommitter` - In the `FlinkKafkaConsumerBase`, let "offset committing" and "record fetching" be logically separated and handled by two services, namely a `KafkaOffsetCommitter` and an `AbstractFetcher`. Physically, the fetcher instance sits behind both service abstractions. - In `FlinkKafkaConsumerBaseTest`, remove all mocks on `AbstractFetcher::commitInternalOffsetsToKafka`, and test against a `KafkaOffsetCommitter` instead. ## Verifying this change This PR does not add any new functionality. The reworked tests also do not affect test coverage.
`FlinkKafkaConsumerBaseTest` verifies all changes. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? n/a You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8283-KafkaOffsetCommitter Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5189.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5189 commit 620d500b1a14f8f5016e56fdc3d65d853ce8848d Author: Tzu-Li (Gordon) Tai Date: 2017-12-20T00:10:44Z [FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java reflection Reflection was mainly used to inject mocks into private fields of the FlinkKafkaConsumerBase, without the need to fully execute all operator life cycle methods. This, however, caused the unit tests to be too implementation-specific. This commit reworks the FlinkKafkaConsumerBaseTest to remove test consumer instantiation methods that rely on reflection for dependency injection. All tests now instantiate dummy test consumers normally, and let all tests properly execute all operator life cycle methods regardless of the tested logic. 
commit 0d19e99d3fb3359f43c2db91611257a5edb2e17f
Author: Tzu-Li (Gordon) Tai
Date: 2017-12-19T19:55:16Z

[FLINK-8283] [kafka] Introduce KafkaOffsetCommitter interface

Prior to this commit, offset committing was coupled tightly with the AbstractFetcher, making unit tests for offset committing behaviours hard to compose concisely. For example, we had tests that mock the offset commit methods on AbstractFetcher, while ideally those methods would be made final to prevent accidental overrides.

This commit decouples offset committing into a separate service behind a new KafkaOffsetCommitter interface. For now, the AbstractFetcher is reused as the implementation of this service. Unit tests that verify offset committing behaviour now provide a dummy verifiable implementation of the KafkaOffsetCommitter, instead of using mocks on AbstractFetcher.

---
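The decoupling this PR describes can be sketched as follows. This is an illustrative simplification, not Flink's actual API: the interface name follows the PR, but the simplified signature (partition index to offset) and the `TrackingOffsetCommitter` test double are assumptions for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the abstraction described above: offset committing sits behind
// its own interface, so tests can supply a verifiable dummy instead of
// mocking methods on AbstractFetcher (which can then be declared final).
// The Map<Integer, Long> signature is a simplification for this example.
interface KafkaOffsetCommitter {
    void commitInternalOffsetsToKafka(Map<Integer, Long> offsets) throws Exception;
}

// A dummy "verifiable" implementation in the spirit of the PR: it records
// what was committed so a test can assert on it afterwards.
class TrackingOffsetCommitter implements KafkaOffsetCommitter {
    final Map<Integer, Long> committed = new HashMap<>();

    @Override
    public void commitInternalOffsetsToKafka(Map<Integer, Long> offsets) {
        committed.putAll(offsets);
    }
}

public class OffsetCommitterSketch {
    public static void main(String[] args) throws Exception {
        TrackingOffsetCommitter committer = new TrackingOffsetCommitter();
        Map<Integer, Long> checkpointedOffsets = new HashMap<>();
        checkpointedOffsets.put(0, 42L); // partition 0 committed up to offset 42
        committer.commitInternalOffsetsToKafka(checkpointedOffsets);
        System.out.println("partition 0 -> " + committer.committed.get(0));
    }
}
```

The point of the indirection is that the production fetcher can implement the interface while tests verify committing behaviour against the tracking double, with no mocking framework involved.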
[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4559

@NicoK, I have submitted the `hotfix` commit to address the above comments.

---
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298149#comment-16298149 ]

ASF GitHub Bot commented on FLINK-7468:
---

Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4559

@NicoK, I have submitted the `hotfix` commit to address the above comments.

> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.5.0
>
> This is a part of the work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the backlog). The receivers use this information to decide how to distribute floating buffers.
> The {{ResultSubpartition}} maintains the backlog, which only indicates the number of buffers in this subpartition, not including the number of events. The backlog is increased when a buffer is added to this subpartition, and decreased when a buffer is polled from it.
> The backlog is attached to the {{BufferResponse}} by the sender as an absolute value after the buffer is transferred.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
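The backlog bookkeeping the issue describes can be sketched roughly as below. The class and method names are assumptions for illustration only; Flink's actual `ResultSubpartition` is considerably more involved.

```java
import java.util.ArrayDeque;

// Rough illustration of the backlog bookkeeping described above; names are
// assumptions, not Flink's implementation. The backlog counts queued data
// buffers only, not events, and the sender attaches it as an absolute value
// to each transferred buffer so the receiver can distribute floating buffers.
class SubpartitionSketch {

    // Each queued element is either a data buffer or an event.
    static final class BufferOrEvent {
        final String payload;
        final boolean isBuffer;

        BufferOrEvent(String payload, boolean isBuffer) {
            this.payload = payload;
            this.isBuffer = isBuffer;
        }
    }

    private final ArrayDeque<BufferOrEvent> queue = new ArrayDeque<>();
    private int buffersInBacklog = 0;

    synchronized void add(BufferOrEvent b) {
        queue.add(b);
        if (b.isBuffer) {
            buffersInBacklog++; // increased only when a buffer (not an event) is added
        }
    }

    // Polls the next element and returns the remaining backlog, mimicking how
    // the backlog is shipped alongside each transferred buffer.
    synchronized int pollAndGetBacklog() {
        BufferOrEvent b = queue.poll();
        if (b != null && b.isBuffer) {
            buffersInBacklog--; // decreased only when a buffer is polled
        }
        return buffersInBacklog;
    }

    synchronized int getBuffersInBacklog() {
        return buffersInBacklog;
    }
}
```

Sending the backlog as an absolute value (rather than a delta) keeps the receiver's view correct even if an individual update were lost or reordered.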
[jira] [Commented] (FLINK-8295) Netty shading does not work properly
[ https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298145#comment-16298145 ] ASF GitHub Bot commented on FLINK-8295: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5183 Thanks for the fix @NicoK. I will have a look at it later today and merge this. > Netty shading does not work properly > > > Key: FLINK-8295 > URL: https://issues.apache.org/jira/browse/FLINK-8295 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector, Core >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Nico Kruber > > Multiple users complained that the Cassandra connector is not usable in Flink > 1.4.0 due to wrong/insufficient shading of Netty. > See: > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5183: [FLINK-8295][cassandra][build] properly shade netty for t...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5183 Thanks for the fix @NicoK. I will have a look at it later today and merge this. ---
[jira] [Commented] (FLINK-8296) Rework FlinkKafkaConsumerBaseTest to not use Java reflection for dependency injection
[ https://issues.apache.org/jira/browse/FLINK-8296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298137#comment-16298137 ]

ASF GitHub Bot commented on FLINK-8296:
---

GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5188

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java reflection

## What is the purpose of the change

Prior to this PR, reflection was mainly used to inject mocks into private fields of the `FlinkKafkaConsumerBase`, without the need to fully execute all operator life cycle methods. This was done using the `FlinkKafkaConsumerBaseTest::getConsumer(...)` method (which has been removed by this PR). This, however, made the unit tests too implementation-specific and hard to extend.

This PR reworks the `FlinkKafkaConsumerBaseTest` to remove the reflection-based `FlinkKafkaConsumerBaseTest::getConsumer(...)` method. All tests now instantiate the `DummyFlinkKafkaConsumer` normally and properly execute all operator life cycle methods, regardless of the tested logic.

## Brief change log

- Remove the reflection-relying `FlinkKafkaConsumerBaseTest::getConsumer(...)` method.
- Generalize the `DummyFlinkKafkaConsumer` class.
- Introduce the `FlinkKafkaConsumerBaseTest::setupConsumer(...)` method that iterates through all normal operator life cycle methods.
- The test pattern for all unit tests in the `FlinkKafkaConsumerBaseTest` is now: 1) instantiate a `DummyFlinkKafkaConsumer`, and 2) call `setupConsumer(dummyConsumer)` to make sure the consumer goes through all life cycle methods and that instance fields are properly instantiated (instead of relying on reflection as before).

## Verifying this change

Test coverage of the reworked `FlinkKafkaConsumerBaseTest` has not been touched. That unit test verifies this change.
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? n/a

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-8296

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5188.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5188

commit 620d500b1a14f8f5016e56fdc3d65d853ce8848d
Author: Tzu-Li (Gordon) Tai
Date: 2017-12-20T00:10:44Z

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java reflection

Reflection was mainly used to inject mocks into private fields of the FlinkKafkaConsumerBase, without the need to fully execute all operator life cycle methods. This, however, caused the unit tests to be too implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test consumer instantiation methods that rely on reflection for dependency injection. All tests now instantiate dummy test consumers normally, and let all tests properly execute all operator life cycle methods regardless of the tested logic.
> Rework FlinkKafkaConsumerBaseTest to not use Java reflection for dependency injection
> ---
>
> Key: FLINK-8296
> URL: https://issues.apache.org/jira/browse/FLINK-8296
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector, Tests
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.5.0, 1.4.1
>
> The current {{FlinkKafkaConsumerBaseTest}} relies heavily on Java reflection for dependency injection. Using reflection to compose unit tests really should be a last resort; it indicates that the tests there are highly implementation-specific and that we should make the design more testable.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
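The test pattern described above, instantiating the object normally and then driving it through its life cycle instead of injecting private fields via reflection, can be sketched generically. The names here are assumptions for illustration, not Flink's actual test utilities.

```java
// Generic sketch of the pattern described above: rather than injecting
// private fields via reflection, the test drives the object through its
// normal life cycle so that fields are initialized the same way as in
// production. All names are assumptions, not Flink's actual test classes.
interface Lifecycle {
    void initializeState();
    void open();
}

class DummyConsumer implements Lifecycle {
    String restoredState;  // initialized during the life cycle, not via reflection
    boolean running;

    @Override
    public void initializeState() {
        restoredState = "restored-from-checkpoint";
    }

    @Override
    public void open() {
        running = true;
    }
}

public class LifecycleSetupSketch {
    // Mirrors the idea of setupConsumer(...): run all life cycle methods in order.
    static void setup(Lifecycle l) {
        l.initializeState();
        l.open();
    }

    public static void main(String[] args) {
        DummyConsumer consumer = new DummyConsumer();
        setup(consumer);
        System.out.println(consumer.running + " / " + consumer.restoredState);
    }
}
```

Because the fields are populated by the same life cycle calls that production uses, the tests stop depending on the class's private layout and survive refactorings that a reflection-based setup would break on.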
[GitHub] flink pull request #5188: [FLINK-8296] [kafka] Rework FlinkKafkaConsumerBase...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5188

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java reflection

## What is the purpose of the change

Prior to this PR, reflection was mainly used to inject mocks into private fields of the `FlinkKafkaConsumerBase`, without the need to fully execute all operator life cycle methods. This was done using the `FlinkKafkaConsumerBaseTest::getConsumer(...)` method (which has been removed by this PR). This, however, made the unit tests too implementation-specific and hard to extend.

This PR reworks the `FlinkKafkaConsumerBaseTest` to remove the reflection-based `FlinkKafkaConsumerBaseTest::getConsumer(...)` method. All tests now instantiate the `DummyFlinkKafkaConsumer` normally and properly execute all operator life cycle methods, regardless of the tested logic.

## Brief change log

- Remove the reflection-relying `FlinkKafkaConsumerBaseTest::getConsumer(...)` method.
- Generalize the `DummyFlinkKafkaConsumer` class.
- Introduce the `FlinkKafkaConsumerBaseTest::setupConsumer(...)` method that iterates through all normal operator life cycle methods.
- The test pattern for all unit tests in the `FlinkKafkaConsumerBaseTest` is now: 1) instantiate a `DummyFlinkKafkaConsumer`, and 2) call `setupConsumer(dummyConsumer)` to make sure the consumer goes through all life cycle methods and that instance fields are properly instantiated (instead of relying on reflection as before).

## Verifying this change

Test coverage of the reworked `FlinkKafkaConsumerBaseTest` has not been touched. That unit test verifies this change.
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? n/a

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-8296

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5188.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5188

commit 620d500b1a14f8f5016e56fdc3d65d853ce8848d
Author: Tzu-Li (Gordon) Tai
Date: 2017-12-20T00:10:44Z

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java reflection

Reflection was mainly used to inject mocks into private fields of the FlinkKafkaConsumerBase, without the need to fully execute all operator life cycle methods. This, however, caused the unit tests to be too implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test consumer instantiation methods that rely on reflection for dependency injection. All tests now instantiate dummy test consumers normally, and let all tests properly execute all operator life cycle methods regardless of the tested logic.

---
[jira] [Commented] (FLINK-8295) Netty shading does not work properly
[ https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298135#comment-16298135 ] ASF GitHub Bot commented on FLINK-8295: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5183 true - I adapted the comment accordingly > Netty shading does not work properly > > > Key: FLINK-8295 > URL: https://issues.apache.org/jira/browse/FLINK-8295 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector, Core >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Nico Kruber > > Multiple users complained that the Cassandra connector is not usable in Flink > 1.4.0 due to wrong/insufficient shading of Netty. > See: > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5183: [FLINK-8295][cassandra][build] properly shade netty for t...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5183 true - I adapted the comment accordingly ---
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298092#comment-16298092 ]

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157963387

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---
@@ -405,22 +407,27 @@ private void decrementCheckAndCleanup() {

 	private volatile Throwable runnerException;

-	private volatile JobExecutionResult result;
+	private volatile org.apache.flink.runtime.jobmaster.JobExecutionResult result;

 	BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
 		this.jobId = jobId;
 		this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
 	}

 	@Override
-	public void jobFinished(JobExecutionResult jobResult) {
-		this.result = jobResult;
+	public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+		this.result = result;
 		jobMastersToWaitFor.countDown();
 	}

 	@Override
-	public void jobFailed(Throwable cause) {
-		jobException = cause;
+	public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+		checkArgument(result.getSerializedThrowable().isPresent());
+
+		jobException = result
--- End diff --

Actually it is not needed to store the exception separately because the JobExecutionResult already contains the exception.

> Cache JobExecutionResult from finished JobManagerRunners
> ---
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Gary Yao
> Priority: Blocker
> Labels: flip-6
> Fix For: 1.5.0
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should have a configurable size and should periodically clean up stale entries in order to avoid memory leaks.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
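The cache requirement stated in the issue, a configurable maximum size plus periodic cleanup of stale entries, can be sketched with a size-bounded `LinkedHashMap` and a timestamp check. This is an illustration under assumed names and an assumed eviction policy, not the Dispatcher's actual implementation.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of the cache described above: bounded size plus
// time-based cleanup of stale entries to avoid a memory leak. Not Flink's
// implementation; all names and the LRU/TTL policy are assumptions.
class ResultCacheSketch<K, V> {
    private final int maxSize;
    private final long ttlMillis;
    private final LinkedHashMap<K, Entry<V>> entries;

    private static final class Entry<V> {
        final V value;
        final long insertedAt;

        Entry(V value, long insertedAt) {
            this.value = value;
            this.insertedAt = insertedAt;
        }
    }

    ResultCacheSketch(int maxSize, long ttlMillis) {
        this.maxSize = maxSize;
        this.ttlMillis = ttlMillis;
        // access-ordered LinkedHashMap: the eldest entry is evicted once the
        // configured maximum size is exceeded
        this.entries = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > ResultCacheSketch.this.maxSize;
            }
        };
    }

    synchronized void put(K key, V value, long nowMillis) {
        entries.put(key, new Entry<>(value, nowMillis));
    }

    synchronized V get(K key) {
        Entry<V> e = entries.get(key);
        return e == null ? null : e.value;
    }

    // Would be invoked periodically (e.g. from a scheduled executor) to drop
    // entries older than the TTL.
    synchronized void cleanUp(long nowMillis) {
        Iterator<Entry<V>> it = entries.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().insertedAt > ttlMillis) {
                it.remove();
            }
        }
    }

    synchronized int size() {
        return entries.size();
    }
}
```

Passing the clock value in explicitly keeps the sketch deterministic to test; a real implementation would read `System.currentTimeMillis()` or a clock service.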
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157963387

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---
@@ -405,22 +407,27 @@ private void decrementCheckAndCleanup() {

 	private volatile Throwable runnerException;

-	private volatile JobExecutionResult result;
+	private volatile org.apache.flink.runtime.jobmaster.JobExecutionResult result;

 	BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
 		this.jobId = jobId;
 		this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
 	}

 	@Override
-	public void jobFinished(JobExecutionResult jobResult) {
-		this.result = jobResult;
+	public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+		this.result = result;
 		jobMastersToWaitFor.countDown();
 	}

 	@Override
-	public void jobFailed(Throwable cause) {
-		jobException = cause;
+	public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+		checkArgument(result.getSerializedThrowable().isPresent());
+
+		jobException = result
--- End diff --

Actually it is not needed to store the exception separately because the JobExecutionResult already contains the exception.

---
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298081#comment-16298081 ]

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157962431

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---
@@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws JobExecutionException, InterruptedE
 			}
 		}
 		else if (result != null) {
-			return result;
+			try {
+				return new SerializedJobExecutionResult(
+					jobId,
+					result.getNetRuntime(),
+					result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
--- End diff --

Because the exception is serialized in `OnCompletionActions#jobFailed(JobExecutionResult)`, I have to deserialize it here again. I wonder if this is sane?

> Cache JobExecutionResult from finished JobManagerRunners
> ---
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Gary Yao
> Priority: Blocker
> Labels: flip-6
> Fix For: 1.5.0
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should have a configurable size and should periodically clean up stale entries in order to avoid memory leaks.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157962431

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---
@@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws JobExecutionException, InterruptedE
 			}
 		}
 		else if (result != null) {
-			return result;
+			try {
+				return new SerializedJobExecutionResult(
+					jobId,
+					result.getNetRuntime(),
+					result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
--- End diff --

Because the exception is serialized in `OnCompletionActions#jobFailed(JobExecutionResult)`, I have to deserialize it here again. I wonder if this is sane?

---
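The review concern above, serializing the failure cause and deserializing it again with a class loader, follows the general pattern of shipping a `Throwable` in serialized form. A minimal sketch of that round trip using plain Java serialization (not Flink's `SerializedJobExecutionResult` or `SerializedThrowable`, which carry additional state) looks like this; the class and method names are assumptions for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// Minimal sketch of the serialize/deserialize round trip discussed above,
// using plain Java serialization. It only shows why a class loader must be
// supplied on the way back: user-defined exception classes may not be
// visible to the default loader.
public class SerializedThrowableSketch {

    static byte[] serialize(Throwable t) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(t);
        }
        return bos.toByteArray();
    }

    static Throwable deserialize(byte[] bytes, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        // Resolve classes against the given loader instead of the default one.
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                return Class.forName(desc.getName(), false, loader);
            }
        }) {
            return (Throwable) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize(new RuntimeException("job failed"));
        Throwable back = deserialize(bytes, ClassLoader.getSystemClassLoader());
        System.out.println(back.getMessage());
    }
}
```

The reviewer's question then amounts to whether the serialize-then-immediately-deserialize step is worth it when caller and cache live in the same JVM, where the original `Throwable` reference could be stored directly.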
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298065#comment-16298065 ] ASF GitHub Bot commented on FLINK-8234: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5168 New PR #5184 > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)