[jira] [Created] (FLINK-7597) broken flink-connectors-kinesis setup in Intellij that potentially results from improper pom.xml
Bowen Li created FLINK-7597: --- Summary: broken flink-connectors-kinesis setup in Intellij that potentially results from improper pom.xml Key: FLINK-7597 URL: https://issues.apache.org/jira/browse/FLINK-7597 Project: Flink Issue Type: Bug Components: Kinesis Connector Affects Versions: 1.3.2, 1.4.0 Reporter: Bowen Li Assignee: Bowen Li I use IntelliJ to develop Flink and flink-connectors-kinesis. I imported the whole Flink source tree into IntelliJ, and IntelliJ treats flink-connectors-kinesis as a module. The project structure in IntelliJ looks like this: https://imgur.com/a/uK3Fd Here's the problem: the {{flink-connectors-kinesis}} module always complains about not being able to find dependencies such as amazon-kinesis-producer, amazon-kinesis-client, flink-streaming-java_2.11, etc. It seems IntelliJ cannot properly parse {{/flink-connectors-kinesis/pom.xml}}, and it keeps suggesting that I add those dependencies to {{flink-connectors/pom.xml}} instead. In short, {{flink-connectors-kinesis}} won't compile in my IntelliJ until I add those dependencies to {{flink-connectors/pom.xml}}. My {{flink-connectors/pom.xml}} ends up looking like this every time:
{code:java}
C02SD32LG8WP:flink-connectors Bowen$ git diff
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index bc3f82f..2b001f5 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -71,6 +71,16 @@ under the License.
 			<artifactId>jsr305</artifactId>
 			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<groupId>com.amazonaws</groupId>
+			<artifactId>amazon-kinesis-producer</artifactId>
+			<version>0.12.5</version>
+		</dependency>
+		<dependency>
+			<groupId>com.amazonaws</groupId>
+			<artifactId>amazon-kinesis-client</artifactId>
+			<version>1.8.1</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.11</artifactId>
+			<version>1.4-SNAPSHOT</version>
+		</dependency>
{code}
FYI, building flink-connectors-kinesis from the command line always works. [~tzulitai] Do you use IntelliJ? If so, how do you set up the flink-connectors-kinesis project in IntelliJ so that it resolves these dependencies properly? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156546#comment-16156546 ] Tzu-Li (Gordon) Tai commented on FLINK-7367: Merged to {{master}} via 59eab45458b3b1637ccbc5dafd326cc84ffb9655. Thanks a lot for your work [~phoenixjiangnan]! > Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, > MaxConnections, RequestTimeout, etc) > --- > > Key: FLINK-7367 > URL: https://issues.apache.org/jira/browse/FLINK-7367 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0 > > > Right now, FlinkKinesisProducer only expose two configs for the underlying > KinesisProducer: > - AGGREGATION_MAX_COUNT > - COLLECTION_MAX_COUNT > Well, according to [AWS > doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] > and [their sample on > github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], > developers can set more to make the max use of KinesisProducer, and make it > fault-tolerant (e.g. by increasing timeout). > I select a few more configs that we need when using Flink with Kinesis: > - MAX_CONNECTIONS > - RATE_LIMIT > - RECORD_MAX_BUFFERED_TIME > - RECORD_TIME_TO_LIVE > - REQUEST_TIMEOUT > Flink is using KPL's default values. They make Flink writing too fast to > Kinesis, which fail Flink job too frequently. We need to parameterize > FlinkKinesisProducer to pass in the above params, in order to slowing down > Flink's write rate to Kinesis. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
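As context for the configs listed above, here is a minimal sketch of how such settings are supplied to {{FlinkKinesisProducer}} through its {{Properties}}-based constructor. The constructor and the {{AWSConfigConstants}} keys exist in the connector today; the KPL-native keys ({{RecordMaxBufferedTime}}, {{MaxConnections}}, {{RequestTimeout}}, {{RateLimit}}, {{RecordTtl}}) illustrate the kind of pass-through this ticket adds and should be treated as assumptions rather than a confirmed API.
{code:java}
import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties producerConfig = new Properties();
// Existing, required connector settings.
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
// KPL tuning keys this ticket wants to expose; the key names follow the
// KinesisProducerConfiguration property names and are assumptions here.
producerConfig.put("RecordMaxBufferedTime", "500");  // ms
producerConfig.put("MaxConnections", "24");
producerConfig.put("RequestTimeout", "6000");        // ms
producerConfig.put("RateLimit", "80");               // percent of a shard's limit
producerConfig.put("RecordTtl", "30000");            // ms

FlinkKinesisProducer<String> producer =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
producer.setDefaultStream("output-stream");
producer.setDefaultPartition("0");
{code}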
[jira] [Closed] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai closed FLINK-7367. -- Resolution: Fixed Fix Version/s: (was: 1.3.3) Since this includes deprecation, I will not merge it for {{1.3.2}}. > Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, > MaxConnections, RequestTimeout, etc) > --- > > Key: FLINK-7367 > URL: https://issues.apache.org/jira/browse/FLINK-7367 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0 > > > Right now, FlinkKinesisProducer only expose two configs for the underlying > KinesisProducer: > - AGGREGATION_MAX_COUNT > - COLLECTION_MAX_COUNT > Well, according to [AWS > doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] > and [their sample on > github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], > developers can set more to make the max use of KinesisProducer, and make it > fault-tolerant (e.g. by increasing timeout). > I select a few more configs that we need when using Flink with Kinesis: > - MAX_CONNECTIONS > - RATE_LIMIT > - RECORD_MAX_BUFFERED_TIME > - RECORD_TIME_TO_LIVE > - REQUEST_TIMEOUT > Flink is using KPL's default values. They make Flink writing too fast to > Kinesis, which fail Flink job too frequently. We need to parameterize > FlinkKinesisProducer to pass in the above params, in order to slowing down > Flink's write rate to Kinesis. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7440) Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer
[ https://issues.apache.org/jira/browse/FLINK-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai closed FLINK-7440. -- Resolution: Fixed Fix Version/s: (was: 1.3.3) Merged to master via 98737f9a875f1899cb14b3dcef1bd2ac1c6530ba. > Add eager serializable checks on provided de-/serialization schemas for > Kinesis consumer / producer > --- > > Key: FLINK-7440 > URL: https://issues.apache.org/jira/browse/FLINK-7440 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.4.0 > > > For better user experience, we should add eager serializable checks on the > provided {{KinesisDeserializationSchema}} / {{KinesisSerializationSchema}}, > with better error messages pointing out exactly that the serialization schema > isn't serializable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
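A minimal sketch of the kind of eager check described above, using Flink's {{InstantiationUtil}} to round-trip the user-provided schema through Java serialization at construction time; this is illustrative and not necessarily how the merged commit implements it.
{code:java}
import java.io.Serializable;

import org.apache.flink.util.InstantiationUtil;

/** Illustrative helper: verify a user-provided schema is serializable when the source/sink is built. */
final class EagerSerializableCheck {

    static void checkSerializable(Serializable schema, String description) {
        try {
            // Serializing the object up front surfaces non-serializable fields immediately,
            // instead of failing later when the job graph is shipped to the cluster.
            InstantiationUtil.serializeObject(schema);
        } catch (Exception e) {
            throw new IllegalArgumentException(
                    "The provided " + description + " is not serializable. "
                            + "It probably contains or references a non-serializable field.", e);
        }
    }

    private EagerSerializableCheck() {}
}
{code}
A consumer or producer constructor could then call, for example, {{checkSerializable(deserializationSchema, "KinesisDeserializationSchema")}} so that a misconfigured job fails on the client side with a pointed error message.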
[jira] [Closed] (FLINK-7407) Assumption of partition id strict contiguity is too naive in Kafka consumer's AbstractPartitionDiscoverer
[ https://issues.apache.org/jira/browse/FLINK-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai closed FLINK-7407. -- Resolution: Fixed > Assumption of partition id strict contiguity is too naive in Kafka consumer's > AbstractPartitionDiscoverer > - > > Key: FLINK-7407 > URL: https://issues.apache.org/jira/browse/FLINK-7407 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0 > > > In the Kafka Consumer's {{AbstractPartitionDiscoverer}}, for partition > discovery, already discovered partitions are tracked with the following map: > {code} > Map topicsToLargestDiscoveredPartitionId > {code} > Simply put, on each discovery attempt's metadata fetch, all partition ids of > a given topic that are smaller than the largest seen id will be ignored and > not assigned. This approach lies on the assumption that fetched partition ids > of a single topic are always strictly contiguous starting from 0. > This assumption may be too naive, in that partitions which were temporarily > unavailable at the time of a discovery would be shadowed by available > partitions with larger ids, and from then on would be left unassigned. > We should redesign how the {{AbstractPartitionDiscoverer}} tracks discovered > partitions by not relying on the contiguity assumption, and also add test > cases for non-contiguous fetched partition ids. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
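To make the redesign suggested above concrete, here is a sketch of discovery bookkeeping that drops the contiguity assumption by remembering every partition ever returned by a metadata fetch, rather than only the largest id per topic. This is illustrative and is not the code from the actual fix.
{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

/** Illustrative tracker: no assumption that partition ids are contiguous or start from 0. */
class SeenPartitionsTracker {

    // Every partition returned by any previous metadata fetch.
    private final Set<KafkaTopicPartition> discoveredPartitions = new HashSet<>();

    /** Returns only the partitions not seen in earlier fetches. */
    List<KafkaTopicPartition> filterNewPartitions(List<KafkaTopicPartition> fetched) {
        List<KafkaTopicPartition> newPartitions = new ArrayList<>();
        for (KafkaTopicPartition partition : fetched) {
            // Set#add returns true only the first time, so a partition that was temporarily
            // unavailable during an earlier fetch is still assigned once it reappears.
            if (discoveredPartitions.add(partition)) {
                newPartitions.add(partition);
            }
        }
        return newPartitions;
    }
}
{code}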
[jira] [Commented] (FLINK-7407) Assumption of partition id strict contiguity is too naive in Kafka consumer's AbstractPartitionDiscoverer
[ https://issues.apache.org/jira/browse/FLINK-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156543#comment-16156543 ] Tzu-Li (Gordon) Tai commented on FLINK-7407: Merged to {{master}} via 93369e79eb21f17791ddad4e03a18980be6eabfb. > Assumption of partition id strict contiguity is too naive in Kafka consumer's > AbstractPartitionDiscoverer > - > > Key: FLINK-7407 > URL: https://issues.apache.org/jira/browse/FLINK-7407 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0 > > > In the Kafka Consumer's {{AbstractPartitionDiscoverer}}, for partition > discovery, already discovered partitions are tracked with the following map: > {code} > Map topicsToLargestDiscoveredPartitionId > {code} > Simply put, on each discovery attempt's metadata fetch, all partition ids of > a given topic that are smaller than the largest seen id will be ignored and > not assigned. This approach lies on the assumption that fetched partition ids > of a single topic are always strictly contiguous starting from 0. > This assumption may be too naive, in that partitions which were temporarily > unavailable at the time of a discovery would be shadowed by available > partitions with larger ids, and from then on would be left unassigned. > We should redesign how the {{AbstractPartitionDiscoverer}} tracks discovered > partitions by not relying on the contiguity assumption, and also add test > cases for non-contiguous fetched partition ids. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4473: [FLINK-7367][kinesis connector] Parameterize more ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4473 ---
[jira] [Commented] (FLINK-7440) Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer
[ https://issues.apache.org/jira/browse/FLINK-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156540#comment-16156540 ] ASF GitHub Bot commented on FLINK-7440: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4537 > Add eager serializable checks on provided de-/serialization schemas for > Kinesis consumer / producer > --- > > Key: FLINK-7440 > URL: https://issues.apache.org/jira/browse/FLINK-7440 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.4.0, 1.3.3 > > > For better user experience, we should add eager serializable checks on the > provided {{KinesisDeserializationSchema}} / {{KinesisSerializationSchema}}, > with better error messages pointing out exactly that the serialization schema > isn't serializable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4537: [FLINK-7440] [kinesis] Add various eager serializa...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4537 ---
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156541#comment-16156541 ] ASF GitHub Bot commented on FLINK-7367: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4473 > Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, > MaxConnections, RequestTimeout, etc) > --- > > Key: FLINK-7367 > URL: https://issues.apache.org/jira/browse/FLINK-7367 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > Right now, FlinkKinesisProducer only expose two configs for the underlying > KinesisProducer: > - AGGREGATION_MAX_COUNT > - COLLECTION_MAX_COUNT > Well, according to [AWS > doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] > and [their sample on > github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], > developers can set more to make the max use of KinesisProducer, and make it > fault-tolerant (e.g. by increasing timeout). > I select a few more configs that we need when using Flink with Kinesis: > - MAX_CONNECTIONS > - RATE_LIMIT > - RECORD_MAX_BUFFERED_TIME > - RECORD_TIME_TO_LIVE > - REQUEST_TIMEOUT > Flink is using KPL's default values. They make Flink writing too fast to > Kinesis, which fail Flink job too frequently. We need to parameterize > FlinkKinesisProducer to pass in the above params, in order to slowing down > Flink's write rate to Kinesis. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode
[ https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7508: Fix Version/s: (was: 1.3.3) > switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode > rather than Per_Request mode > > > Key: FLINK-7508 > URL: https://issues.apache.org/jira/browse/FLINK-7508 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.4.0 > > > KinesisProducerLibrary (KPL) 0.10.x had been using a > One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which > is very expensive. > 0.12.4 introduced a new [ThreadingMode - > Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], > which will use a thread pool. This hugely improves KPL's performance and > reduces consumed resources. By default, KPL still uses per-request mode. We > should explicitly switch FlinkKinesisProducer's KPL threading mode to > 'Pooled'. > This work depends on FLINK-7366 and FLINK-7508 > Benchmarking I did: > * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR > cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink > job generates about 21million UserRecords, which means that we generated a > test load of 21million UserRecords at the first minute of each hour. > * Criteria: Test KPL throughput per minute. Since the default RecordTTL for > KPL is 30 sec, we can be sure that either all UserRecords are sent by KPL > within a minute, or we will see UserRecord expiration errors. > * One-New-Thread-Per-Request model: max throughput is about 2million > UserRecords per min; it doesn't go beyond that because CPU utilization goes > to 100%, everything stopped working and that Flink job crashed. > * Thread-Pool model: it sends out 21million UserRecords within 30 sec without > any UserRecord expiration errors. The average peak CPU utilization is about > 20% - 30%. So 21million UserRecords/min is not the max throughput of > thread-pool model. We didn't go any further because 1) this throughput is > already a couple times more than what we really need, and 2) we don't have a > quick way of increasing the test load > Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. > [~tzulitai] What do you think -- This message was sent by Atlassian JIRA (v6.4.14#64029)
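For reference, the KPL-side switch described above looks roughly like the following; {{setThreadingModel}} and {{setThreadPoolSize}} are {{KinesisProducerConfiguration}} setters introduced around KPL 0.12.4, while how {{FlinkKinesisProducer}} would surface them to users is left to the implementation and is not shown here.
{code:java}
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

KinesisProducerConfiguration kplConfig = new KinesisProducerConfiguration()
        .setRegion("us-east-1")
        // Default is PER_REQUEST; POOLED reuses a bounded pool of request threads.
        .setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED)
        // Upper bound for the pool; only consulted in POOLED mode.
        .setThreadPoolSize(10);

KinesisProducer kinesisProducer = new KinesisProducer(kplConfig);
{code}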
[jira] [Updated] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode
[ https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7508: Description: KinesisProducerLibrary (KPL) 0.10.x had been using a One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which is very expensive. 0.12.4 introduced a new [ThreadingMode - Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], which will use a thread pool. This hugely improves KPL's performance and reduces consumed resources. By default, KPL still uses per-request mode. We should explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'. This work depends on FLINK-7366 and FLINK-7508 Benchmarking I did: * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink job generates about 21million UserRecords, which means that we generated a test load of 21million UserRecords at the first minute of each hour. * Criteria: Test KPL throughput per minute. Since the default RecordTTL for KPL is 30 sec, we can be sure that either all UserRecords are sent by KPL within a minute, or we will see UserRecord expiration errors. * One-New-Thread-Per-Request model: max throughput is about 2million UserRecords per min; it doesn't go beyond that because CPU utilization goes to 100%, everything stopped working and that Flink job crashed. * Thread-Pool model: it sends out 21million UserRecords within 30 sec without any UserRecord expiration errors. The average peak CPU utilization is about 20% - 30%. So 21million UserRecords/min is not the max throughput of thread-pool model. We didn't go any further because 1) this throughput is already a couple times more than what we really need, and 2) we don't have a quick way of increasing the test load Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. [~tzulitai] What do you think was: KinesisProducerLibrary (KPL) 0.10.x had been using a One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which is very expensive. 0.12.4 introduced a new [ThreadingMode - Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], which will use a thread pool. This hugely improves KPL's performance and reduces consumed resources. By default, KPL still uses per-request mode. We should explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'. This work depends on FLINK-7366 and FLINK-7508 Benchmarking I did: * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink job generates about 21million UserRecords, which means that we generated a test load of 21million UserRecords at the first minute of each hour. * Criteria: * One-New-Thread-Per-Request model: max throughput is about 2million UserRecords per min; it doesn't go beyond that because CPU utilization goes to 100%, everything stopped working and that Flink job crashed. * Thread-Pool model: it sends out 21million UserRecords within one minute. 
The CPU utilization is about > switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode > rather than Per_Request mode > > > Key: FLINK-7508 > URL: https://issues.apache.org/jira/browse/FLINK-7508 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.4.0, 1.3.3 > > > KinesisProducerLibrary (KPL) 0.10.x had been using a > One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which > is very expensive. > 0.12.4 introduced a new [ThreadingMode - > Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], > which will use a thread pool. This hugely improves KPL's performance and > reduces consumed resources. By default, KPL still uses per-request mode. We > should explicitly switch FlinkKinesisProducer's KPL threading mode to > 'Pooled'. > This work depends on FLINK-7366 and FLINK-7508 > Benchmarking I did: > * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR > cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink > job generates about 21million UserRecords, which means that we generate
[jira] [Updated] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode
[ https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7508: Description: KinesisProducerLibrary (KPL) 0.10.x had been using a One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which is very expensive. 0.12.4 introduced a new [ThreadingMode - Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], which will use a thread pool. This hugely improves KPL's performance and reduces consumed resources. By default, KPL still uses per-request mode. We should explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'. This work depends on FLINK-7366 and FLINK-7508 Benchmarking I did: * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink job generates about 21million UserRecords, which means that we generated a test load of 21million UserRecords at the first minute of each hour. * Criteria: * One-New-Thread-Per-Request model: max throughput is about 2million UserRecords per min; it doesn't go beyond that because CPU utilization goes to 100%, everything stopped working and that Flink job crashed. * Thread-Pool model: it sends out 21million UserRecords within one minute. The CPU utilization is about was: KinesisProducerLibrary (KPL) 0.10.x had been using a One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which is very expensive. 0.12.4 introduced a new [ThreadingMode - Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], which will use a thread pool. This hugely improves KPL's performance and reduces consumed resources. By default, KPL still uses per-request mode. We should explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'. This work depends on FLINK-7366 and FLINK-7508 > switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode > rather than Per_Request mode > > > Key: FLINK-7508 > URL: https://issues.apache.org/jira/browse/FLINK-7508 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.4.0, 1.3.3 > > > KinesisProducerLibrary (KPL) 0.10.x had been using a > One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which > is very expensive. > 0.12.4 introduced a new [ThreadingMode - > Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], > which will use a thread pool. This hugely improves KPL's performance and > reduces consumed resources. By default, KPL still uses per-request mode. We > should explicitly switch FlinkKinesisProducer's KPL threading mode to > 'Pooled'. > This work depends on FLINK-7366 and FLINK-7508 > Benchmarking I did: > * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR > cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink > job generates about 21million UserRecords, which means that we generated a > test load of 21million UserRecords at the first minute of each hour. 
> * Criteria: > * One-New-Thread-Per-Request model: max throughput is about 2million > UserRecords per min; it doesn't go beyond that because CPU utilization goes > to 100%, everything stopped working and that Flink job crashed. > * Thread-Pool model: it sends out 21million UserRecords within one minute. > The CPU utilization is about -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7407) Assumption of partition id strict contiguity is too naive in Kafka consumer's AbstractPartitionDiscoverer
[ https://issues.apache.org/jira/browse/FLINK-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156466#comment-16156466 ] ASF GitHub Bot commented on FLINK-7407: --- Github user tzulitai closed the pull request at: https://github.com/apache/flink/pull/4526 > Assumption of partition id strict contiguity is too naive in Kafka consumer's > AbstractPartitionDiscoverer > - > > Key: FLINK-7407 > URL: https://issues.apache.org/jira/browse/FLINK-7407 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0 > > > In the Kafka Consumer's {{AbstractPartitionDiscoverer}}, for partition > discovery, already discovered partitions are tracked with the following map: > {code} > Map topicsToLargestDiscoveredPartitionId > {code} > Simply put, on each discovery attempt's metadata fetch, all partition ids of > a given topic that are smaller than the largest seen id will be ignored and > not assigned. This approach lies on the assumption that fetched partition ids > of a single topic are always strictly contiguous starting from 0. > This assumption may be too naive, in that partitions which were temporarily > unavailable at the time of a discovery would be shadowed by available > partitions with larger ids, and from then on would be left unassigned. > We should redesign how the {{AbstractPartitionDiscoverer}} tracks discovered > partitions by not relying on the contiguity assumption, and also add test > cases for non-contiguous fetched partition ids. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156470#comment-16156470 ] ASF GitHub Bot commented on FLINK-7367: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4473 No problem! And really sorry for the wait. I'm waiting for a Travis run before merging: https://travis-ci.org/tzulitai/flink/builds/272758087?utm_source=github_status&utm_medium=notification > Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, > MaxConnections, RequestTimeout, etc) > --- > > Key: FLINK-7367 > URL: https://issues.apache.org/jira/browse/FLINK-7367 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > Right now, FlinkKinesisProducer only expose two configs for the underlying > KinesisProducer: > - AGGREGATION_MAX_COUNT > - COLLECTION_MAX_COUNT > Well, according to [AWS > doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] > and [their sample on > github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], > developers can set more to make the max use of KinesisProducer, and make it > fault-tolerant (e.g. by increasing timeout). > I select a few more configs that we need when using Flink with Kinesis: > - MAX_CONNECTIONS > - RATE_LIMIT > - RECORD_MAX_BUFFERED_TIME > - RECORD_TIME_TO_LIVE > - REQUEST_TIMEOUT > Flink is using KPL's default values. They make Flink writing too fast to > Kinesis, which fail Flink job too frequently. We need to parameterize > FlinkKinesisProducer to pass in the above params, in order to slowing down > Flink's write rate to Kinesis. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4526: [FLINK-7407] [kafka] Adapt AbstractPartitionDiscov...
Github user tzulitai closed the pull request at: https://github.com/apache/flink/pull/4526 ---
[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4473 No problem! And really sorry for the wait. I'm waiting for a Travis run before merging: https://travis-ci.org/tzulitai/flink/builds/272758087?utm_source=github_status&utm_medium=notification ---
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156468#comment-16156468 ] ASF GitHub Bot commented on FLINK-7367: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4473 @tzulitai Thank you, Gordon! > Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, > MaxConnections, RequestTimeout, etc) > --- > > Key: FLINK-7367 > URL: https://issues.apache.org/jira/browse/FLINK-7367 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > Right now, FlinkKinesisProducer only expose two configs for the underlying > KinesisProducer: > - AGGREGATION_MAX_COUNT > - COLLECTION_MAX_COUNT > Well, according to [AWS > doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] > and [their sample on > github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], > developers can set more to make the max use of KinesisProducer, and make it > fault-tolerant (e.g. by increasing timeout). > I select a few more configs that we need when using Flink with Kinesis: > - MAX_CONNECTIONS > - RATE_LIMIT > - RECORD_MAX_BUFFERED_TIME > - RECORD_TIME_TO_LIVE > - REQUEST_TIMEOUT > Flink is using KPL's default values. They make Flink writing too fast to > Kinesis, which fail Flink job too frequently. We need to parameterize > FlinkKinesisProducer to pass in the above params, in order to slowing down > Flink's write rate to Kinesis. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4473 @tzulitai Thank you, Gordon! ---
[GitHub] flink issue #4526: [FLINK-7407] [kafka] Adapt AbstractPartitionDiscoverer to...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4526 @aljoscha not yet, this is still mixed in one of my unmerged batches. Merging this now .. ---
[jira] [Commented] (FLINK-7407) Assumption of partition id strict contiguity is too naive in Kafka consumer's AbstractPartitionDiscoverer
[ https://issues.apache.org/jira/browse/FLINK-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156465#comment-16156465 ] ASF GitHub Bot commented on FLINK-7407: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4526 @aljoscha not yet, this is still mixed in one of my unmerged batches. Merging this now .. > Assumption of partition id strict contiguity is too naive in Kafka consumer's > AbstractPartitionDiscoverer > - > > Key: FLINK-7407 > URL: https://issues.apache.org/jira/browse/FLINK-7407 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0 > > > In the Kafka Consumer's {{AbstractPartitionDiscoverer}}, for partition > discovery, already discovered partitions are tracked with the following map: > {code} > Map topicsToLargestDiscoveredPartitionId > {code} > Simply put, on each discovery attempt's metadata fetch, all partition ids of > a given topic that are smaller than the largest seen id will be ignored and > not assigned. This approach lies on the assumption that fetched partition ids > of a single topic are always strictly contiguous starting from 0. > This assumption may be too naive, in that partitions which were temporarily > unavailable at the time of a discovery would be shadowed by available > partitions with larger ids, and from then on would be left unassigned. > We should redesign how the {{AbstractPartitionDiscoverer}} tracks discovered > partitions by not relying on the contiguity assumption, and also add test > cases for non-contiguous fetched partition ids. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156439#comment-16156439 ] ASF GitHub Bot commented on FLINK-7367: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4473 Hi @bowenli86, sorry about this. I had the commit ready to merge but was waiting for another test PR to be merged first. Merging this now! > Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, > MaxConnections, RequestTimeout, etc) > --- > > Key: FLINK-7367 > URL: https://issues.apache.org/jira/browse/FLINK-7367 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > Right now, FlinkKinesisProducer only expose two configs for the underlying > KinesisProducer: > - AGGREGATION_MAX_COUNT > - COLLECTION_MAX_COUNT > Well, according to [AWS > doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] > and [their sample on > github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], > developers can set more to make the max use of KinesisProducer, and make it > fault-tolerant (e.g. by increasing timeout). > I select a few more configs that we need when using Flink with Kinesis: > - MAX_CONNECTIONS > - RATE_LIMIT > - RECORD_MAX_BUFFERED_TIME > - RECORD_TIME_TO_LIVE > - REQUEST_TIMEOUT > Flink is using KPL's default values. They make Flink writing too fast to > Kinesis, which fail Flink job too frequently. We need to parameterize > FlinkKinesisProducer to pass in the above params, in order to slowing down > Flink's write rate to Kinesis. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4565: [FLINK-7429] [kinesis] Add migration test coverage...
Github user tzulitai closed the pull request at: https://github.com/apache/flink/pull/4565 ---
[jira] [Commented] (FLINK-7429) Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-7429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156440#comment-16156440 ] ASF GitHub Bot commented on FLINK-7429: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4565 @aljoscha thanks! Closing .. > Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer > --- > > Key: FLINK-7429 > URL: https://issues.apache.org/jira/browse/FLINK-7429 > Project: Flink > Issue Type: Test > Components: Kinesis Connector, Tests >Affects Versions: 1.2.1, 1.4.0, 1.3.2 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Currently, the `FlinkKinesisConsumerMigrationTest` only tests restore from > Flink 1.1. > We should extend that to also verify restoring from 1.2 and 1.3. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4565: [FLINK-7429] [kinesis] Add migration test coverage for Fl...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4565 @aljoscha thanks! Closing .. ---
[jira] [Commented] (FLINK-7429) Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-7429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156441#comment-16156441 ] ASF GitHub Bot commented on FLINK-7429: --- Github user tzulitai closed the pull request at: https://github.com/apache/flink/pull/4565 > Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer > --- > > Key: FLINK-7429 > URL: https://issues.apache.org/jira/browse/FLINK-7429 > Project: Flink > Issue Type: Test > Components: Kinesis Connector, Tests >Affects Versions: 1.2.1, 1.4.0, 1.3.2 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Currently, the `FlinkKinesisConsumerMigrationTest` only tests restore from > Flink 1.1. > We should extend that to also verify restoring from 1.2 and 1.3. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4473 Hi @bowenli86, sorry about this. I had the commit ready to merge but was waiting for another test PR to be merged first. Merging this now! ---
[jira] [Updated] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode
[ https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7508: Description: KinesisProducerLibrary (KPL) 0.10.x had been using a One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which is very expensive. 0.12.4 introduced a new [ThreadingMode - Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], which will use a thread pool. This hugely improves KPL's performance and reduces consumed resources. By default, KPL still uses per-request mode. We should explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'. This work depends on FLINK-7366 and FLINK-7508 was: KinesisProducerLibrary (KPL) 0.10.x had been using a One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which is very expensive. 0.12.4 introduced a new [ThreadingMode - Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], which will use a thread pool. This hugely improves KPL's performance and reduces consumed resources. By default, KPL still uses per-request mode. We should explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'. This work depends on FLINK-7366 which upgrades KPL from 0.10 to 0.12.5. > switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode > rather than Per_Request mode > > > Key: FLINK-7508 > URL: https://issues.apache.org/jira/browse/FLINK-7508 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.4.0, 1.3.3 > > > KinesisProducerLibrary (KPL) 0.10.x had been using a > One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which > is very expensive. > 0.12.4 introduced a new [ThreadingMode - > Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], > which will use a thread pool. This hugely improves KPL's performance and > reduces consumed resources. By default, KPL still uses per-request mode. We > should explicitly switch FlinkKinesisProducer's KPL threading mode to > 'Pooled'. > This work depends on FLINK-7366 and FLINK-7508 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156424#comment-16156424 ] ASF GitHub Bot commented on FLINK-7367: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4473 @tzulitai or other Flink committers, can you please merge this so I can submit more PRs that depend on this? Thanks! > Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, > MaxConnections, RequestTimeout, etc) > --- > > Key: FLINK-7367 > URL: https://issues.apache.org/jira/browse/FLINK-7367 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > Right now, FlinkKinesisProducer only expose two configs for the underlying > KinesisProducer: > - AGGREGATION_MAX_COUNT > - COLLECTION_MAX_COUNT > Well, according to [AWS > doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] > and [their sample on > github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], > developers can set more to make the max use of KinesisProducer, and make it > fault-tolerant (e.g. by increasing timeout). > I select a few more configs that we need when using Flink with Kinesis: > - MAX_CONNECTIONS > - RATE_LIMIT > - RECORD_MAX_BUFFERED_TIME > - RECORD_TIME_TO_LIVE > - REQUEST_TIMEOUT > Flink is using KPL's default values. They make Flink writing too fast to > Kinesis, which fail Flink job too frequently. We need to parameterize > FlinkKinesisProducer to pass in the above params, in order to slowing down > Flink's write rate to Kinesis. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4473 @tzulitai or other Flink committers, can you please merge this so I can submit more PRs that depend on this? Thanks! ---
[jira] [Updated] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode
[ https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7508: Priority: Critical (was: Major) > switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode > rather than Per_Request mode > > > Key: FLINK-7508 > URL: https://issues.apache.org/jira/browse/FLINK-7508 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.4.0, 1.3.3 > > > KinesisProducerLibrary (KPL) 0.10.x had been using a > One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which > is very expensive. > 0.12.4 introduced a new [ThreadingMode - > Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], > which will use a thread pool. This hugely improves KPL's performance and > reduces consumed resources. By default, KPL still uses per-request mode. We > should explicitly switch FlinkKinesisProducer's KPL threading mode to > 'Pooled'. > This work depends on FLINK-7366 which upgrades KPL from 0.10 to 0.12.5. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7590) Flink failed to flush and close the file system output stream for checkpointing because of s3 read timeout
[ https://issues.apache.org/jira/browse/FLINK-7590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156416#comment-16156416 ] Bowen Li commented on FLINK-7590: - [~aljoscha] This also seems to be a bug in HadoopFS's S3 implementation. If we can't do anything about, I'd suggest removing this as a blocker of 1.4.0 > Flink failed to flush and close the file system output stream for > checkpointing because of s3 read timeout > -- > > Key: FLINK-7590 > URL: https://issues.apache.org/jira/browse/FLINK-7590 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Bowen Li >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Flink job failed once over the weekend because of the following issue. It > picked itself up afterwards and has been running well. But the issue might > worth taking a look at. > {code:java} > 2017-09-03 13:18:38,998 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- reduce > (14/18) (c97256badc87e995d456e7a13cec5de9) switched from RUNNING to FAILED. > AsynchronousException{java.lang.Exception: Could not materialize checkpoint > 163 for operator reduce (14/18).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.Exception: Could not materialize checkpoint 163 for > operator reduce (14/18). > ... 6 more > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain the > stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.io.IOException: Could not flush and close the file system output stream > to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain > the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 
7 more > Caused by: java.io.IOException: Could not flush and close the file > system output stream to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399) > at > org.apache.flink.run
[jira] [Commented] (FLINK-7589) org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 159764230; received: 64638536)
[ https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156410#comment-16156410 ] Bowen Li commented on FLINK-7589: - After some research online, the root cause seems to be that AWSS3client connection got garbage collected when client is still reading data. Reference: https://stackoverflow.com/questions/9952815/s3-java-client-fails-a-lot-with-premature-end-of-content-length-delimited-messa I doubt if there's actually anything we can do about it. Flink is calling HadoopFS, and HadoopFS calls s3. As Flink developer, we can't touch the HadoopFS code. [~aljoscha] what do you think? > org.apache.http.ConnectionClosedException: Premature end of Content-Length > delimited message body (expected: 159764230; received: 64638536) > --- > > Key: FLINK-7589 > URL: https://issues.apache.org/jira/browse/FLINK-7589 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.2 >Reporter: Bowen Li >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > When I tried to resume a Flink job from a savepoint with different > parallelism, I ran into this error. And the resume failed. > {code:java} > 2017-09-05 21:53:57,317 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- returnsivs -> > Sink: xxx (7/12) (0da16ec908fc7b9b16a5c2cf1aa92947) switched from RUNNING to > FAILED. > org.apache.http.ConnectionClosedException: Premature end of Content-Length > delimited message body (expected: 159764230; received: 64638536 > at > org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180) > at > org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137) > at java.io.FilterInputStream.read(FilterInputStream.java:133) > at java.io.FilterInputStream.read(FilterInputStream.java:133) > at java.io.FilterInputStream.read(FilterInputStream.java:133) > at > com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77) > at java.io.FilterInputStream.read(FilterInputStream.java:133) > at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160) > at java.io.DataInputStream.read(DataInputStream.java:149) > at > org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72) > at > org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:61) > at > org.apache.flink.runtime.util.NonClosingStreamDecorator.read(NonClosingStreamDecorator.java:47) > at java.io.DataInputStream.readFully(DataInputStream.java:195) > at java.io.DataInputStream.readLong(DataInputStream.java:416) > at > org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68) > at > org.apache.flink.streaming.api.operators.InternalTimer$TimerSerializer.deserialize(InternalTimer.java:156) > at > org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:345) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7521) Remove the 10MB limit from the current REST implementation.
[ https://issues.apache.org/jira/browse/FLINK-7521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fang Yong reassigned FLINK-7521: Assignee: Fang Yong > Remove the 10MB limit from the current REST implementation. > --- > > Key: FLINK-7521 > URL: https://issues.apache.org/jira/browse/FLINK-7521 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Fang Yong >Priority: Blocker > Fix For: 1.4.0 > > > In the current {{AbstractRestServer}} we impose an upper bound of 10MB in the > states we can transfer. This is in the line {{.addLast(new > HttpObjectAggregator(1024 * 1024 * 10))}} of the server implementation. > This limit is restrictive for some of the usecases planned to use this > implementation (e.g. the job submission client which has to send full jars, > or the queryable state client which may have to receive states bigger than > that). > This issue proposes the elimination of this limit. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7594) Add a SQL CLI client
[ https://issues.apache.org/jira/browse/FLINK-7594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156356#comment-16156356 ] Kurt Young commented on FLINK-7594: --- +1 to this, it's a string improvement for usability (y) > Add a SQL CLI client > > > Key: FLINK-7594 > URL: https://issues.apache.org/jira/browse/FLINK-7594 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > At the moment a user can only specify queries within a Java/Scala program > which is nice for integrating table programs or parts of it with DataSet or > DataStream API. With more connectors coming up, it is time to also provide a > programming-free SQL client. The SQL client should consist of a CLI interface > and maybe also a REST API. The concrete design is still up for discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7594) Add a SQL CLI client
[ https://issues.apache.org/jira/browse/FLINK-7594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156356#comment-16156356 ] Kurt Young edited comment on FLINK-7594 at 9/7/17 2:46 AM: --- +1 to this, it's a strong improvement for usability (y) was (Author: ykt836): +1 to this, it's a string improvement for usability (y) > Add a SQL CLI client > > > Key: FLINK-7594 > URL: https://issues.apache.org/jira/browse/FLINK-7594 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > At the moment a user can only specify queries within a Java/Scala program > which is nice for integrating table programs or parts of it with DataSet or > DataStream API. With more connectors coming up, it is time to also provide a > programming-free SQL client. The SQL client should consist of a CLI interface > and maybe also a REST API. The concrete design is still up for discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7521) Remove the 10MB limit from the current REST implementation.
[ https://issues.apache.org/jira/browse/FLINK-7521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156352#comment-16156352 ] Fang Yong commented on FLINK-7521: -- It might be better to add a config option to set this value instead of removing it, what do you think? Thanks [~kkl0u] > Remove the 10MB limit from the current REST implementation. > --- > > Key: FLINK-7521 > URL: https://issues.apache.org/jira/browse/FLINK-7521 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Priority: Blocker > Fix For: 1.4.0 > > > In the current {{AbstractRestServer}} we impose an upper bound of 10MB in the > states we can transfer. This is in the line {{.addLast(new > HttpObjectAggregator(1024 * 1024 * 10))}} of the server implementation. > This limit is restrictive for some of the usecases planned to use this > implementation (e.g. the job submission client which has to send full jars, > or the queryable state client which may have to receive states bigger than > that). > This issue proposes the elimination of this limit. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
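A sketch of what the config-option approach suggested above could look like on the server's Netty pipeline. The option key {{rest.server.max-content-length}} is hypothetical, and the plain {{io.netty}} imports stand in for whatever (possibly shaded) Netty Flink bundles; only {{HttpObjectAggregator}} and {{Configuration#getInteger}} are existing APIs.
{code:java}
import org.apache.flink.configuration.Configuration;

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;

/** Illustrative only: read the aggregator limit from configuration instead of hard-coding 10 MB. */
final class RestMaxContentLength {

    // Hypothetical option key; keeping 10 MB as the default preserves current behavior.
    static final String MAX_CONTENT_LENGTH_KEY = "rest.server.max-content-length";
    static final int DEFAULT_MAX_CONTENT_LENGTH = 10 * 1024 * 1024;

    static void addAggregator(ChannelPipeline pipeline, Configuration config) {
        int maxContentLength = config.getInteger(MAX_CONTENT_LENGTH_KEY, DEFAULT_MAX_CONTENT_LENGTH);
        pipeline.addLast(new HttpObjectAggregator(maxContentLength));
    }

    private RestMaxContentLength() {}
}
{code}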
[jira] [Commented] (FLINK-7590) Flink failed to flush and close the file system output stream for checkpointing because of s3 read timeout
[ https://issues.apache.org/jira/browse/FLINK-7590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156347#comment-16156347 ] Bowen Li commented on FLINK-7590: - According to [1], this seems can be fixed by aws-sdk version. But I can't find where Flink brings in aws-sdk except flink-connectors-kinesis [1] https://github.com/aws/aws-sdk-java/issues/1174 > Flink failed to flush and close the file system output stream for > checkpointing because of s3 read timeout > -- > > Key: FLINK-7590 > URL: https://issues.apache.org/jira/browse/FLINK-7590 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Bowen Li >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Flink job failed once over the weekend because of the following issue. It > picked itself up afterwards and has been running well. But the issue might > worth taking a look at. > {code:java} > 2017-09-03 13:18:38,998 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- reduce > (14/18) (c97256badc87e995d456e7a13cec5de9) switched from RUNNING to FAILED. > AsynchronousException{java.lang.Exception: Could not materialize checkpoint > 163 for operator reduce (14/18).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.Exception: Could not materialize checkpoint 163 for > operator reduce (14/18). > ... 6 more > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain the > stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.io.IOException: Could not flush and close the file system output stream > to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain > the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 
7 more > Caused by: java.io.IOException: Could not flush and close the file > system output stream to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
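On the open question above of where the aws-sdk comes from: the Maven dependency tree can usually answer that directly. One possible invocation from the Flink source root (filters the tree to artifacts in the {{com.amazonaws}} group, including transitive ones):
{code}
mvn dependency:tree -Dincludes=com.amazonaws
{code}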
[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)
[ https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li updated FLINK-7596: -- Description: If two inputs with Any(GenericRelDataType), when they comes to Set Operation(Union, minus...), it will cause a {{TableException}} with info is "Type is not supported: ANY" Here is the test case: ` @Test def testUnion(): Unit = { val list = List((1, new NODE), (2, new NODE)) val list2 = List((3, new NODE), (4, new NODE)) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val s1 = tEnv.fromDataStream(env.fromCollection(list)) val s2 = tEnv.fromDataStream(env.fromCollection(list2)) val result = s1.unionAll(s2).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() } class NODE { val x = new util.HashMap[String, String]() } ` this bug happens because flink did't handle createSqlType(ANY) and Calcite does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead was: If two inputs with Any(GenericRelDataType), when they comes to Set Operation(Union, minus...), it will cause a {{TableException}} with info is "Type is not supported: ANY" Here is the test case: ``` @Test def testUnion(): Unit = { val list = List((1, new NODE), (2, new NODE)) val list2 = List((3, new NODE), (4, new NODE)) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val s1 = tEnv.fromDataStream(env.fromCollection(list)) val s2 = tEnv.fromDataStream(env.fromCollection(list2)) val result = s1.unionAll(s2).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() } class NODE { val x = new util.HashMap[String, String]() } ``` this bug happens because flink did't handle createSqlType(ANY) and Calcite does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead > Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) > -- > > Key: FLINK-7596 > URL: https://issues.apache.org/jira/browse/FLINK-7596 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > If two inputs with Any(GenericRelDataType), when they comes to Set > Operation(Union, minus...), it will cause a {{TableException}} with info is > "Type is not supported: ANY" > Here is the test case: > ` > @Test > def testUnion(): Unit = { > val list = List((1, new NODE), (2, new NODE)) > val list2 = List((3, new NODE), (4, new NODE)) > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val s1 = tEnv.fromDataStream(env.fromCollection(list)) > val s2 = tEnv.fromDataStream(env.fromCollection(list2)) > val result = s1.unionAll(s2).toAppendStream[Row] > result.addSink(new StreamITCase.StringSink[Row]) > env.execute() > } > class NODE { > val x = new util.HashMap[String, String]() > } > ` > this bug happens because flink did't handle createSqlType(ANY) and Calcite > does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, > so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)
[ https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li updated FLINK-7596: -- Description: If two inputs with Any(GenericRelDataType), when they comes to Set Operation(Union, minus...), it will cause a {{TableException}} with info is "Type is not supported: ANY" Here is the test case: ``` @Test def testUnion(): Unit = { val list = List((1, new NODE), (2, new NODE)) val list2 = List((3, new NODE), (4, new NODE)) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val s1 = tEnv.fromDataStream(env.fromCollection(list)) val s2 = tEnv.fromDataStream(env.fromCollection(list2)) val result = s1.unionAll(s2).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() } class NODE { val x = new util.HashMap[String, String]() } ``` this bug happens because flink did't handle createSqlType(ANY) and Calcite does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead was: If two inputs with Any(GenericRelDataType), when they comes to Set Operation(Union, minus...), it will cause a {{TableException}} with info is "Type is not supported: ANY" Here is the test case: ` @Test def testUnion(): Unit = { val list = List((1, new NODE), (2, new NODE)) val list2 = List((3, new NODE), (4, new NODE)) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val s1 = tEnv.fromDataStream(env.fromCollection(list)) val s2 = tEnv.fromDataStream(env.fromCollection(list2)) val result = s1.unionAll(s2).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() } class NODE { val x = new util.HashMap[String, String]() } ` this bug happens because flink did't handle createSqlType(ANY) and Calcite does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead > Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) > -- > > Key: FLINK-7596 > URL: https://issues.apache.org/jira/browse/FLINK-7596 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > If two inputs with Any(GenericRelDataType), when they comes to Set > Operation(Union, minus...), it will cause a {{TableException}} with info is > "Type is not supported: ANY" > Here is the test case: > ``` > @Test > def testUnion(): Unit = { > val list = List((1, new NODE), (2, new NODE)) > val list2 = List((3, new NODE), (4, new NODE)) > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val s1 = tEnv.fromDataStream(env.fromCollection(list)) > val s2 = tEnv.fromDataStream(env.fromCollection(list2)) > val result = s1.unionAll(s2).toAppendStream[Row] > result.addSink(new StreamITCase.StringSink[Row]) > env.execute() > } > class NODE { > val x = new util.HashMap[String, String]() > } > ``` > this bug happens because flink did't handle createSqlType(ANY) and Calcite > does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, > so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)
[ https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li updated FLINK-7596: -- Description: If two inputs with Any(GenericRelDataType), when they comes to Set Operation(Union, minus...), it will cause a {{TableException}} with info is "Type is not supported: ANY" Here is the test case: ` @Test def testUnion(): Unit = { val list = List((1, new NODE), (2, new NODE)) val list2 = List((3, new NODE), (4, new NODE)) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val s1 = tEnv.fromDataStream(env.fromCollection(list)) val s2 = tEnv.fromDataStream(env.fromCollection(list2)) val result = s1.unionAll(s2).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() } class NODE { val x = new util.HashMap[String, String]() } ` this bug happens because flink did't handle createSqlType(ANY) and Calcite does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead was: If two inputs with Any(GenericRelDataType), when they comes to Set Operation(Union, minus...), it will cause a {{TableException}} with info is "Type is not supported: ANY" Here is the test case: @Test def testUnion(): Unit = { val list = List((1, new NODE), (2, new NODE)) val list2 = List((3, new NODE), (4, new NODE)) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val s1 = tEnv.fromDataStream(env.fromCollection(list)) val s2 = tEnv.fromDataStream(env.fromCollection(list2)) val result = s1.unionAll(s2).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() } class NODE { val x = new util.HashMap[String, String]() } this bug happens because flink did't handle createSqlType(ANY) and Calcite does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead > Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) > -- > > Key: FLINK-7596 > URL: https://issues.apache.org/jira/browse/FLINK-7596 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > If two inputs with Any(GenericRelDataType), when they comes to Set > Operation(Union, minus...), it will cause a {{TableException}} with info is > "Type is not supported: ANY" > Here is the test case: > ` > @Test > def testUnion(): Unit = { > val list = List((1, new NODE), (2, new NODE)) > val list2 = List((3, new NODE), (4, new NODE)) > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val s1 = tEnv.fromDataStream(env.fromCollection(list)) > val s2 = tEnv.fromDataStream(env.fromCollection(list2)) > val result = s1.unionAll(s2).toAppendStream[Row] > result.addSink(new StreamITCase.StringSink[Row]) > env.execute() > } > class NODE { > val x = new util.HashMap[String, String]() > } > ` > this bug happens because flink did't handle createSqlType(ANY) and Calcite > does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, > so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)
[ https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li updated FLINK-7596: -- Description: If two inputs with Any(GenericRelDataType), when they comes to Set Operation(Union, minus...), it will cause a {{TableException}} with info is "Type is not supported: ANY" Here is the test case: @Test def testUnion(): Unit = { val list = List((1, new NODE), (2, new NODE)) val list2 = List((3, new NODE), (4, new NODE)) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val s1 = tEnv.fromDataStream(env.fromCollection(list)) val s2 = tEnv.fromDataStream(env.fromCollection(list2)) val result = s1.unionAll(s2).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() } class NODE { val x = new util.HashMap[String, String]() } this bug happens because flink did't handle createSqlType(ANY) and Calcite does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead was: If two inputs with Any(GenericRelDataType), when they comes to Set Operation(Union, minus...), it will cause a {{TableException}} with info is "Type is not supported: ANY" Here is the test case: {{ @Test def testUnion(): Unit = { val list = List((1, new NODE), (2, new NODE)) val list2 = List((3, new NODE), (4, new NODE)) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val s1 = tEnv.fromDataStream(env.fromCollection(list)) val s2 = tEnv.fromDataStream(env.fromCollection(list2)) val result = s1.unionAll(s2).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() } class NODE { val x = new util.HashMap[String, String]() } }} this bug happens because flink did't handle createSqlType(ANY) and Calcite does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead > Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) > -- > > Key: FLINK-7596 > URL: https://issues.apache.org/jira/browse/FLINK-7596 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > If two inputs with Any(GenericRelDataType), when they comes to Set > Operation(Union, minus...), it will cause a {{TableException}} with info is > "Type is not supported: ANY" > Here is the test case: > @Test > def testUnion(): Unit = { > val list = List((1, new NODE), (2, new NODE)) > val list2 = List((3, new NODE), (4, new NODE)) > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val s1 = tEnv.fromDataStream(env.fromCollection(list)) > val s2 = tEnv.fromDataStream(env.fromCollection(list2)) > val result = s1.unionAll(s2).toAppendStream[Row] > result.addSink(new StreamITCase.StringSink[Row]) > env.execute() > } > class NODE { > val x = new util.HashMap[String, String]() > } > this bug happens because flink did't handle createSqlType(ANY) and Calcite > does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, > so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)
[ https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li updated FLINK-7596: -- Description: If two inputs with Any(GenericRelDataType), when they comes to Set Operation(Union, minus...), it will cause a {{TableException}} with info is "Type is not supported: ANY" Here is the test case: {{ @Test def testUnion(): Unit = { val list = List((1, new NODE), (2, new NODE)) val list2 = List((3, new NODE), (4, new NODE)) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val s1 = tEnv.fromDataStream(env.fromCollection(list)) val s2 = tEnv.fromDataStream(env.fromCollection(list2)) val result = s1.unionAll(s2).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() } class NODE { val x = new util.HashMap[String, String]() } }} this bug happens because flink did't handle createSqlType(ANY) and Calcite does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead was: If two inputs with Any(GenericRelDataType), when they comes to Set Operation(Union, minus...), it will cause a `TableException` with info is "Type is not supported: ANY" Here is the test case: ` @Test def testUnion(): Unit = { val list = List((1, new NODE), (2, new NODE)) val list2 = List((3, new NODE), (4, new NODE)) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val s1 = tEnv.fromDataStream(env.fromCollection(list)) val s2 = tEnv.fromDataStream(env.fromCollection(list2)) val result = s1.unionAll(s2).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() } class NODE { val x = new util.HashMap[String, String]() } ` this bug happens because flink did't handle createSqlType(ANY) and Calcite does't know the differences between `ANY` and `ANY(GenericRelDataType)`, so the `createSqlType(ANY)` of Calcite will return a BasicSqlType instead > Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) > -- > > Key: FLINK-7596 > URL: https://issues.apache.org/jira/browse/FLINK-7596 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > If two inputs with Any(GenericRelDataType), when they comes to Set > Operation(Union, minus...), it will cause a {{TableException}} with info is > "Type is not supported: ANY" > Here is the test case: > {{ > @Test > def testUnion(): Unit = { > val list = List((1, new NODE), (2, new NODE)) > val list2 = List((3, new NODE), (4, new NODE)) > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val s1 = tEnv.fromDataStream(env.fromCollection(list)) > val s2 = tEnv.fromDataStream(env.fromCollection(list2)) > val result = s1.unionAll(s2).toAppendStream[Row] > result.addSink(new StreamITCase.StringSink[Row]) > env.execute() > } > class NODE { > val x = new util.HashMap[String, String]() > } > }} > this bug happens because flink did't handle createSqlType(ANY) and Calcite > does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, > so the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)
Ruidong Li created FLINK-7596: - Summary: Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) Key: FLINK-7596 URL: https://issues.apache.org/jira/browse/FLINK-7596 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Ruidong Li Assignee: Ruidong Li If two inputs have Any(GenericRelDataType) types, a Set Operation (Union, Minus, ...) on them causes a `TableException` with the message "Type is not supported: ANY". Here is the test case: ` @Test def testUnion(): Unit = { val list = List((1, new NODE), (2, new NODE)) val list2 = List((3, new NODE), (4, new NODE)) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val s1 = tEnv.fromDataStream(env.fromCollection(list)) val s2 = tEnv.fromDataStream(env.fromCollection(list2)) val result = s1.unionAll(s2).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() } class NODE { val x = new util.HashMap[String, String]() } ` This bug happens because Flink doesn't handle createSqlType(ANY) specially and Calcite doesn't know the difference between `ANY` and `ANY(GenericRelDataType)`, so Calcite's `createSqlType(ANY)` returns a BasicSqlType instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
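To make the last point concrete: a small standalone sketch (assuming Calcite is on the classpath) showing that asking the type factory for a plain ANY yields a {{BasicSqlType}}, which is exactly where any information carried by a {{GenericRelDataType}} wrapper would be lost. This only illustrates the behavior described above; it is not the fix itself.
{code:java}
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;

public class AnyTypeSketch {
	public static void main(String[] args) {
		RelDataTypeFactory factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
		// ANY carries no payload in Calcite, so the returned type is a plain BasicSqlType.
		RelDataType any = factory.createSqlType(SqlTypeName.ANY);
		System.out.println(any.getClass().getSimpleName() + ": " + any);
	}
}
{code}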
[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI&SQL
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156251#comment-16156251 ] ASF GitHub Bot commented on FLINK-7465: --- GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/4652 [FLINK-7465][table]Add cardinality count for tableAPI and SQL. ## What is the purpose of the change *In this PR. we want add add CARDINALITY_COUNT for tableAPI and SQL.(Using `HyperLogLog` algorithm). The implementation of HyperLogLog (HLL) algorithm from this paper: http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf As we know there are still some improved algorithms, such as: HyperLogLog++, HyperBitBit etc. But `HyperLogLog` is a classic algorithm that has been massively verified, so I chose to use the `HyperLogLog` algorithm as the first version of cardinality to achieve. And we can improve the algorithm at any time If we need. * ## Brief change log - *Add Java implementation of `HyperLogLog`(base on stream-lib)* - *Add MURMURHASH See more: http://murmurhash.googlepages.com/* - *Add build-in `CardinalityCountAggFunction`* - *Add some test case for the validation* - *Add documentation for TableAPI&SQL* ## Verifying this change This change added tests and can be verified as follows: - *Added SQL/TableAPI integration tests for `cardinality_count`* - *Added `CardinalityCountAggFunctionTest` test case for verify the AGG logic.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs / JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/sunjincheng121/flink FLINK-7465-PR Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4652.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4652 commit bc1166ad88538bdcdd6df685c750359aadff3950 Author: 金竹 Date: 2017-09-05T10:21:10Z [FLINK-7465][table]Add cardinality count for tableAPI and SQL. > Add build-in BloomFilterCount on TableAPI&SQL > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA. use BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. 
Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~tw
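Since the quoted description explains the Bloom filter algorithm in prose, a minimal self-contained Java sketch of the same idea may help: a bit array of m bits and k hash-derived positions per element. It is only an illustration (using a simple double-hashing scheme), not the Hive implementation referenced above.
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.BitSet;

/** Minimal Bloom filter: add() sets k bit positions, mightContain() checks them. */
public class SimpleBloomFilter {

	private final BitSet bits;
	private final int m; // number of bits
	private final int k; // number of hash functions

	public SimpleBloomFilter(int m, int k) {
		this.bits = new BitSet(m);
		this.m = m;
		this.k = k;
	}

	// Derive the i-th position from two base hashes (double hashing).
	private int position(byte[] data, int i) {
		int h1 = Arrays.hashCode(data);
		int h2 = new String(data, StandardCharsets.UTF_8).hashCode();
		return Math.floorMod(h1 + i * h2, m);
	}

	public void add(String element) {
		byte[] data = element.getBytes(StandardCharsets.UTF_8);
		for (int i = 0; i < k; i++) {
			bits.set(position(data, i));
		}
	}

	/** false means "definitely not in the set"; true means "possibly in the set". */
	public boolean mightContain(String element) {
		byte[] data = element.getBytes(StandardCharsets.UTF_8);
		for (int i = 0; i < k; i++) {
			if (!bits.get(position(data, i))) {
				return false;
			}
		}
		return true;
	}

	public static void main(String[] args) {
		SimpleBloomFilter filter = new SimpleBloomFilter(18, 3); // m = 18, k = 3 as in the figure
		for (String s : new String[] {"x", "y", "z"}) {
			filter.add(s);
		}
		System.out.println(filter.mightContain("x")); // true
		System.out.println(filter.mightContain("w")); // false, unless a (possible) false positive
	}
}
{code}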
[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...
GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/4652 [FLINK-7465][table]Add cardinality count for tableAPI and SQL. ## What is the purpose of the change *In this PR we want to add CARDINALITY_COUNT for the Table API and SQL (using the `HyperLogLog` algorithm). The implementation of the HyperLogLog (HLL) algorithm follows this paper: http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf As we know, there are improved algorithms such as HyperLogLog++, HyperBitBit, etc., but `HyperLogLog` is a classic algorithm that has been extensively verified, so I chose it for the first version of the cardinality count. We can improve the algorithm later if needed.* ## Brief change log - *Add a Java implementation of `HyperLogLog` (based on stream-lib)* - *Add MURMURHASH. See more: http://murmurhash.googlepages.com/* - *Add a built-in `CardinalityCountAggFunction`* - *Add test cases for validation* - *Add documentation for the Table API & SQL* ## Verifying this change This change added tests and can be verified as follows: - *Added SQL/Table API integration tests for `cardinality_count`* - *Added a `CardinalityCountAggFunctionTest` test case to verify the aggregation logic.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs / JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/sunjincheng121/flink FLINK-7465-PR Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4652.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4652 commit bc1166ad88538bdcdd6df685c750359aadff3950 Author: 金竹 Date: 2017-09-05T10:21:10Z [FLINK-7465][table]Add cardinality count for tableAPI and SQL. ---
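Not the PR's actual code, just a sketch of the underlying idea, under the assumption that stream-lib's {{HyperLogLog}} (com.clearspring.analytics:stream) is on the classpath, since the change log above says the implementation is based on stream-lib:
{code:java}
import com.clearspring.analytics.stream.cardinality.HyperLogLog;

/** Sketch of an accumulator-style cardinality count backed by a HyperLogLog estimator. */
public class CardinalityCountSketch {

	private final HyperLogLog hll;

	public CardinalityCountSketch(int log2m) {
		// log2m controls the number of registers and thus the error bound (roughly 1.04 / sqrt(2^log2m)).
		this.hll = new HyperLogLog(log2m);
	}

	public void accumulate(Object value) {
		hll.offer(value);
	}

	public long getValue() {
		return hll.cardinality();
	}

	public static void main(String[] args) {
		CardinalityCountSketch acc = new CardinalityCountSketch(14);
		for (int i = 0; i < 100000; i++) {
			acc.accumulate("user-" + (i % 1000)); // only 1000 distinct values
		}
		System.out.println("estimated distinct count: " + acc.getValue()); // close to 1000
	}
}
{code}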
[jira] [Commented] (FLINK-7594) Add a SQL CLI client
[ https://issues.apache.org/jira/browse/FLINK-7594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156221#comment-16156221 ] Haohui Mai commented on FLINK-7594: --- We internally have a project (AthenaX) for this requirement and we are in the process of open-sourcing it. We are happy to contribute it directly to the Flink repository as well. > Add a SQL CLI client > > > Key: FLINK-7594 > URL: https://issues.apache.org/jira/browse/FLINK-7594 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > At the moment a user can only specify queries within a Java/Scala program > which is nice for integrating table programs or parts of it with DataSet or > DataStream API. With more connectors coming up, it is time to also provide a > programming-free SQL client. The SQL client should consist of a CLI interface > and maybe also a REST API. The concrete design is still up for discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager
[ https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156061#comment-16156061 ] ASF GitHub Bot commented on FLINK-7124: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/4510#discussion_r137395149 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.testingUtils.TestingUtils; + +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * This class contains tests that verify when rescaling a {@link JobGraph}, + * constructed {@link ExecutionGraph}s are correct. + */ +public class ExecutionGraphRescalingTest { + + private static final Logger TEST_LOGGER = LoggerFactory.getLogger(ExecutionGraphRescalingTest.class); + + @Test + public void testExecutionGraphArbitraryDopConstructionTest() throws Exception { + + final Configuration config = new Configuration(); + + final JobVertex[] jobVertices = createVerticesForSimpleBipartiteJobGraph(); + final JobGraph jobGraph = new JobGraph(jobVertices); + + // TODO rescaling the JobGraph is currently only supported if the + // TODO configured parallelism is ExecutionConfig.PARALLELISM_AUTO_MAX. + // TODO this limitation should be removed. 
+ for (JobVertex jv : jobVertices) { + jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX); + } + + ExecutionGraph eg = ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + config, + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + new Scheduler(TestingUtils.defaultExecutionContext()), + Thread.currentThread().getContextClassLoader(), + new StandaloneCheckpointRecoveryFactory(), + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy(), + new UnregisteredMetricsGroup(), + 5, + TEST_LOGGER); + + for (JobVertex jv : jobVertices) { + assertEquals(5, jv.getParallelism()); + } + verifyGeneratedExecutionGraphOfSimpleBitartiteJobGraph(eg, jobVertices); + + // --- verify scaling up works correctly --- + + // TODO rescaling the JobGraph is currently only supported if the + // TODO configured parallelism is ExecutionCon
[GitHub] flink pull request #4510: [FLINK-7124] [flip-6] Add test to verify rescaling...
Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/4510#discussion_r137395149 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.testingUtils.TestingUtils; + +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * This class contains tests that verify when rescaling a {@link JobGraph}, + * constructed {@link ExecutionGraph}s are correct. + */ +public class ExecutionGraphRescalingTest { + + private static final Logger TEST_LOGGER = LoggerFactory.getLogger(ExecutionGraphRescalingTest.class); + + @Test + public void testExecutionGraphArbitraryDopConstructionTest() throws Exception { + + final Configuration config = new Configuration(); + + final JobVertex[] jobVertices = createVerticesForSimpleBipartiteJobGraph(); + final JobGraph jobGraph = new JobGraph(jobVertices); + + // TODO rescaling the JobGraph is currently only supported if the + // TODO configured parallelism is ExecutionConfig.PARALLELISM_AUTO_MAX. + // TODO this limitation should be removed. 
+ for (JobVertex jv : jobVertices) { + jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX); + } + + ExecutionGraph eg = ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + config, + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + new Scheduler(TestingUtils.defaultExecutionContext()), + Thread.currentThread().getContextClassLoader(), + new StandaloneCheckpointRecoveryFactory(), + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy(), + new UnregisteredMetricsGroup(), + 5, + TEST_LOGGER); + + for (JobVertex jv : jobVertices) { + assertEquals(5, jv.getParallelism()); + } + verifyGeneratedExecutionGraphOfSimpleBitartiteJobGraph(eg, jobVertices); + + // --- verify scaling up works correctly --- + + // TODO rescaling the JobGraph is currently only supported if the + // TODO configured parallelism is ExecutionConfig.PARALLELISM_AUTO_MAX. + // TODO this limitation should be removed. + for (JobVertex jv : jobVertices) { + jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX); + } + +
[jira] [Commented] (FLINK-7430) ContinuousFileReaderOperator swallows exceptions
[ https://issues.apache.org/jira/browse/FLINK-7430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155874#comment-16155874 ] Peter Ertl commented on FLINK-7430: --- Thanks for your great help guys! :) > ContinuousFileReaderOperator swallows exceptions > > > Key: FLINK-7430 > URL: https://issues.apache.org/jira/browse/FLINK-7430 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.4.0, 1.3.2 > Environment: - macOS 10.12.6 > - Oracle JDK 1.8.0_144 > - Flink 1.3.2 >Reporter: Peter Ertl >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0 > > > The class *ContinuousFileReaderOperator* is swallowing exceptions as the > following example demonstrates: > {code:java} > package org.apache.flink.streaming.examples; > import java.io.File; > import java.io.IOException; > import org.apache.flink.api.common.io.OutputFormat; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > public class FormatExceptionSwallowed { > public static void main(String[] args) throws Exception { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > File bla = File.createTempFile("foo", "baz"); > try(PrintWriter w = new PrintWriter(bla)) { > w.println("one"); > w.println("two"); > w.println("three"); > } > env.readTextFile(bla.getCanonicalPath()) > .writeUsingOutputFormat(new OutputFormat() { > @Override > public void configure(final Configuration > parameters) { > } > @Override > public void open(final int taskNumber, final > int numTasks) throws IOException { > } > @Override > public void writeRecord(final String record) > throws IOException { > throw new > IllegalArgumentException("bla"); > } > @Override > public void close() throws IOException { > } > }); > env.execute("go"); > > // JOB TERMINATES WITH NO EXCEPTION / ERROR whatsoever ... > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155873#comment-16155873 ] Peter Ertl commented on FLINK-7567: --- if the parallelism of the feedback stream MUST be equal to the parallism of the input stream why not make it the default? > DataStream#iterate() on env.fromElements() / env.fromCollection() does not > work > --- > > Key: FLINK-7567 > URL: https://issues.apache.org/jira/browse/FLINK-7567 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.3.2 > Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2 >Reporter: Peter Ertl >Assignee: Mikhail Lipkovich > > When I try to execute this simple snippet of code > {code} > @Test > def iterateOnElements(): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > // do something silly just do get iteration going ... > val result = env.fromElements(1, 2, 3).iterate(it => { > (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x')) > }) > result.print() > env.execute() > } > {code} > I get the following exception: > {code} > java.lang.UnsupportedOperationException: Parallelism of the feedback stream > must match the parallelism of the original stream. Parallelism of original > stream: 1; parallelism of feedback stream: 8 > at > org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87) > at > org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77) > at > org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519) > at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at > com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > {code} > Since is just the simplest iterating stream setup I could imagine this 
error > makes no sense to me :-P -- This message was sent by Atlassian JIRA (v6.4.14#64029)
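For reference, one possible workaround until the defaults change: make the parallelism of the source and of the feedback operators match explicitly. A minimal Java sketch is below (the Scala snippet in the issue hits the mismatch because {{fromElements}} creates a parallelism-1 source while the loop body uses the default parallelism).
{code:java}
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterateOnElements {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// fromElements() is a non-parallel source, so run the whole job with parallelism 1
		// to make the feedback edge's parallelism match the original stream's.
		env.setParallelism(1);

		DataStream<Integer> source = env.fromElements(1, 2, 3);
		IterativeStream<Integer> iteration = source.iterate();

		DataStream<Integer> feedback = iteration
			.filter(new FilterFunction<Integer>() {
				@Override
				public boolean filter(Integer value) {
					return value > 0;
				}
			})
			.map(new MapFunction<Integer, Integer>() {
				@Override
				public Integer map(Integer value) {
					return value - 1;
				}
			});

		iteration.closeWith(feedback);
		feedback.print();
		env.execute("iterate-on-fromElements");
	}
}
{code}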
[jira] [Commented] (FLINK-7407) Assumption of partition id strict contiguity is too naive in Kafka consumer's AbstractPartitionDiscoverer
[ https://issues.apache.org/jira/browse/FLINK-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155858#comment-16155858 ] ASF GitHub Bot commented on FLINK-7407: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4526 @tzulitai Did you end up merging this? > Assumption of partition id strict contiguity is too naive in Kafka consumer's > AbstractPartitionDiscoverer > - > > Key: FLINK-7407 > URL: https://issues.apache.org/jira/browse/FLINK-7407 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0 > > > In the Kafka Consumer's {{AbstractPartitionDiscoverer}}, for partition > discovery, already discovered partitions are tracked with the following map: > {code} > Map topicsToLargestDiscoveredPartitionId > {code} > Simply put, on each discovery attempt's metadata fetch, all partition ids of > a given topic that are smaller than the largest seen id will be ignored and > not assigned. This approach lies on the assumption that fetched partition ids > of a single topic are always strictly contiguous starting from 0. > This assumption may be too naive, in that partitions which were temporarily > unavailable at the time of a discovery would be shadowed by available > partitions with larger ids, and from then on would be left unassigned. > We should redesign how the {{AbstractPartitionDiscoverer}} tracks discovered > partitions by not relying on the contiguity assumption, and also add test > cases for non-contiguous fetched partition ids. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4526: [FLINK-7407] [kafka] Adapt AbstractPartitionDiscoverer to...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4526 @tzulitai Did you end up merging this? ---
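A sketch of the redesign direction described in the issue above: track every discovered (topic, partition) pair explicitly instead of only the largest partition id per topic, so out-of-order or temporarily missing ids are not skipped. Class and method names here are made up for illustration; this is not the connector's actual API.
{code:java}
import java.util.HashSet;
import java.util.Set;

/** Illustrative only: remembers each discovered (topic, partition) pair. */
public class DiscoveredPartitionTracker {

	private final Set<String> seenPartitions = new HashSet<>();

	/**
	 * Returns true exactly once per partition, regardless of the order in which partition ids
	 * appear in the fetched metadata, so a temporarily unavailable partition with a smaller id
	 * is still assigned once it shows up.
	 */
	public boolean checkAndMarkDiscovered(String topic, int partitionId) {
		return seenPartitions.add(topic + '#' + partitionId);
	}

	public static void main(String[] args) {
		DiscoveredPartitionTracker tracker = new DiscoveredPartitionTracker();
		// Partition 1 is reported before partition 0 (non-contiguous metadata): both are still new.
		System.out.println(tracker.checkAndMarkDiscovered("events", 1)); // true
		System.out.println(tracker.checkAndMarkDiscovered("events", 0)); // true
		System.out.println(tracker.checkAndMarkDiscovered("events", 1)); // false, already seen
	}
}
{code}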
[jira] [Commented] (FLINK-7357) HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY HOP window
[ https://issues.apache.org/jira/browse/FLINK-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155848#comment-16155848 ] ASF GitHub Bot commented on FLINK-7357: --- Github user walterddr commented on the issue: https://github.com/apache/flink/pull/4521 @twalthr looks like this should resolve the last filter issue. Please kindly take a look and see if this looks good to go in :-) > HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY > HOP window > - > > Key: FLINK-7357 > URL: https://issues.apache.org/jira/browse/FLINK-7357 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Rong Rong >Assignee: Rong Rong > > The following SQL does not compile: > {code:title=invalid_having_hop_start_sql} > SELECT > c AS k, > COUNT(a) AS v, > HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS > windowStart, > HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd > FROM > T1 > GROUP BY > HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), > c > HAVING > SUM(b) > 1 > {code} > While individually keeping HAVING clause or HOP_START field compiles and runs > without issue. > more details: > https://github.com/apache/flink/compare/master...walterddr:having_does_not_work_with_hop_start_end -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4521: [FLINK-7357] [table] Created extended rules for WindowSta...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/4521 @twalthr looks like this should resolve the last filter issue. Please kindly take a look and see if this looks good to go in :-) ---
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137292784 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobService.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; + +/** + * A service to retrieve transient binary large objects (BLOBs). + * + * These include per-job BLOBs that are , e.g. a job's JAR files, parts of an off-loaded {@link --- End diff -- I thought that job's JAR files are stored in the permanent blob service? ---
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137291887 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides access to transient BLOB files stored at the {@link BlobServer}. + * + * TODO: currently, this is still cache-based with local copies - make this truly transient, i.e. return file streams with no local copy + */ +public class TransientBlobCache implements TransientBlobService { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(TransientBlobCache.class); + + /** Counter to generate unique names for temporary files. */ + private final AtomicLong tempFileCounter = new AtomicLong(0); + + private final InetSocketAddress serverAddress; + + /** +* Root directory for local file storage +*/ + private final File storageDir; + + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); + + /** Shutdown hook thread to ensure deletion of the local storage directory. */ + private final Thread shutdownHook; + + /** The number of retries when the transfer fails */ + private final int numFetchRetries; + + /** Configuration for the blob client like ssl parameters required to connect to the blob server */ + private final Configuration blobClientConfig; + + /** Lock guarding concurrent file accesses */ + private final ReadWriteLock readWriteLock; + + /** +* Instantiates a new BLOB cache. 
+* +* @param serverAddress +* address of the {@link BlobServer} to use for fetching files from +* @param blobClientConfig +* global configuration +* +* @throws IOException +* thrown if the (local or distributed) file storage cannot be created or is not usable +*/ + public TransientBlobCache( + final InetSocketAddress serverAddress, + final Configuration blobClientConfig) throws IOException { --- End diff -- Probably not so easy because we also need it for the `BlobClient` creation. ---
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1619#comment-1619 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137259367 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java --- @@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception { try { blobStoreService = BlobUtils.createBlobStoreFromConfig(config); - BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService); + BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } + } + + /** +* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are +* recognised during the download via a {@link org.apache.flink.runtime.blob.BlobServer}. +*/ + @Test + public void testBlobServerCorruptedFile() throws Exception { + org.apache.flink.configuration.Configuration + config = new org.apache.flink.configuration.Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER"); --- End diff -- I think "ZooKeeper" is not a valid state backend. What did you want to do with that? > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137287126 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; + +/** + * A service to retrieve permanent binary large objects (BLOBs). + * + * These include per-job BLOBs that are covered by high-availability (HA) mode, e.g. a job's JAR + * files, parts of an off-loaded {@link org.apache.flink.runtime.deployment.TaskDeploymentDescriptor} + * or files in the {@link org.apache.flink.api.common.cache.DistributedCache}. + */ +public interface PermanentBlobService extends Closeable { + + /** +* Returns the path to a local copy of the file associated with the provided job ID and blob +* key. +* +* @param jobId +* ID of the job this blob belongs to +* @param key +* BLOB key associated with the requested file +* +* @return The path to the file. +* +* @throws java.io.FileNotFoundException +* if the BLOB does not exist; +* @throws IOException +* if any other error occurs when retrieving the file +*/ + File getHAFile(JobID jobId, BlobKey key) throws IOException; --- End diff -- Not sure whether this is the right name because HA does not depend on the `PermanentBlobService` but on the `BlobStore`. I would suggest renaming it. ---
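One possible shape of the suggested rename, keeping the permanent-BLOB semantics but dropping the HA wording from the method name (the name {{getFile}} is only an illustration of the suggestion, not necessarily what the PR settled on; the sketch assumes it sits in the same package as {{BlobKey}}):
{code:java}
// Sketch only: the service hands out permanent BLOBs in general; whether an HA
// BlobStore is involved is an implementation detail and is kept out of the name.
import org.apache.flink.api.common.JobID;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;

public interface PermanentBlobServiceSketch extends Closeable {

	/**
	 * Returns a local copy of the permanent BLOB for the given job ID and key.
	 */
	File getFile(JobID jobId, BlobKey key) throws IOException;
}
{code}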
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137270473 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws IOException { * * @param jobId * ID of the job this blob belongs to (or null if job-unrelated) -* @param requiredBlob +* @param blobKey * blob key associated with the requested file +* @param highlyAvailable +* whether to the requested file is highly available (HA) * * @return file referring to the local storage location of the BLOB * * @throws IOException * Thrown if the file retrieval failed. */ - private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException { - checkArgument(requiredBlob != null, "BLOB key cannot be null."); + private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException { --- End diff -- Maybe we could introduce an enum here as well for the `highlyAvailable` boolean argument. ---
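A sketch of the enum variant suggested here (the enum, its name and its constants are illustrative assumptions, not code from the PR):
{code:java}
// Sketch only: an enum makes the intent visible at every call site of getFileInternal.
public enum BlobTypeSketch {
	/** Job-related BLOB kept for the job's lifetime; may be backed by the HA BlobStore. */
	PERMANENT_BLOB,
	/** Short-lived BLOB served from the BlobServer's local storage only. */
	TRANSIENT_BLOB
}

// The lookup signature would then read, for example:
//   private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, BlobTypeSketch blobType) throws IOException
// and the HA branch becomes: if (blobType == BlobTypeSketch.PERMANENT_BLOB) { ... }
{code}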
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137291176 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides access to transient BLOB files stored at the {@link BlobServer}. + * + * TODO: currently, this is still cache-based with local copies - make this truly transient, i.e. return file streams with no local copy + */ +public class TransientBlobCache implements TransientBlobService { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(TransientBlobCache.class); + + /** Counter to generate unique names for temporary files. */ + private final AtomicLong tempFileCounter = new AtomicLong(0); + + private final InetSocketAddress serverAddress; + + /** +* Root directory for local file storage +*/ + private final File storageDir; + + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); + + /** Shutdown hook thread to ensure deletion of the local storage directory. */ + private final Thread shutdownHook; + + /** The number of retries when the transfer fails */ + private final int numFetchRetries; + + /** Configuration for the blob client like ssl parameters required to connect to the blob server */ + private final Configuration blobClientConfig; + + /** Lock guarding concurrent file accesses */ + private final ReadWriteLock readWriteLock; + + /** +* Instantiates a new BLOB cache. 
+* +* @param serverAddress +* address of the {@link BlobServer} to use for fetching files from +* @param blobClientConfig +* global configuration +* +* @throws IOException +* thrown if the (local or distributed) file storage cannot be created or is not usable +*/ + public TransientBlobCache( + final InetSocketAddress serverAddress, + final Configuration blobClientConfig) throws IOException { + + this.serverAddress = checkNotNull(serverAddress); + this.blobClientConfig = checkNotNull(blobClientConfig); + this.readWriteLock = new ReentrantReadWriteLock(); + + // configure and create the storage directory + String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY); + this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory); + LOG.info("Created transient BLOB cache storage directory " + storageDir); + + // configure the number of fetch retries + final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES); + if (fetchRetries >= 0) { + this.numFetchRetries = fetchRetries; + } else { +
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155564#comment-16155564 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137291176 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides access to transient BLOB files stored at the {@link BlobServer}. + * + * TODO: currently, this is still cache-based with local copies - make this truly transient, i.e. return file streams with no local copy + */ +public class TransientBlobCache implements TransientBlobService { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(TransientBlobCache.class); + + /** Counter to generate unique names for temporary files. */ + private final AtomicLong tempFileCounter = new AtomicLong(0); + + private final InetSocketAddress serverAddress; + + /** +* Root directory for local file storage +*/ + private final File storageDir; + + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); + + /** Shutdown hook thread to ensure deletion of the local storage directory. */ + private final Thread shutdownHook; + + /** The number of retries when the transfer fails */ + private final int numFetchRetries; + + /** Configuration for the blob client like ssl parameters required to connect to the blob server */ + private final Configuration blobClientConfig; + + /** Lock guarding concurrent file accesses */ + private final ReadWriteLock readWriteLock; + + /** +* Instantiates a new BLOB cache. 
+* +* @param serverAddress +* address of the {@link BlobServer} to use for fetching files from +* @param blobClientConfig +* global configuration +* +* @throws IOException +* thrown if the (local or distributed) file storage cannot be created or is not usable +*/ + public TransientBlobCache( + final InetSocketAddress serverAddress, + final Configuration blobClientConfig) throws IOException { + + this.serverAddress = checkNotNull(serverAddress); + this.blobClientConfig = checkNotNull(blobClientConfig); + this.readWriteLock = new ReentrantReadWriteLock(); + + // configure and create the storage directory + String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY); + this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory); + LOG.info("Created transient BLOB cache storage directory " + storageDir); + + // configure the number of f
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1618#comment-1618 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137268133 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -601,7 +564,39 @@ public void deleteInternal(@Nullable JobID jobId, BlobKey key) throws IOExceptio } /** -* Uploads the JAR files to a {@link BlobServer} at the given address. +* Reads the response from the input stream and throws in case of errors +* +* @param is +* stream to read from +* +* @return true if the delete operation was successful at the {@link BlobServer}; +* false otherwise +* +* @throws IOException +* if the server code throws an exception or if reading the response failed +*/ + private static boolean receiveAndCheckDeleteResponse(InputStream is) throws IOException { + int response = is.read(); + if (response < 0) { + throw new EOFException("Premature end of response"); + } + if (response == RETURN_ERROR) { + Throwable cause = readExceptionFromStream(is); + if (cause == null) { + return false; + } else { + throw new IOException("Server side error: " + cause.getMessage(), cause); --- End diff -- I think we don't have to append the cause message to `IOException's` message, because it is included in the `cause`. > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
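In code, the suggestion amounts to the following variant of the quoted branch (same behaviour, just without repeating the cause's message):
{code:java}
// Fragment of the quoted receiveAndCheckDeleteResponse branch with the message deduplicated.
if (response == RETURN_ERROR) {
	Throwable cause = readExceptionFromStream(is);
	if (cause == null) {
		return false;
	} else {
		// the cause already carries its own message, so it is not repeated here
		throw new IOException("Server side error", cause);
	}
}
{code}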
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155560#comment-16155560 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137287126 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; + +/** + * A service to retrieve permanent binary large objects (BLOBs). + * + * These include per-job BLOBs that are covered by high-availability (HA) mode, e.g. a job's JAR + * files, parts of an off-loaded {@link org.apache.flink.runtime.deployment.TaskDeploymentDescriptor} + * or files in the {@link org.apache.flink.api.common.cache.DistributedCache}. + */ +public interface PermanentBlobService extends Closeable { + + /** +* Returns the path to a local copy of the file associated with the provided job ID and blob +* key. +* +* @param jobId +* ID of the job this blob belongs to +* @param key +* BLOB key associated with the requested file +* +* @return The path to the file. +* +* @throws java.io.FileNotFoundException +* if the BLOB does not exist; +* @throws IOException +* if any other error occurs when retrieving the file +*/ + File getHAFile(JobID jobId, BlobKey key) throws IOException; --- End diff -- Not sure whether this is the right name because HA does not depend on the `PermanentBlobService` but on the the `BlobStore`. I would suggest to rename it. > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137295883 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java --- @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup. + * + * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB, + * it will try to download it from a distributed HA file system (if available) or the BLOB server. + * + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup. + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded. + */ +public class PermanentBlobCache extends TimerTask implements PermanentBlobService { --- End diff -- Just a wild thought: I noticed that the `TransientBlobCache` and the `PermanentBlobCache` have a lot of code in common. In order to reduce code duplication couldn't we create a common base class or let `PermanentBlobCache` extend `TransientBlobCache` adding the ref counting? ---
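A rough sketch of the base-class idea, pulling the fields both caches share into one place (all names below are illustrative; the sketch assumes it lives in {{org.apache.flink.runtime.blob}} next to {{BlobUtils}}):
{code:java}
// Sketch only: shared plumbing of TransientBlobCache and PermanentBlobCache; the
// permanent cache would add per-job ref-counting and deferred cleanup on top.
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

abstract class AbstractBlobCacheSketch {

	protected final InetSocketAddress serverAddress;
	protected final Configuration blobClientConfig;
	protected final File storageDir;
	protected final int numFetchRetries;
	protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

	protected AbstractBlobCacheSketch(
			InetSocketAddress serverAddress, Configuration blobClientConfig) throws IOException {
		this.serverAddress = serverAddress;
		this.blobClientConfig = blobClientConfig;
		this.storageDir = BlobUtils.initLocalStorageDirectory(
			blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY));
		// simplified: the real code logs a warning and falls back to a default for negative values
		this.numFetchRetries = Math.max(0, blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES));
	}
}

// class TransientBlobCache extends AbstractBlobCacheSketch implements TransientBlobService { ... }
// class PermanentBlobCache extends AbstractBlobCacheSketch implements PermanentBlobService { ... }
{code}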
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155562#comment-16155562 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137270473 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws IOException { * * @param jobId * ID of the job this blob belongs to (or null if job-unrelated) -* @param requiredBlob +* @param blobKey * blob key associated with the requested file +* @param highlyAvailable +* whether to the requested file is highly available (HA) * * @return file referring to the local storage location of the BLOB * * @throws IOException * Thrown if the file retrieval failed. */ - private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException { - checkArgument(requiredBlob != null, "BLOB key cannot be null."); + private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException { --- End diff -- Maybe we could introduce an enum here as well for the `highlyAvailable` boolean argument. > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137302842 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -174,7 +176,6 @@ public ServerSocket createSocket(int port) throws IOException { // start the server thread setName("BLOB Server listener at " + getPort()); setDaemon(true); - start(); --- End diff -- Why did you pull `start` out of the constructor? Wouldn't one always want to start the `BlobServer` when creating it? ---
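For context, the usual motivation for such a change is that calling {{start()}} in the constructor lets the listener thread observe a partially constructed object (the quoted constructor calls {{setName}}/{{setDaemon}}, so the server is its own {{Thread}}). A common compromise is a small factory so callers cannot forget to start the server; a sketch of that pattern, where the factory name and the {{BlobServer(Configuration, BlobStore)}} constructor signature are assumptions for illustration:
{code:java}
// Sketch only, assumed to live in org.apache.flink.runtime.blob.
import org.apache.flink.configuration.Configuration;

import java.io.IOException;

final class BlobServerBootstrapSketch {

	private BlobServerBootstrapSketch() {}

	/** Creates the server and immediately starts its listener thread. */
	static BlobServer createAndStart(Configuration config, BlobStore blobStore) throws IOException {
		BlobServer server = new BlobServer(config, blobStore); // assumed constructor signature
		server.start(); // the server is a Thread in this PR, so start() launches the listener
		return server;
	}
}
{code}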
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155570#comment-16155570 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137289123 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java --- @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup. + * + * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB, + * it will try to download it from a distributed HA file system (if available) or the BLOB server. + * + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup. + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded. + */ +public class PermanentBlobCache extends TimerTask implements PermanentBlobService { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(PermanentBlobCache.class); + + /** Counter to generate unique names for temporary files. */ + private final AtomicLong tempFileCounter = new AtomicLong(0); + + private final InetSocketAddress serverAddress; + + /** Root directory for local file storage */ + private final File storageDir; + + /** Blob store for distributed file storage, e.g. 
in HA */ + private final BlobView blobView; + + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); + + /** Shutdown hook thread to ensure deletion of the storage directory. */ + private final Thread shutdownHook; + + /** The number of retries when the transfer fails */ + private final int numFetchRetries; + + /** Configuration for the blob client like ssl parameters required to connect to the blob server */ + private final Configuration blobClientConfig; + + /** Lock guarding concurrent file accesses */ + private final ReadWriteLock readWriteLock; + + // + + /** +* Job reference counters with a time-to-live (TTL). +*/ + @VisibleForTesting + static class RefCount { + /** +* Number of references to a job. +*/ + public int references = 0; + + /** +* Timestamp in milliseconds when any job data should be cleaned up (no cleanup for +* non-positive values). +*/ + public lon
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155571#comment-16155571 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137291080 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides access to transient BLOB files stored at the {@link BlobServer}. + * + * TODO: currently, this is still cache-based with local copies - make this truly transient, i.e. return file streams with no local copy + */ +public class TransientBlobCache implements TransientBlobService { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(TransientBlobCache.class); + + /** Counter to generate unique names for temporary files. */ + private final AtomicLong tempFileCounter = new AtomicLong(0); + + private final InetSocketAddress serverAddress; + + /** +* Root directory for local file storage +*/ + private final File storageDir; + + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); + + /** Shutdown hook thread to ensure deletion of the local storage directory. */ + private final Thread shutdownHook; + + /** The number of retries when the transfer fails */ + private final int numFetchRetries; + + /** Configuration for the blob client like ssl parameters required to connect to the blob server */ + private final Configuration blobClientConfig; + + /** Lock guarding concurrent file accesses */ + private final ReadWriteLock readWriteLock; + + /** +* Instantiates a new BLOB cache. 
+* +* @param serverAddress +* address of the {@link BlobServer} to use for fetching files from +* @param blobClientConfig +* global configuration +* +* @throws IOException +* thrown if the (local or distributed) file storage cannot be created or is not usable +*/ + public TransientBlobCache( + final InetSocketAddress serverAddress, + final Configuration blobClientConfig) throws IOException { --- End diff -- Can we change it such that we don't pass in a `Configuration` object but instead the required values? > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStor
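The constructor shape being asked about would look roughly like the sketch below; as the earlier reply on this thread points out, the cache still needs the client {{Configuration}} to create {{BlobClient}} connections, so the parameter list here is an illustrative assumption rather than a drop-in replacement:
{code:java}
// Sketch only: explicit parameters instead of a Configuration object. Note that the
// real class would still need connection/SSL settings for its BlobClient instances.
public TransientBlobCache(
		final InetSocketAddress serverAddress,
		final File storageDir,
		final int numFetchRetries) throws IOException {

	this.serverAddress = checkNotNull(serverAddress);
	this.storageDir = checkNotNull(storageDir);
	checkArgument(numFetchRetries >= 0, "The number of fetch retries must be non-negative.");
	this.numFetchRetries = numFetchRetries;
	this.readWriteLock = new ReentrantReadWriteLock();
	LOG.info("Created transient BLOB cache storage directory " + storageDir);
}
{code}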
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155574#comment-16155574 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137259815 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java --- @@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception { try { blobStoreService = BlobUtils.createBlobStoreFromConfig(config); - BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService); + BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } + } + + /** +* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are +* recognised during the download via a {@link org.apache.flink.runtime.blob.BlobServer}. +*/ + @Test + public void testBlobServerCorruptedFile() throws Exception { + org.apache.flink.configuration.Configuration + config = new org.apache.flink.configuration.Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER"); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); + + BlobStoreService blobStoreService = null; + + try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + + BlobServerCorruptionTest.testGetFailsFromCorruptFile(config, blobStoreService, exception); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } + } + + /** +* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any +* participating BlobServer when uploaded via a {@link org.apache.flink.runtime.blob.BlobCache}. +*/ + @Test + public void testBlobCacheRecovery() throws Exception { + org.apache.flink.configuration.Configuration + config = new org.apache.flink.configuration.Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER"); --- End diff -- Statebackend not defined. > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155575#comment-16155575 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137295883 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java --- @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup. + * + * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB, + * it will try to download it from a distributed HA file system (if available) or the BLOB server. + * + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup. + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded. + */ +public class PermanentBlobCache extends TimerTask implements PermanentBlobService { --- End diff -- Just a wild thought: I noticed that the `TransientBlobCache` and the `PermanentBlobCache` have a lot of code in common. In order to reduce code duplication couldn't we create a common base class or let `PermanentBlobCache` extend `TransientBlobCache` adding the ref counting? 
> change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137271569 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws IOException { * * @param jobId * ID of the job this blob belongs to (or null if job-unrelated) -* @param requiredBlob +* @param blobKey * blob key associated with the requested file +* @param highlyAvailable +* whether to the requested file is highly available (HA) * * @return file referring to the local storage location of the BLOB * * @throws IOException * Thrown if the file retrieval failed. */ - private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException { - checkArgument(requiredBlob != null, "BLOB key cannot be null."); + private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException { + checkArgument(blobKey != null, "BLOB key cannot be null."); - final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob); + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey); + readWriteLock.readLock().lock(); - if (localFile.exists()) { + try { + getFileInternal(jobId, blobKey, highlyAvailable, localFile); return localFile; + } finally { + readWriteLock.readLock().unlock(); } - else { + } + + /** +* Helper to retrieve the local path of a file associated with a job and a blob key. +* +* The blob server looks the blob key up in its local storage. If the file exists, it is +* returned. If the file does not exist, it is retrieved from the HA blob store (if available) +* or a {@link FileNotFoundException} is thrown. +* +* Assumes the read lock has already been acquired. +* +* @param jobId +* ID of the job this blob belongs to (or null if job-unrelated) +* @param blobKey +* blob key associated with the requested file +* @param highlyAvailable +* whether to the requested file is highly available (HA) +* @param localFile +* (local) file where the blob is/should be stored +* +* @throws IOException +* Thrown if the file retrieval failed. +*/ + void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable, File localFile) throws IOException { + // assume readWriteLock.readLock() was already locked (cannot really check that) + + if (localFile.exists()) { + return; + } else if (highlyAvailable) { + // Try the HA blob store + // first we have to release the read lock in order to acquire the write lock + readWriteLock.readLock().unlock(); + + // use a temporary file (thread-safe without locking) + File incomingFile = null; try { - // Try the blob store - blobStore.get(jobId, requiredBlob, localFile); + incomingFile = createTemporaryFilename(); + blobStore.get(jobId, blobKey, incomingFile); + + BlobUtils.moveTempFileToStore( + incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null); --- End diff -- Not sure whether the `writeLock` should escape the scope of the BlobServer via `BlobUtils.moveTempFileStore`. I think it would be better to lock outside of the `moveTempFileToStore` method. This should also give a better separation of concerns. ---
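A sketch of the "lock outside" variant being suggested, assuming a hypothetical {{moveTempFileToStore}} overload that does no locking itself (error handling and the surrounding read-lock handling of the quoted method are omitted):
{code:java}
// Sketch only: the write lock stays inside BlobServer instead of being handed to BlobUtils.
File incomingFile = createTemporaryFilename();
blobStore.get(jobId, blobKey, incomingFile);

readWriteLock.writeLock().lock();
try {
	// hypothetical overload without the Lock parameter; it only moves the file into place
	BlobUtils.moveTempFileToStore(incomingFile, jobId, blobKey, localFile, LOG, null);
} finally {
	readWriteLock.writeLock().unlock();
}
{code}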
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137268133 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -601,7 +564,39 @@ public void deleteInternal(@Nullable JobID jobId, BlobKey key) throws IOExceptio } /** -* Uploads the JAR files to a {@link BlobServer} at the given address. +* Reads the response from the input stream and throws in case of errors +* +* @param is +* stream to read from +* +* @return true if the delete operation was successful at the {@link BlobServer}; +* false otherwise +* +* @throws IOException +* if the server code throws an exception or if reading the response failed +*/ + private static boolean receiveAndCheckDeleteResponse(InputStream is) throws IOException { + int response = is.read(); + if (response < 0) { + throw new EOFException("Premature end of response"); + } + if (response == RETURN_ERROR) { + Throwable cause = readExceptionFromStream(is); + if (cause == null) { + return false; + } else { + throw new IOException("Server side error: " + cause.getMessage(), cause); --- End diff -- I think we don't have to append the cause message to `IOException's` message, because it is included in the `cause`. ---
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1617#comment-1617 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137266511 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOExcepti * ID of the job this blob belongs to (or null if job-unrelated) * @param blobKey * blob key associated with the requested file +* @param permanentBlob +* whether the BLOB is permanent (true) or transient (false) * * @throws IOException * thrown if an I/O error occurs while writing the header data to the output stream */ - private static void sendGetHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey) throws IOException { + private static void sendGetHeader( + OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob) + throws IOException { checkNotNull(blobKey); + checkArgument(jobId != null || !permanentBlob, "permanent BLOBs must be job-related"); // Signal type of operation outputStream.write(GET_OPERATION); // Send job ID and key if (jobId == null) { outputStream.write(CONTENT_NO_JOB); + } else if (permanentBlob) { + outputStream.write(CONTENT_FOR_JOB_HA); --- End diff -- Does it make sense to rename this constant to `PERMANENT_JOB_CONTENT`? > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155561#comment-16155561 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137293376 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java --- @@ -215,10 +215,10 @@ public static ClassLoader retrieveClassLoader( JobManagerMessages.ClassloadingProps props = optProps.get(); InetSocketAddress serverAddress = new InetSocketAddress(jobManager.getHostname(), props.blobManagerPort()); - final BlobCache blobClient; + final PermanentBlobCache blobClient; try { - // TODO: Fix lifecycle of BlobCache to properly close it upon usage - blobClient = new BlobCache(serverAddress, config, highAvailabilityServices.createBlobStore()); + // TODO: Fix lifecycle of PermanentBlobCache to properly close it upon usage + blobClient = new PermanentBlobCache(serverAddress, config, highAvailabilityServices.createBlobStore()); --- End diff -- shouldn't the variable be called `permanentBlobCache` instead of `blobClient`? > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155577#comment-16155577 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137268826 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -415,13 +393,17 @@ private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, int offset, int l * the ID of the job the BLOB belongs to (or null if job-unrelated) * @param inputStream * the input stream to read the data from +* @param permanentBlob +* whether the BLOB is permanent (true) or transient (false) * * @return the computed BLOB key of the uploaded BLOB * * @throws IOException * thrown if an I/O error occurs while uploading the data to the BLOB server */ - private BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream) throws IOException { + BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream, boolean permanentBlob) --- End diff -- Should we introduce an `enum` instead of a boolean denoting whether the content is transient or permanent? This would have the advantage that it's much clearer what's happening when looking at code at the calling side. > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
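Reusing the kind of enum sketched for {{getFileInternal}} above, the difference at the call site would look like this (signature and enum name are illustrative, not from the PR):
{code:java}
// Boolean version: the reader has to look up what the trailing flag means.
//   BlobKey key = client.putInputStream(jobId, inputStream, true);
// Enum version: the call site states the intent explicitly.
BlobKey key = client.putInputStream(jobId, inputStream, BlobTypeSketch.PERMANENT_BLOB);
{code}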
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155572#comment-16155572 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137303136 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java --- @@ -18,61 +18,31 @@ package org.apache.flink.runtime.blob; -import org.apache.flink.api.common.JobID; - import java.io.Closeable; -import java.io.File; -import java.io.IOException; /** * A simple store and retrieve binary large objects (BLOBs). */ public interface BlobService extends Closeable { /** -* Returns the path to a local copy of the (job-unrelated) file associated with the provided -* blob key. -* -* @param key blob key associated with the requested file -* @return The path to the file. -* @throws java.io.FileNotFoundException when the path does not exist; -* @throws IOException if any other error occurs when retrieving the file -*/ - File getFile(BlobKey key) throws IOException; - - /** -* Returns the path to a local copy of the file associated with the provided job ID and blob key. +* Returns a BLOB service for accessing permanent BLOBs. * -* @param jobId ID of the job this blob belongs to -* @param key blob key associated with the requested file -* @return The path to the file. -* @throws java.io.FileNotFoundException when the path does not exist; -* @throws IOException if any other error occurs when retrieving the file +* @return BLOB service */ - File getFile(JobID jobId, BlobKey key) throws IOException; + PermanentBlobService getPermanentBlobStore(); --- End diff -- I'm not so sure about the naming here. The `BlobStore` is actually something else than the `BlobService`. Would be good if we use a consistent naming for the different things (this could also include renaming some of the entities). > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
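One way to resolve the naming concern is to keep "store" for the {{BlobStore}}/{{BlobView}} side (the distributed storage) and use "service" consistently for the accessors on {{BlobService}}; a sketch of how that could read (names are suggestions only, and the transient counterpart is assumed):
{code:java}
// Sketch only, assumed to sit next to PermanentBlobService/TransientBlobService in
// org.apache.flink.runtime.blob; "store" remains reserved for the BlobStore abstraction.
import java.io.Closeable;

public interface BlobServiceSketch extends Closeable {

	/** Returns the service for permanent BLOBs. */
	PermanentBlobService getPermanentBlobService();

	/** Returns the service for transient BLOBs. */
	TransientBlobService getTransientBlobService();
}
{code}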
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155565#comment-16155565 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137266916 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOExcepti * ID of the job this blob belongs to (or null if job-unrelated) * @param blobKey * blob key associated with the requested file +* @param permanentBlob +* whether the BLOB is permanent (true) or transient (false) * * @throws IOException * thrown if an I/O error occurs while writing the header data to the output stream */ - private static void sendGetHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey) throws IOException { + private static void sendGetHeader( + OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob) + throws IOException { checkNotNull(blobKey); + checkArgument(jobId != null || !permanentBlob, "permanent BLOBs must be job-related"); // Signal type of operation outputStream.write(GET_OPERATION); // Send job ID and key if (jobId == null) { outputStream.write(CONTENT_NO_JOB); --- End diff -- Should we rename this variable to `TRANSIENT_CONTENT`? > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
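Taken together with the earlier comment suggesting {{PERMANENT_JOB_CONTENT}} for {{CONTENT_FOR_JOB_HA}}, the header constants would then read roughly as follows (the byte values below are placeholders, not the actual wire-protocol values):
{code:java}
// Sketch of the suggested renames only; values are placeholders.
private static final byte TRANSIENT_CONTENT = 0;     // previously CONTENT_NO_JOB
private static final byte PERMANENT_JOB_CONTENT = 1; // previously CONTENT_FOR_JOB_HA
{code}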
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137262572 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java --- @@ -158,7 +151,7 @@ protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed rou cache = blobPortFuture.thenApplyAsync( (Integer port) -> { try { - return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView); + return new TransientBlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config); } catch (IOException e) { throw new FlinkFutureException("Could not create BlobCache.", e); --- End diff -- Maybe we could adapt the exception message to `TransientBlobCache`. ---
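That is, something along these lines:
{code:java}
// Fragment of the quoted handler code with the exception message adapted to the new class name.
try {
	return new TransientBlobCache(
		new InetSocketAddress(jobManagerGateway.getHostname(), port), config);
} catch (IOException e) {
	throw new FlinkFutureException("Could not create TransientBlobCache.", e);
}
{code}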
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155567#comment-16155567 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137271569 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws IOException { * * @param jobId * ID of the job this blob belongs to (or null if job-unrelated) -* @param requiredBlob +* @param blobKey * blob key associated with the requested file +* @param highlyAvailable +* whether to the requested file is highly available (HA) * * @return file referring to the local storage location of the BLOB * * @throws IOException * Thrown if the file retrieval failed. */ - private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException { - checkArgument(requiredBlob != null, "BLOB key cannot be null."); + private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException { + checkArgument(blobKey != null, "BLOB key cannot be null."); - final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob); + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey); + readWriteLock.readLock().lock(); - if (localFile.exists()) { + try { + getFileInternal(jobId, blobKey, highlyAvailable, localFile); return localFile; + } finally { + readWriteLock.readLock().unlock(); } - else { + } + + /** +* Helper to retrieve the local path of a file associated with a job and a blob key. +* +* The blob server looks the blob key up in its local storage. If the file exists, it is +* returned. If the file does not exist, it is retrieved from the HA blob store (if available) +* or a {@link FileNotFoundException} is thrown. +* +* Assumes the read lock has already been acquired. +* +* @param jobId +* ID of the job this blob belongs to (or null if job-unrelated) +* @param blobKey +* blob key associated with the requested file +* @param highlyAvailable +* whether to the requested file is highly available (HA) +* @param localFile +* (local) file where the blob is/should be stored +* +* @throws IOException +* Thrown if the file retrieval failed. +*/ + void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable, File localFile) throws IOException { + // assume readWriteLock.readLock() was already locked (cannot really check that) + + if (localFile.exists()) { + return; + } else if (highlyAvailable) { + // Try the HA blob store + // first we have to release the read lock in order to acquire the write lock + readWriteLock.readLock().unlock(); + + // use a temporary file (thread-safe without locking) + File incomingFile = null; try { - // Try the blob store - blobStore.get(jobId, requiredBlob, localFile); + incomingFile = createTemporaryFilename(); + blobStore.get(jobId, blobKey, incomingFile); + + BlobUtils.moveTempFileToStore( + incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null); --- End diff -- Not sure whether the `writeLock` should escape the scope of the BlobServer via `BlobUtils.moveTempFileStore`. I think it would be better to lock outside of the `moveTempFileToStore` method. This should also give a better separation of concerns. 
> change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
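For illustration, a minimal sketch of the locking alternative suggested in the comment above: acquire the write lock at the call site inside the server so it never escapes via {{BlobUtils.moveTempFileToStore}}. The class and method bodies below are simplified stand-ins, not Flink's actual implementation:

{code:java}
import java.io.File;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified stand-in showing the suggested separation of concerns: the caller owns
// the lock; the move helper only moves files and never receives the lock object.
class MoveUnderLockSketch {

    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    void moveIntoStore(File incomingFile, File localFile) {
        readWriteLock.writeLock().lock(); // lock acquired at the call site ...
        try {
            moveTempFileToStore(incomingFile, localFile); // ... so the helper stays lock-free
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    // Hypothetical helper without a lock parameter (the quoted diff passes the lock in).
    private void moveTempFileToStore(File incomingFile, File localFile) {
        if (!localFile.exists() && !incomingFile.renameTo(localFile)) {
            throw new IllegalStateException(
                "Could not move " + incomingFile + " to " + localFile);
        }
    }
}
{code}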
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155566#comment-16155566 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137262572 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java --- @@ -158,7 +151,7 @@ protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed rou cache = blobPortFuture.thenApplyAsync( (Integer port) -> { try { - return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView); + return new TransientBlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config); } catch (IOException e) { throw new FlinkFutureException("Could not create BlobCache.", e); --- End diff -- Maybe we could adapt the exception message to `TransientBlobCache`. > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155576#comment-16155576 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137302842 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -174,7 +176,6 @@ public ServerSocket createSocket(int port) throws IOException { // start the server thread setName("BLOB Server listener at " + getPort()); setDaemon(true); - start(); --- End diff -- Why did you pull `start` out of the constructor? Wouldn't one always want to start the `BlobServer` when creating it? > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137289123 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java --- @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup. + * + * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB, + * it will try to download it from a distributed HA file system (if available) or the BLOB server. + * + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup. + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded. + */ +public class PermanentBlobCache extends TimerTask implements PermanentBlobService { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(PermanentBlobCache.class); + + /** Counter to generate unique names for temporary files. */ + private final AtomicLong tempFileCounter = new AtomicLong(0); + + private final InetSocketAddress serverAddress; + + /** Root directory for local file storage */ + private final File storageDir; + + /** Blob store for distributed file storage, e.g. in HA */ + private final BlobView blobView; + + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); + + /** Shutdown hook thread to ensure deletion of the storage directory. 
*/ + private final Thread shutdownHook; + + /** The number of retries when the transfer fails */ + private final int numFetchRetries; + + /** Configuration for the blob client like ssl parameters required to connect to the blob server */ + private final Configuration blobClientConfig; + + /** Lock guarding concurrent file accesses */ + private final ReadWriteLock readWriteLock; + + // + + /** +* Job reference counters with a time-to-live (TTL). +*/ + @VisibleForTesting + static class RefCount { + /** +* Number of references to a job. +*/ + public int references = 0; + + /** +* Timestamp in milliseconds when any job data should be cleaned up (no cleanup for +* non-positive values). +*/ + public long keepUntil = -1; + } + + /** Map to store the number of references to a specific job */ + private final Map jobRefCounters = new HashMap<>(); + + /** Time interval (ms) to run the cleanup task; also used as the default TTL
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137259798 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java --- @@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception { try { blobStoreService = BlobUtils.createBlobStoreFromConfig(config); - BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService); + BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } + } + + /** +* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are +* recognised during the download via a {@link org.apache.flink.runtime.blob.BlobServer}. +*/ + @Test + public void testBlobServerCorruptedFile() throws Exception { + org.apache.flink.configuration.Configuration + config = new org.apache.flink.configuration.Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER"); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); + + BlobStoreService blobStoreService = null; + + try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); --- End diff -- Why not creating the `blobStoreService` outside of the try-finally block. Then you don't have to make the null check in the finally block. ---
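For illustration, a sketch of the restructuring suggested above, using only identifiers from the quoted test code (setup of {{config}} omitted); whether the creation can actually be moved out of the block depends on which exceptions it may throw:

{code:java}
// Create the service before the try block so the finally block needs no null check.
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
try {
    BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService);
} finally {
    blobStoreService.closeAndCleanupAllData();
}
{code}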
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137266698 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOExcepti * ID of the job this blob belongs to (or null if job-unrelated) * @param blobKey * blob key associated with the requested file +* @param permanentBlob +* whether the BLOB is permanent (true) or transient (false) * * @throws IOException * thrown if an I/O error occurs while writing the header data to the output stream */ - private static void sendGetHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey) throws IOException { + private static void sendGetHeader( + OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob) + throws IOException { checkNotNull(blobKey); + checkArgument(jobId != null || !permanentBlob, "permanent BLOBs must be job-related"); // Signal type of operation outputStream.write(GET_OPERATION); // Send job ID and key if (jobId == null) { outputStream.write(CONTENT_NO_JOB); + } else if (permanentBlob) { + outputStream.write(CONTENT_FOR_JOB_HA); + outputStream.write(jobId.getBytes()); } else { outputStream.write(CONTENT_FOR_JOB); --- End diff -- Same here to `TRANSIENT_JOB_CONTENT`? ---
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137259815 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java --- @@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception { try { blobStoreService = BlobUtils.createBlobStoreFromConfig(config); - BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService); + BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } + } + + /** +* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are +* recognised during the download via a {@link org.apache.flink.runtime.blob.BlobServer}. +*/ + @Test + public void testBlobServerCorruptedFile() throws Exception { + org.apache.flink.configuration.Configuration + config = new org.apache.flink.configuration.Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER"); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); + + BlobStoreService blobStoreService = null; + + try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + + BlobServerCorruptionTest.testGetFailsFromCorruptFile(config, blobStoreService, exception); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } + } + + /** +* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any +* participating BlobServer when uploaded via a {@link org.apache.flink.runtime.blob.BlobCache}. +*/ + @Test + public void testBlobCacheRecovery() throws Exception { + org.apache.flink.configuration.Configuration + config = new org.apache.flink.configuration.Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER"); --- End diff -- Statebackend not defined. ---
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155573#comment-16155573 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137266698 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOExcepti * ID of the job this blob belongs to (or null if job-unrelated) * @param blobKey * blob key associated with the requested file +* @param permanentBlob +* whether the BLOB is permanent (true) or transient (false) * * @throws IOException * thrown if an I/O error occurs while writing the header data to the output stream */ - private static void sendGetHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey) throws IOException { + private static void sendGetHeader( + OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob) + throws IOException { checkNotNull(blobKey); + checkArgument(jobId != null || !permanentBlob, "permanent BLOBs must be job-related"); // Signal type of operation outputStream.write(GET_OPERATION); // Send job ID and key if (jobId == null) { outputStream.write(CONTENT_NO_JOB); + } else if (permanentBlob) { + outputStream.write(CONTENT_FOR_JOB_HA); + outputStream.write(jobId.getBytes()); } else { outputStream.write(CONTENT_FOR_JOB); --- End diff -- Same here to `TRANSIENT_JOB_CONTENT`? > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155568#comment-16155568 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137292784 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobService.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; + +/** + * A service to retrieve transient binary large objects (BLOBs). + * + * These include per-job BLOBs that are , e.g. a job's JAR files, parts of an off-loaded {@link --- End diff -- I thought that job's JAR files are stored in the permanent blob service? > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137293376 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java --- @@ -215,10 +215,10 @@ public static ClassLoader retrieveClassLoader( JobManagerMessages.ClassloadingProps props = optProps.get(); InetSocketAddress serverAddress = new InetSocketAddress(jobManager.getHostname(), props.blobManagerPort()); - final BlobCache blobClient; + final PermanentBlobCache blobClient; try { - // TODO: Fix lifecycle of BlobCache to properly close it upon usage - blobClient = new BlobCache(serverAddress, config, highAvailabilityServices.createBlobStore()); + // TODO: Fix lifecycle of PermanentBlobCache to properly close it upon usage + blobClient = new PermanentBlobCache(serverAddress, config, highAvailabilityServices.createBlobStore()); --- End diff -- shouldn't the variable be called `permanentBlobCache` instead of `blobClient`? ---
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137303136 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java --- @@ -18,61 +18,31 @@ package org.apache.flink.runtime.blob; -import org.apache.flink.api.common.JobID; - import java.io.Closeable; -import java.io.File; -import java.io.IOException; /** * A simple store and retrieve binary large objects (BLOBs). */ public interface BlobService extends Closeable { /** -* Returns the path to a local copy of the (job-unrelated) file associated with the provided -* blob key. -* -* @param key blob key associated with the requested file -* @return The path to the file. -* @throws java.io.FileNotFoundException when the path does not exist; -* @throws IOException if any other error occurs when retrieving the file -*/ - File getFile(BlobKey key) throws IOException; - - /** -* Returns the path to a local copy of the file associated with the provided job ID and blob key. +* Returns a BLOB service for accessing permanent BLOBs. * -* @param jobId ID of the job this blob belongs to -* @param key blob key associated with the requested file -* @return The path to the file. -* @throws java.io.FileNotFoundException when the path does not exist; -* @throws IOException if any other error occurs when retrieving the file +* @return BLOB service */ - File getFile(JobID jobId, BlobKey key) throws IOException; + PermanentBlobService getPermanentBlobStore(); --- End diff -- I'm not so sure about the naming here. The `BlobStore` is actually something else than the `BlobService`. Would be good if we use a consistent naming for the different things (this could also include renaming some of the entities). ---
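For illustration, a hypothetical version of the interface with accessors named after the returned {{*Service}} types; the names below are only one possible resolution of the naming concern, not necessarily what was merged:

{code:java}
import java.io.Closeable;

// Hypothetical sketch: accessors named after the returned *Service* types rather
// than after "store", to keep BlobService and BlobStore clearly separated.
public interface BlobService extends Closeable {

    /** Returns a BLOB service for accessing permanent BLOBs. */
    PermanentBlobService getPermanentBlobService();

    /** Returns a BLOB service for accessing transient BLOBs. */
    TransientBlobService getTransientBlobService();
}
{code}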
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137266916 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOExcepti * ID of the job this blob belongs to (or null if job-unrelated) * @param blobKey * blob key associated with the requested file +* @param permanentBlob +* whether the BLOB is permanent (true) or transient (false) * * @throws IOException * thrown if an I/O error occurs while writing the header data to the output stream */ - private static void sendGetHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey) throws IOException { + private static void sendGetHeader( + OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob) + throws IOException { checkNotNull(blobKey); + checkArgument(jobId != null || !permanentBlob, "permanent BLOBs must be job-related"); // Signal type of operation outputStream.write(GET_OPERATION); // Send job ID and key if (jobId == null) { outputStream.write(CONTENT_NO_JOB); --- End diff -- Should we rename this variable to `TRANSIENT_CONTENT`? ---
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137268826 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -415,13 +393,17 @@ private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, int offset, int l * the ID of the job the BLOB belongs to (or null if job-unrelated) * @param inputStream * the input stream to read the data from +* @param permanentBlob +* whether the BLOB is permanent (true) or transient (false) * * @return the computed BLOB key of the uploaded BLOB * * @throws IOException * thrown if an I/O error occurs while uploading the data to the BLOB server */ - private BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream) throws IOException { + BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream, boolean permanentBlob) --- End diff -- Should we introduce an `enum` instead of a boolean denoting whether the content is transient or permanent? This would have the advantage that it's much clearer what's happening when looking at code at the calling side. ---
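For illustration, a hypothetical enum along the lines suggested above; the type name and its use are assumptions, not the API that was eventually merged:

{code:java}
// Hypothetical blob-type enum replacing the boolean flag; call sites then read
// putInputStream(jobId, in, BlobType.PERMANENT_BLOB) instead of (..., true).
public enum BlobType {
    /** BLOB is kept for the lifetime of the job (e.g. backed by the HA store). */
    PERMANENT_BLOB,
    /** BLOB may be removed after first retrieval (logs, RPC offloading, ...). */
    TRANSIENT_BLOB
}
{code}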
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137291080 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides access to transient BLOB files stored at the {@link BlobServer}. + * + * TODO: currently, this is still cache-based with local copies - make this truly transient, i.e. return file streams with no local copy + */ +public class TransientBlobCache implements TransientBlobService { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(TransientBlobCache.class); + + /** Counter to generate unique names for temporary files. */ + private final AtomicLong tempFileCounter = new AtomicLong(0); + + private final InetSocketAddress serverAddress; + + /** +* Root directory for local file storage +*/ + private final File storageDir; + + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); + + /** Shutdown hook thread to ensure deletion of the local storage directory. */ + private final Thread shutdownHook; + + /** The number of retries when the transfer fails */ + private final int numFetchRetries; + + /** Configuration for the blob client like ssl parameters required to connect to the blob server */ + private final Configuration blobClientConfig; + + /** Lock guarding concurrent file accesses */ + private final ReadWriteLock readWriteLock; + + /** +* Instantiates a new BLOB cache. 
+* +* @param serverAddress +* address of the {@link BlobServer} to use for fetching files from +* @param blobClientConfig +* global configuration +* +* @throws IOException +* thrown if the (local or distributed) file storage cannot be created or is not usable +*/ + public TransientBlobCache( + final InetSocketAddress serverAddress, + final Configuration blobClientConfig) throws IOException { --- End diff -- Can we change it such that we don't pass in a `Configuration` object but instead the required values? ---
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155569#comment-16155569 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137291887 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides access to transient BLOB files stored at the {@link BlobServer}. + * + * TODO: currently, this is still cache-based with local copies - make this truly transient, i.e. return file streams with no local copy + */ +public class TransientBlobCache implements TransientBlobService { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(TransientBlobCache.class); + + /** Counter to generate unique names for temporary files. */ + private final AtomicLong tempFileCounter = new AtomicLong(0); + + private final InetSocketAddress serverAddress; + + /** +* Root directory for local file storage +*/ + private final File storageDir; + + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); + + /** Shutdown hook thread to ensure deletion of the local storage directory. */ + private final Thread shutdownHook; + + /** The number of retries when the transfer fails */ + private final int numFetchRetries; + + /** Configuration for the blob client like ssl parameters required to connect to the blob server */ + private final Configuration blobClientConfig; + + /** Lock guarding concurrent file accesses */ + private final ReadWriteLock readWriteLock; + + /** +* Instantiates a new BLOB cache. 
+* +* @param serverAddress +* address of the {@link BlobServer} to use for fetching files from +* @param blobClientConfig +* global configuration +* +* @throws IOException +* thrown if the (local or distributed) file storage cannot be created or is not usable +*/ + public TransientBlobCache( + final InetSocketAddress serverAddress, + final Configuration blobClientConfig) throws IOException { --- End diff -- Probably not so easy because we also need it for the `BlobClient` creation. > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
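For illustration, a hypothetical constructor that takes the extracted values directly while still keeping the {{Configuration}} for creating {{BlobClient}} connections, as the reply notes is needed; the field and parameter choices are assumptions based on the fields in the quoted class:

{code:java}
// Hypothetical sketch only: explicit values for the cache itself, Configuration
// retained solely for creating BlobClient connections later on.
public TransientBlobCache(
        final InetSocketAddress serverAddress,
        final File storageDir,
        final int numFetchRetries,
        final Configuration blobClientConfig) throws IOException {

    this.serverAddress = checkNotNull(serverAddress);
    this.storageDir = checkNotNull(storageDir);
    checkArgument(numFetchRetries >= 0, "The number of retries must be non-negative.");
    this.numFetchRetries = numFetchRetries;
    this.blobClientConfig = checkNotNull(blobClientConfig);
    this.readWriteLock = new ReentrantReadWriteLock();
    this.shutdownHook = null; // shutdown-hook registration elided in this sketch
}
{code}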
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155563#comment-16155563 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137259798 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java --- @@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception { try { blobStoreService = BlobUtils.createBlobStoreFromConfig(config); - BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService); + BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } + } + + /** +* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are +* recognised during the download via a {@link org.apache.flink.runtime.blob.BlobServer}. +*/ + @Test + public void testBlobServerCorruptedFile() throws Exception { + org.apache.flink.configuration.Configuration + config = new org.apache.flink.configuration.Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER"); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); + + BlobStoreService blobStoreService = null; + + try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); --- End diff -- Why not creating the `blobStoreService` outside of the try-finally block. Then you don't have to make the null check in the finally block. > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137266511 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOExcepti * ID of the job this blob belongs to (or null if job-unrelated) * @param blobKey * blob key associated with the requested file +* @param permanentBlob +* whether the BLOB is permanent (true) or transient (false) * * @throws IOException * thrown if an I/O error occurs while writing the header data to the output stream */ - private static void sendGetHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey) throws IOException { + private static void sendGetHeader( + OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob) + throws IOException { checkNotNull(blobKey); + checkArgument(jobId != null || !permanentBlob, "permanent BLOBs must be job-related"); // Signal type of operation outputStream.write(GET_OPERATION); // Send job ID and key if (jobId == null) { outputStream.write(CONTENT_NO_JOB); + } else if (permanentBlob) { + outputStream.write(CONTENT_FOR_JOB_HA); --- End diff -- Does it make sense to rename this constant to `PERMANENT_JOB_CONTENT`? ---
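Taken together, the renames proposed in these comments would make {{sendGetHeader}} read roughly as follows (a sketch; the constants' values are unchanged and the final names in Flink may differ):

{code:java}
// Sketch with the proposed names: TRANSIENT_CONTENT (was CONTENT_NO_JOB),
// TRANSIENT_JOB_CONTENT (was CONTENT_FOR_JOB), PERMANENT_JOB_CONTENT (was CONTENT_FOR_JOB_HA).
if (jobId == null) {
    outputStream.write(TRANSIENT_CONTENT);
} else if (permanentBlob) {
    outputStream.write(PERMANENT_JOB_CONTENT);
    outputStream.write(jobId.getBytes());
} else {
    outputStream.write(TRANSIENT_JOB_CONTENT);
    outputStream.write(jobId.getBytes()); // remainder as in the quoted diff
}
{code}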
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r137259367 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java --- @@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception { try { blobStoreService = BlobUtils.createBlobStoreFromConfig(config); - BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService); + BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } + } + + /** +* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are +* recognised during the download via a {@link org.apache.flink.runtime.blob.BlobServer}. +*/ + @Test + public void testBlobServerCorruptedFile() throws Exception { + org.apache.flink.configuration.Configuration + config = new org.apache.flink.configuration.Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER"); --- End diff -- I think "ZooKeeper" is not a valid state backend. What did you want to do with that? ---
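For illustration, one plausible correction of the quoted test configuration: either drop the line or point {{CoreOptions.STATE_BACKEND}} at an actual backend. This is an assumption about the intent, not necessarily the fix that was committed:

{code:java}
// Sketch of a corrected test setup; "filesystem" is one valid state backend value,
// though the blob tests may not need the setting at all.
org.apache.flink.configuration.Configuration config =
    new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(CoreOptions.STATE_BACKEND, "filesystem"); // instead of "ZOOKEEPER"
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
    temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
{code}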
[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector
[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155541#comment-16155541 ] ASF GitHub Bot commented on FLINK-6988: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4239 What were the bugs that you fixed? > Add Apache Kafka 0.11 connector > --- > > Key: FLINK-6988 > URL: https://issues.apache.org/jira/browse/FLINK-6988 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.1 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > > Kafka 0.11 (it will be released very soon) add supports for transactions. > Thanks to that, Flink might be able to implement Kafka sink supporting > "exactly-once" semantic. API changes and whole transactions support is > described in > [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging]. > The goal is to mimic implementation of existing BucketingSink. New > FlinkKafkaProducer011 would > * upon creation begin transaction, store transaction identifiers into the > state and would write all incoming data to an output Kafka topic using that > transaction > * on `snapshotState` call, it would flush the data and write in state > information that current transaction is pending to be committed > * on `notifyCheckpointComplete` we would commit this pending transaction > * in case of crash between `snapshotState` and `notifyCheckpointComplete` we > either abort this pending transaction (if not every participant successfully > saved the snapshot) or restore and commit it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4239: [FLINK-6988] flink-connector-kafka-0.11 with exactly-once...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4239 What were the bugs that you fixed? ---
[GitHub] flink pull request #4575: [FLINK-7494][travis] Add license headers to '.trav...
Github user yew1eb commented on a diff in the pull request: https://github.com/apache/flink/pull/4575#discussion_r137297560 --- Diff: .travis.yml --- @@ -1,4 +1,19 @@ -# s3 deployment based on http://about.travis-ci.org/blog/2012-12-18-travis-artifacts/ --- End diff -- done. ---