[jira] [Created] (FLINK-24041) [FLIP-171] Generic AsyncSinkBase
Zichen Liu created FLINK-24041: -- Summary: [FLIP-171] Generic AsyncSinkBase Key: FLINK-24041 URL: https://issues.apache.org/jira/browse/FLINK-24041 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu
h2. Motivation
Apache Flink has a rich connector ecosystem that can persist data in various destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, HBase, and many more destinations. Additional connectors are maintained in Apache Bahir or directly on GitHub. The basic functionality of these sinks is quite similar. They batch events according to user-defined buffering hints, sign requests and send them to the respective endpoint, retry unsuccessful or throttled requests, and participate in checkpointing. They primarily differ only in the way they interface with the destination. Yet, all the above-mentioned sinks are developed and maintained independently.
We hence propose to create a sink that abstracts away this common functionality into a generic sink. Adding support for a new destination then just means creating a lightweight shim that implements only the destination-specific interfaces, using a client that supports async requests. Having a common abstraction will reduce the effort required to maintain all these individual sinks. It will also make it much easier and faster to create integrations with additional destinations. Moreover, improvements or bug fixes to the core of the sink will benefit all implementations based on it.
The design of the sink focuses on extensibility and broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies. The sink is designed to participate in checkpointing to provide at-least-once semantics, but it is limited to destinations that provide a client that supports async requests.
h2. References
More details can be found at https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
-- This message was sent by Atlassian Jira (v8.3.4#803005)
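The common core described in the motivation (buffer converted records, flush them in batches, requeue entries the destination rejects) can be sketched as a small generic base class. This is an illustrative sketch only: the class and method names below (SketchAsyncSinkWriter, convert, submitRequestEntries) loosely follow the FLIP-171 design but are not the actual Flink API, and the batching here is count-based only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/**
 * Illustrative sketch of the generic buffering/retry core proposed by FLIP-171.
 * A destination shim only has to implement the two abstract methods.
 */
abstract class SketchAsyncSinkWriter<InputT, RequestEntryT> {
    private final Deque<RequestEntryT> buffer = new ArrayDeque<>();
    private final int maxBatchSize;

    protected SketchAsyncSinkWriter(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /** Destination-specific: convert one element into a request entry. */
    protected abstract RequestEntryT convert(InputT element);

    /** Destination-specific: submit a batch; report failed entries via the callback. */
    protected abstract void submitRequestEntries(
            List<RequestEntryT> batch, Consumer<List<RequestEntryT>> failedEntries);

    public void write(InputT element) {
        buffer.add(convert(element));
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    public void flush() {
        List<RequestEntryT> batch = new ArrayList<>();
        while (!buffer.isEmpty() && batch.size() < maxBatchSize) {
            batch.add(buffer.poll());
        }
        if (!batch.isEmpty()) {
            // Failed entries go back into the buffer and are retried on a later
            // flush (cf. the requeueFailedRequestEntry method in the tickets below).
            submitRequestEntries(batch, failed -> failed.forEach(buffer::addLast));
        }
    }

    public int bufferedCount() {
        return buffer.size();
    }
}
```

A concrete connector would subclass this with its client in `submitRequestEntries`; the generic class never touches any destination SDK, which is the dependency-isolation point the motivation makes.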
[jira] [Created] (FLINK-24227) [FLIP-171] KDS implementation of Async Sink
Zichen Liu created FLINK-24227: -- Summary: [FLIP-171] KDS implementation of Async Sink Key: FLINK-24227 URL: https://issues.apache.org/jira/browse/FLINK-24227 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0
h2. Motivation
(Identical to the motivation of FLINK-24041 above.)
h2. References
More details can be found at https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
[jira] [Updated] (FLINK-24227) [FLIP-171] KDS implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24227: --- Description:
h2. Motivation
*User stories:* As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data pipeline.
*Scope:*
* Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors. The module and package name can be anything reasonable, e.g. {{flink-connector-aws-kinesis}} for the module name and {{org.apache.flink.connector.aws.kinesis}} for the package name.
* The implementation must use [the Kinesis Java Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
* The implementation must allow users to configure the Kinesis Client, with reasonable default settings.
* Implement an asynchronous sink writer for KDS by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests (PutRecordsRequestEntry objects) to KDS for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story.
* Unit/Integration testing. Use Kinesalite (an in-memory Kinesis simulation). We already use this in {{KinesisTableApiITCase}}.
* Java / code-level docs.
* End-to-end testing: add tests that hit a real AWS instance. (How to best donate resources to the Flink project to allow this to happen?)
h2. References
More details can be found at [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]
was: the same description, plus a side-note that these implementations may later be moved elsewhere (see the ongoing discussion on the flink-dev mailing list).
> [FLIP-171] KDS implementation of Async Sink
> Key: FLINK-24227  URL: https://issues.apache.org/jira/browse/FLINK-24227  Project: Flink  Issue Type: New Feature  Components: Connectors / Common  Reporter: Zichen Liu  Assignee: Zichen Liu  Priority: Major  Labels: pull-request-available  Fix For: 1.15.0
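The KDS shim in the scope above reduces to two destination-specific pieces: converting elements into PutRecordsRequestEntry-style objects, and submitting a batch while requeueing the entries the service rejects (KDS PutRecords reports failures per record, not per request). A minimal, self-contained sketch follows; the client interface here is a hypothetical stand-in, not the real Kinesis Java Client, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a KDS-style async client: takes a batch, returns the failed entries. */
interface FakeKinesisAsyncClient {
    CompletableFuture<List<String>> putRecords(List<String> entries);
}

/** Sketch of a KDS sink-writer shim: asynchronous batch submission plus requeue of failed entries. */
class SketchKinesisSinkWriter {
    private final FakeKinesisAsyncClient client;
    private final List<String> retryQueue = new ArrayList<>();

    SketchKinesisSinkWriter(FakeKinesisAsyncClient client) {
        this.client = client;
    }

    /** Entries reported as failed are buffered for retry (cf. requeueFailedRequestEntry in the ticket). */
    void submit(List<String> batch, Runnable onComplete) {
        client.putRecords(batch).thenAccept(failed -> {
            retryQueue.addAll(failed);
            onComplete.run();
        });
    }

    List<String> retryQueue() {
        return retryQueue;
    }
}
```

The real implementation would additionally wire this into checkpointing so that in-flight and requeued entries are not lost, which is what the base class is meant to provide generically.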
[jira] [Updated] (FLINK-24227) [FLIP-171] KDS implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24227: --- Description: replaced the generic FLIP-171 motivation (as in FLINK-24041) with the KDS user stories and scope above, adding a side-note: there will be additional work later to move these implementations somewhere else (see the ongoing discussion on the flink-dev mailing list).
[jira] [Created] (FLINK-24228) [FLIP-171] Firehose implementation of Async Sink
Zichen Liu created FLINK-24228: -- Summary: [FLIP-171] Firehose implementation of Async Sink Key: FLINK-24228 URL: https://issues.apache.org/jira/browse/FLINK-24228 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0
h2. Motivation
(As created, the description duplicated the Kinesis Data Streams user stories and scope of FLINK-24227; it was subsequently updated to target Firehose.)
h2. References
More details can be found at [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]
[jira] [Updated] (FLINK-24228) [FLIP-171] Firehose implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24228: --- Description:
h2. Motivation
*User stories:* As a Flink user, I’d like to use Kinesis Firehose as a sink for my data pipeline.
*Scope:*
* Implement an asynchronous sink for Kinesis Firehose by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors. The module and package name can be anything reasonable, e.g. {{flink-connector-aws-kinesis}} for the module name and {{org.apache.flink.connector.aws.kinesis}} for the package name.
* The implementation must use [the Kinesis Java Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
* The implementation must allow users to configure the Kinesis Client, with reasonable default settings.
* Implement an asynchronous sink writer for Firehose by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests (PutRecordsRequestEntry objects) to Firehose for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story.
* Unit/Integration testing. Use Kinesalite (an in-memory Kinesis simulation). We already use this in {{KinesisTableApiITCase}}.
* Java / code-level docs.
* End-to-end testing: add tests that hit a real AWS instance. (How to best donate resources to the Flink project to allow this to happen?)
h2. References
More details can be found at [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]
was: the same user stories and scope, but targeting Kinesis Data Streams (KDS) rather than Firehose.
> [FLIP-171] Firehose implementation of Async Sink
> Key: FLINK-24228  URL: https://issues.apache.org/jira/browse/FLINK-24228  Project: Flink  Issue Type: New Feature  Components: Connectors / Common  Reporter: Zichen Liu  Assignee: Zichen Liu  Priority: Major  Labels: pull-request-available  Fix For: 1.15.0
[jira] [Created] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink
Zichen Liu created FLINK-24229: -- Summary: [FLIP-171] DynamoDB implementation of Async Sink Key: FLINK-24229 URL: https://issues.apache.org/jira/browse/FLINK-24229 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0
h2. Motivation
(As created, the description duplicated the Firehose user stories and scope of FLINK-24228; it was subsequently updated to target DynamoDB.)
h2. References
More details can be found at [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]
[jira] [Updated] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24229: --- Description:
h2. Motivation
*User stories:* As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.
*Scope:*
* Implement an asynchronous sink for DynamoDB by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors.
* Implement an asynchronous sink writer for DynamoDB by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests to DynamoDB for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story.
* Java / code-level docs.
* End-to-end testing: add tests that hit a real AWS instance. (How to best donate resources to the Flink project to allow this to happen?)
h2. References
More details can be found at [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]
was: the Firehose user stories and scope of FLINK-24228.
> [FLIP-171] DynamoDB implementation of Async Sink
> Key: FLINK-24229  URL: https://issues.apache.org/jira/browse/FLINK-24229  Project: Flink  Issue Type: New Feature  Components: Connectors / Common  Reporter: Zichen Liu  Assignee: Zichen Liu  Priority: Major  Labels: pull-request-available  Fix For: 1.15.0
[jira] [Created] (FLINK-24234) [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase
Zichen Liu created FLINK-24234: -- Summary: [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase Key: FLINK-24234 URL: https://issues.apache.org/jira/browse/FLINK-24234 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0
h2. Motivation
(Identical to the motivation of FLINK-24041 above.)
h2. References
More details can be found at https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
[jira] [Updated] (FLINK-24234) [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase
[ https://issues.apache.org/jira/browse/FLINK-24234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24234: --- Description: *User stories* * As a Sink user, I’d like to configure the batch size for items to send to the destination at once (e.g. “flush if there are x number of items in the batch”) * As a Sink user, I’d like to configure the batching logic so that I can flush the batch of requests based on time period (e.g. “flush every 2 seconds”) * As a Sink user I’d like to specify the number of bytes for the batch of requests to be flushed (e.g. ”submit the batch after the total number of bytes in it is above 1KB”) * As a Sink developer, I’d like to use the configuration mechanism provided to allow Sink users to configure my Sink implementation * {{*Scope* }} * Allow Sink developers and users to pass batch size config to the AsyncSinkWriter * Add support for time-based flushing (e.g. “flush after x miliseconds”) using the ProcessingTimeService which is part of the Sink interface * Add support for byte-based flushing * Consider the combination of time-based flushing and byte-based flushing, if there are more bytes than configured in the time-based batch, then the last few (however many necessary) items should go in the next batch to satisfy the requirement for the number of bytes. *References* More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] was: h2. Motivation Apache Flink has a rich connector ecosystem that can persist data in various destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, HBase, and many more destinations. Additional connectors are maintained in Apache Bahir or directly on GitHub. The basic functionality of these sinks is quite similar. 
They batch events according to user defined buffering hints, sign requests and send them to the respective endpoint, retry unsuccessful or throttled requests, and participate in checkpointing. They primarily just differ in the way they interface with the destination. Yet, all the above-mentioned sinks are developed and maintained independently. We hence propose to create a sink that abstracts away this common functionality into a generic sink. Adding support for a new destination then just means creating a lightweight shim that only implements the specific interfaces of the destination using a client that supports async requests. Having a common abstraction will reduce the effort required to maintain all these individual sinks. It will also make it much easier and faster to create integrations with additional destinations. Moreover, improvements or bug fixes to the core of the sink will benefit all implementations that are based on it. The design of the sink focusses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector specific dependencies. The sink is designed to participate in checkpointing to provide at-least once semantics, but it is limited to destinations that provide a client that supports async requests. h2. References More details to be found https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink > [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase > - > > Key: FLINK-24234 > URL: https://issues.apache.org/jira/browse/FLINK-24234 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > *User stories* > * As a Sink user, I’d like to configure the batch size for items to send to > the destination at once (e.g. 
“flush if there are x number of items in the > batch”) > * As a Sink user, I’d like to configure the batching logic so that I can > flush the batch of requests based on a time period (e.g. “flush every 2 > seconds”) > * As a Sink user, I’d like to specify the number of bytes for the batch of > requests to be flushed (e.g. “submit the batch after the total number of > bytes in it is above 1KB”) > * As a Sink developer, I’d like to use the configuration mechanism provided > to allow Sink users to configure my Sink implementation > * > *Scope* > * Allow Sink developers and users to pass batch size config to the > AsyncSinkWriter > * Add support for time-based flushing (e.g. “flush after x milliseconds”) > using the ProcessingTimeService which is part of the Sink interface > * Add support for byte-based flushing > * Consider the combination of time-based and byte-based flushing: > if there are more bytes than configured in the time-based batch, then the > last few (however many necessary) items should go in the next batch to > satisfy the requirement for the number of bytes.
[jira] [Updated] (FLINK-24234) [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase
[ https://issues.apache.org/jira/browse/FLINK-24234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24234: --- Description: *User stories* * As a Sink user, I’d like to configure the batch size for items to send to the destination at once (e.g. “flush if there are x number of items in the batch”) * As a Sink user, I’d like to configure the batching logic so that I can flush the batch of requests based on a time period (e.g. “flush every 2 seconds”) * As a Sink user, I’d like to specify the number of bytes for the batch of requests to be flushed (e.g. “submit the batch after the total number of bytes in it is above 1KB”) * As a Sink developer, I’d like to use the configuration mechanism provided to allow Sink users to configure my Sink implementation * *Scope* * Allow Sink developers and users to pass batch size config to the AsyncSinkWriter * Add support for time-based flushing (e.g. “flush after x milliseconds”) using the ProcessingTimeService which is part of the Sink interface * Add support for byte-based flushing * Consider the combination of time-based and byte-based flushing: if there are more bytes than configured in the time-based batch, then the last few (however many necessary) items should go in the next batch to satisfy the requirement for the number of bytes. *References* More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] was: *User stories* * As a Sink user, I’d like to configure the batch size for items to send to the destination at once (e.g. “flush if there are x number of items in the batch”) * As a Sink user, I’d like to configure the batching logic so that I can flush the batch of requests based on a time period (e.g. “flush every 2 seconds”) * As a Sink user, I’d like to specify the number of bytes for the batch of requests to be flushed (e.g.
“submit the batch after the total number of bytes in it is above 1KB”) * As a Sink developer, I’d like to use the configuration mechanism provided to allow Sink users to configure my Sink implementation * *Scope* * Allow Sink developers and users to pass batch size config to the AsyncSinkWriter * Add support for time-based flushing (e.g. “flush after x milliseconds”) using the ProcessingTimeService which is part of the Sink interface * Add support for byte-based flushing * Consider the combination of time-based and byte-based flushing: if there are more bytes than configured in the time-based batch, then the last few (however many necessary) items should go in the next batch to satisfy the requirement for the number of bytes. *References* More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] > [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase > - > > Key: FLINK-24234 > URL: https://issues.apache.org/jira/browse/FLINK-24234 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > *User stories* > * As a Sink user, I’d like to configure the batch size for items to send to > the destination at once (e.g. “flush if there are x number of items in the > batch”) > * As a Sink user, I’d like to configure the batching logic so that I can > flush the batch of requests based on a time period (e.g. “flush every 2 > seconds”) > * As a Sink user, I’d like to specify the number of bytes for the batch of > requests to be flushed (e.g.
“submit the batch after the total number of > bytes in it is above 1KB”) > * As a Sink developer, I’d like to use the configuration mechanism provided > to allow Sink users to configure my Sink implementation > * > *Scope* > * Allow Sink developers and users to pass batch size config to the > AsyncSinkWriter > * Add support for time-based flushing (e.g. “flush after x milliseconds”) > using the ProcessingTimeService which is part of the Sink interface > * Add support for byte-based flushing > * Consider the combination of time-based and byte-based flushing: > if there are more bytes than configured in the time-based batch, then the > last few (however many necessary) items should go in the next batch to > satisfy the requirement for the number of bytes. > *References* > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.3.4#803005)
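The scope above names three flush triggers for the buffered batch: item count, byte count, and time in buffer. As a rough sketch of how such triggers combine, assuming hypothetical class and field names (this is not the actual AsyncSinkWriter API):

```java
// Illustrative model of the three flush triggers described in the ticket.
// All names and thresholds here are hypothetical, chosen for the sketch.
class BatchFlushPolicy {
    private final int maxBatchSize;        // "flush if there are x items in the batch"
    private final long maxBatchSizeBytes;  // "flush above 1 KB"
    private final long maxTimeInBufferMs;  // "flush every 2 seconds"

    BatchFlushPolicy(int maxBatchSize, long maxBatchSizeBytes, long maxTimeInBufferMs) {
        this.maxBatchSize = maxBatchSize;
        this.maxBatchSizeBytes = maxBatchSizeBytes;
        this.maxTimeInBufferMs = maxTimeInBufferMs;
    }

    /** Flush as soon as any one of the three triggers fires. */
    boolean shouldFlush(int bufferedItems, long bufferedBytes, long msSinceOldestItem) {
        return bufferedItems >= maxBatchSize
                || bufferedBytes >= maxBatchSizeBytes
                || msSinceOldestItem >= maxTimeInBufferMs;
    }
}
```

The last scope item (items spilling over into the next batch) then follows naturally: when assembling a batch, stop before the entry that would push the byte total over the limit and leave the remaining entries buffered for the next batch.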
[jira] [Updated] (FLINK-24278) [FLIP-171] Async Sink Base Sink Developer Guide for Documentation
[ https://issues.apache.org/jira/browse/FLINK-24278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24278: --- Description: As an Async Sink developer, I’d like to have a step-by-step guide to implementing new Async Sinks. *Scope:* * A markdown file in the async sink package guiding developers through the steps to create new async sink implementations. We could generate PDFs and HTML pages from this file later, to share it in other places if needed. was: *User stories* * As a Sink user, I’d like to configure the batch size for items to send to the destination at once (e.g. “flush if there are x number of items in the batch”) * As a Sink user, I’d like to configure the batching logic so that I can flush the batch of requests based on a time period (e.g. “flush every 2 seconds”) * As a Sink user, I’d like to specify the number of bytes for the batch of requests to be flushed (e.g. “submit the batch after the total number of bytes in it is above 1KB”) * As a Sink developer, I’d like to use the configuration mechanism provided to allow Sink users to configure my Sink implementation * *Scope* * Allow Sink developers and users to pass batch size config to the AsyncSinkWriter * Add support for time-based flushing (e.g. “flush after x milliseconds”) using the ProcessingTimeService which is part of the Sink interface * Add support for byte-based flushing * Consider the combination of time-based and byte-based flushing: if there are more bytes than configured in the time-based batch, then the last few (however many necessary) items should go in the next batch to satisfy the requirement for the number of bytes.
*References* More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] > [FLIP-171] Async Sink Base Sink Developer Guide for Documentation > - > > Key: FLINK-24278 > URL: https://issues.apache.org/jira/browse/FLINK-24278 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > As an Async Sink developer, I’d like to have a step-by-step guide to > implementing new Async Sinks. > *Scope:* > * A markdown file in the async sink package guiding developers through > the steps to create new async sink implementations. We could generate PDFs and > HTML pages from this file later, to share it in other places if needed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24278) [FLIP-171] Async Sink Base Sink Developer Guide for Documentation
Zichen Liu created FLINK-24278: -- Summary: [FLIP-171] Async Sink Base Sink Developer Guide for Documentation Key: FLINK-24278 URL: https://issues.apache.org/jira/browse/FLINK-24278 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 *User stories* * As a Sink user, I’d like to configure the batch size for items to send to the destination at once (e.g. “flush if there are x number of items in the batch”) * As a Sink user, I’d like to configure the batching logic so that I can flush the batch of requests based on a time period (e.g. “flush every 2 seconds”) * As a Sink user, I’d like to specify the number of bytes for the batch of requests to be flushed (e.g. “submit the batch after the total number of bytes in it is above 1KB”) * As a Sink developer, I’d like to use the configuration mechanism provided to allow Sink users to configure my Sink implementation * *Scope* * Allow Sink developers and users to pass batch size config to the AsyncSinkWriter * Add support for time-based flushing (e.g. “flush after x milliseconds”) using the ProcessingTimeService which is part of the Sink interface * Add support for byte-based flushing * Consider the combination of time-based and byte-based flushing: if there are more bytes than configured in the time-based batch, then the last few (however many necessary) items should go in the next batch to satisfy the requirement for the number of bytes. *References* More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24370) [FLIP-171] Documentation for Generic AsyncSinkBase
Zichen Liu created FLINK-24370: -- Summary: [FLIP-171] Documentation for Generic AsyncSinkBase Key: FLINK-24370 URL: https://issues.apache.org/jira/browse/FLINK-24370 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 h2. Motivation Apache Flink has a rich connector ecosystem that can persist data in various destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, HBase, and many more destinations. Additional connectors are maintained in Apache Bahir or directly on GitHub. The basic functionality of these sinks is quite similar. They batch events according to user-defined buffering hints, sign requests and send them to the respective endpoint, retry unsuccessful or throttled requests, and participate in checkpointing. They primarily just differ in the way they interface with the destination. Yet, all the above-mentioned sinks are developed and maintained independently. We hence propose to abstract away this common functionality into a generic sink. Adding support for a new destination then just means creating a lightweight shim that only implements the specific interfaces of the destination using a client that supports async requests. Having a common abstraction will reduce the effort required to maintain all these individual sinks. It will also make it much easier and faster to create integrations with additional destinations. Moreover, improvements or bug fixes to the core of the sink will benefit all implementations that are based on it. The design of the sink focuses on extensibility and broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies. The sink is designed to participate in checkpointing to provide at-least-once semantics, but it is limited to destinations that provide a client that supports async requests. h2.
References More details to be found https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-24370) [FLIP-171] Documentation for Generic AsyncSinkBase
[ https://issues.apache.org/jira/browse/FLINK-24370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24370: --- Description: h2. Motivation Write documentation for the FLIP-171 Async Sink Base. This will help sink implementers get acquainted with the necessary information to write their concrete sinks. h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] was: h2. Motivation Apache Flink has a rich connector ecosystem that can persist data in various destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, HBase, and many more destinations. Additional connectors are maintained in Apache Bahir or directly on GitHub. The basic functionality of these sinks is quite similar. They batch events according to user-defined buffering hints, sign requests and send them to the respective endpoint, retry unsuccessful or throttled requests, and participate in checkpointing. They primarily just differ in the way they interface with the destination. Yet, all the above-mentioned sinks are developed and maintained independently. We hence propose to abstract away this common functionality into a generic sink. Adding support for a new destination then just means creating a lightweight shim that only implements the specific interfaces of the destination using a client that supports async requests. Having a common abstraction will reduce the effort required to maintain all these individual sinks. It will also make it much easier and faster to create integrations with additional destinations. Moreover, improvements or bug fixes to the core of the sink will benefit all implementations that are based on it. The design of the sink focuses on extensibility and broad support of destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
The sink is designed to participate in checkpointing to provide at-least-once semantics, but it is limited to destinations that provide a client that supports async requests. h2. References More details to be found https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink > [FLIP-171] Documentation for Generic AsyncSinkBase > -- > > Key: FLINK-24370 > URL: https://issues.apache.org/jira/browse/FLINK-24370 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > Write documentation for the FLIP-171 Async Sink Base. This will help sink > implementers get acquainted with the necessary information to write their > concrete sinks. > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17438824#comment-17438824 ] Zichen Liu commented on FLINK-24229: Hey [~Gusev] That's great, thanks for letting me know. Hope you had a good vacation!! I'm going to try to get the changes for the base sink in by tomorrow, though, so that the API doesn't move by the time you start work. [https://github.com/apache/flink/pull/17687] I'm close to getting it in, as the reviews for this change have mostly taken place on a separate PR page. > [FLIP-171] DynamoDB implementation of Async Sink > > > Key: FLINK-24229 > URL: https://issues.apache.org/jira/browse/FLINK-24229 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > *User stories:* > As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline. > *Scope:* > * Implement an asynchronous sink for DynamoDB by inheriting the > AsyncSinkBase class. The implementation can for now reside in its own module > in flink-connectors. > * Implement an asynchronous sink writer for DynamoDB by extending the > AsyncSinkWriter. The implementation must deal with failed requests and retry > them using the {{requeueFailedRequestEntry}} method. If possible, the > implementation should batch multiple requests > to DynamoDB for increased throughput. The implemented Sink Writer > will be used by the Sink class that will be created as part of this story. > * Java / code-level docs. > * End-to-end testing: add tests that hit a real AWS instance. (How best to > donate resources to the Flink project to allow this to happen?) > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439185#comment-17439185 ] Zichen Liu commented on FLINK-24229: Hi [~Gusev] Sadly, I may not be able to get the PR merged today... sorry about that. If you need a reference for what the changes will look like (mostly minor), please use the file changes in the PR as a guide. > [FLIP-171] DynamoDB implementation of Async Sink > > > Key: FLINK-24229 > URL: https://issues.apache.org/jira/browse/FLINK-24229 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > *User stories:* > As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline. > *Scope:* > * Implement an asynchronous sink for DynamoDB by inheriting the > AsyncSinkBase class. The implementation can for now reside in its own module > in flink-connectors. > * Implement an asynchronous sink writer for DynamoDB by extending the > AsyncSinkWriter. The implementation must deal with failed requests and retry > them using the {{requeueFailedRequestEntry}} method. If possible, the > implementation should batch multiple requests > to DynamoDB for increased throughput. The implemented Sink Writer > will be used by the Sink class that will be created as part of this story. > * Java / code-level docs. > * End-to-end testing: add tests that hit a real AWS instance. (How best to > donate resources to the Flink project to allow this to happen?) > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17441623#comment-17441623 ] Zichen Liu commented on FLINK-24229: Hi [~Gusev] Sorry for the delay; everything related to the base sink has been merged. > [FLIP-171] DynamoDB implementation of Async Sink > > > Key: FLINK-24229 > URL: https://issues.apache.org/jira/browse/FLINK-24229 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > *User stories:* > As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline. > *Scope:* > * Implement an asynchronous sink for DynamoDB by inheriting the > AsyncSinkBase class. The implementation can for now reside in its own module > in flink-connectors. > * Implement an asynchronous sink writer for DynamoDB by extending the > AsyncSinkWriter. The implementation must deal with failed requests and retry > them using the {{requeueFailedRequestEntry}} method. If possible, the > implementation should batch multiple requests > to DynamoDB for increased throughput. The implemented Sink Writer > will be used by the Sink class that will be created as part of this story. > * Java / code-level docs. > * End-to-end testing: add tests that hit a real AWS instance. (How best to > donate resources to the Flink project to allow this to happen?) > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442731#comment-17442731 ] Zichen Liu commented on FLINK-24229: Hey [~nir.tsruya] Glad to hear! Users can currently customize fatal exception behaviour in the async callback of `submitRequestEntries`, but this improvement would refactor that handling out into a separate method; I suspect that is the main advantage you're interested in. Also, it would give the user the possibility of handling these in the mailbox thread (though I believe the behaviour would end up being the same). > [FLIP-171] DynamoDB implementation of Async Sink > > > Key: FLINK-24229 > URL: https://issues.apache.org/jira/browse/FLINK-24229 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > *User stories:* > As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline. > *Scope:* > * Implement an asynchronous sink for DynamoDB by inheriting the > AsyncSinkBase class. The implementation can for now reside in its own module > in flink-connectors. > * Implement an asynchronous sink writer for DynamoDB by extending the > AsyncSinkWriter. The implementation must deal with failed requests and retry > them using the {{requeueFailedRequestEntry}} method. If possible, the > implementation should batch multiple requests > to DynamoDB for increased throughput. The implemented Sink Writer > will be used by the Sink class that will be created as part of this story. > * Java / code-level docs. > * End-to-end testing: add tests that hit a real AWS instance. (How best to > donate resources to the Flink project to allow this to happen?) > h2.
References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
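The comment above distinguishes handling fatal exceptions inside the async callback of `submitRequestEntries` from a dedicated handler method. A minimal sketch of the kind of classification that implies, with hypothetical error kinds and method names (not part of the Flink API):

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical error classification inside an async completion callback:
// fatal errors are raised to fail the job, everything else is requeued for
// retry. Error kinds and method names are illustrative only.
class CallbackErrorHandling {
    /** Stand-in for a destination error code reported per request entry. */
    enum ErrorKind { THROTTLED, TIMEOUT, ACCESS_DENIED, MALFORMED_RECORD }

    static boolean isFatal(ErrorKind kind) {
        // Throttling and timeouts are transient, so the entry is worth retrying;
        // authorization or serialization problems will never succeed on retry.
        return kind == ErrorKind.ACCESS_DENIED || kind == ErrorKind.MALFORMED_RECORD;
    }

    /** Split the failures reported by the async client into the two paths. */
    static void onComplete(List<ErrorKind> failures,
                           Consumer<ErrorKind> requeue,
                           Consumer<ErrorKind> raiseFatal) {
        for (ErrorKind failure : failures) {
            if (isFatal(failure)) {
                raiseFatal.accept(failure);
            } else {
                requeue.accept(failure);
            }
        }
    }
}
```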
[jira] [Created] (FLINK-24904) Add documentation for KDS Async Sink
Zichen Liu created FLINK-24904: -- Summary: Add documentation for KDS Async Sink Key: FLINK-24904 URL: https://issues.apache.org/jira/browse/FLINK-24904 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 h2. Motivation *User stories:* As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data pipeline. *Scope:* * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors. The module and package name can be anything reasonable, e.g. {{flink-connector-aws-kinesis}} for the module name and {{org.apache.flink.connector.aws.kinesis}} for the package name. * The implementation must use [the Kinesis Java Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html]. * The implementation must allow users to configure the Kinesis Client, with reasonable default settings. * Implement an asynchronous sink writer for KDS by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests (PutRecordsRequestEntry objects) to KDS for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story. * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We already use this in {{KinesisTableApiITCase}}. * Java / code-level docs. * End-to-end testing: add tests that hit a real AWS instance. (How best to donate resources to the Flink project to allow this to happen?) h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
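The writer contract sketched in the scope above (buffer entries, submit them as a batch, and hand failed entries back for retry via something like {{requeueFailedRequestEntry}}) can be modeled in miniature. The following is a self-contained illustration only; class and method names mirror the ticket's description rather than the real AsyncSinkWriter API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified, self-contained model of the writer pattern described above:
// buffer entries, submit them as a batch via an async-style callback, and
// requeue any entries reported as failed. Names are illustrative, not the
// actual Flink API.
abstract class MiniAsyncWriter<T> {
    private final List<T> buffer = new ArrayList<>();

    void write(T entry) { buffer.add(entry); }

    /** Flushes the current buffer; entries reported as failed are re-buffered. */
    void flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        // In the real sink the callback fires asynchronously; here it is
        // invoked inline so the sketch runs without a destination client.
        submitRequestEntries(batch, buffer::addAll);
    }

    int pending() { return buffer.size(); }

    /** Destination-specific shim: call the async client, report failures back. */
    protected abstract void submitRequestEntries(List<T> batch, Consumer<List<T>> requeueFailed);
}
```

A concrete sink for a given destination then only needs to implement `submitRequestEntries` with that destination's async client, which is the "lightweight shim" idea from the FLIP's motivation.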
[jira] [Updated] (FLINK-24904) Add documentation for KDS Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24904: --- Description: h2. Motivation *Scope:* * With FLINK- h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] was: h2. Motivation *User stories:* As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data pipeline. *Scope:* * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors. The module and package name can be anything reasonable, e.g. {{flink-connector-aws-kinesis}} for the module name and {{org.apache.flink.connector.aws.kinesis}} for the package name. * The implementation must use [the Kinesis Java Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html]. * The implementation must allow users to configure the Kinesis Client, with reasonable default settings. * Implement an asynchronous sink writer for KDS by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests (PutRecordsRequestEntry objects) to KDS for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story. * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We already use this in {{KinesisTableApiITCase}}. * Java / code-level docs. * End-to-end testing: add tests that hit a real AWS instance. (How best to donate resources to the Flink project to allow this to happen?) h2.
References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] > Add documentation for KDS Async Sink > > > Key: FLINK-24904 > URL: https://issues.apache.org/jira/browse/FLINK-24904 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > *Scope:* > * With FLINK- > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-24904) Add documentation for KDS Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24904: --- Description: h2. Motivation *Scope:* * FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes the existing one based on KPL. * Deprecate the current section in the docs for the Kinesis KPL sink and write documentation and usage guide for the new sink. h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] was: h2. Motivation *Scope:* * With FLINK- h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] > Add documentation for KDS Async Sink > > > Key: FLINK-24904 > URL: https://issues.apache.org/jira/browse/FLINK-24904 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > *Scope:* > * FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes > the existing one based on KPL. > * Deprecate the current section in the docs for the Kinesis KPL sink and > write documentation and usage guide for the new sink. > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-24904) Add documentation for KDS Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24904: --- Description: h2. Motivation _FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes the existing one based on KPL._ *Scope:* * Deprecate the current section in the docs for the Kinesis KPL sink and write documentation and usage guide for the new sink. h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] was: h2. Motivation *Scope:* * FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes the existing one based on KPL. * Deprecate the current section in the docs for the Kinesis KPL sink and write documentation and usage guide for the new sink. h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] > Add documentation for KDS Async Sink > > > Key: FLINK-24904 > URL: https://issues.apache.org/jira/browse/FLINK-24904 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > _FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes > the existing one based on KPL._ > *Scope:* > * Deprecate the current section in the docs for the Kinesis KPL sink and > write documentation and usage guide for the new sink. > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-24904) Add documentation for KDS Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24904: --- Component/s: Connectors / Kinesis (was: Connectors / Common) > Add documentation for KDS Async Sink > > > Key: FLINK-24904 > URL: https://issues.apache.org/jira/browse/FLINK-24904 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > _FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes > the existing one based on KPL._ > *Scope:* > * Deprecate the current section in the docs for the Kinesis KPL sink and > write documentation and usage guide for the new sink. > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24905) KDS implementation of Async Sink Table API
Zichen Liu created FLINK-24905: -- Summary: KDS implementation of Async Sink Table API Key: FLINK-24905 URL: https://issues.apache.org/jira/browse/FLINK-24905 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 h2. Motivation *User stories:* As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data pipeline. *Scope:* * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors. The module and package name can be anything reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and {{org.apache.flink.connector.aws.kinesis}} for the package name. * The implementation must use [the Kinesis Java Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html]. * The implementation must allow users to configure the Kinesis Client, with reasonable default settings. * Implement an asynchronous sink writer for KDS by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests (PutRecordsRequestEntry objects) to KDS for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story. * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We already use this in {{KinesisTableApiITCase}}. * Java / code-level docs. * End to end testing: add tests that hit a real AWS instance. (How to best donate resources to the Flink project to allow this to happen?) h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
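The writer contract in the scope above (buffer incoming entries, flush them in batches for throughput, and re-queue failed entries for retry as {{requeueFailedRequestEntry}} does) can be sketched with a simplified, self-contained model. Note this is an illustration only: the class name {{SimpleBatchingWriter}} and the {{submitBatch}} hook are invented stand-ins, not Flink's actual AsyncSinkWriter API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

/**
 * Simplified, self-contained model of the sink writer contract described in
 * the ticket: buffer entries, flush in batches, and re-queue any entries the
 * destination reports as failed so they are retried on a later flush.
 * All names here are illustrative stand-ins, not the Flink API.
 */
class SimpleBatchingWriter<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final int batchSize;
    // Stands in for the async call to the destination; returns the
    // sub-list of entries that failed and must be retried.
    private final Function<List<T>, List<T>> submitBatch;

    SimpleBatchingWriter(int batchSize, Function<List<T>, List<T>> submitBatch) {
        this.batchSize = batchSize;
        this.submitBatch = submitBatch;
    }

    void write(T entry) {
        buffer.addLast(entry);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    void flush() {
        List<T> batch = new ArrayList<>();
        while (!buffer.isEmpty() && batch.size() < batchSize) {
            batch.add(buffer.pollFirst());
        }
        // Failed entries go back to the head of the buffer, mirroring the
        // requeueFailedRequestEntry behaviour, so they are retried first.
        List<T> failed = submitBatch.apply(batch);
        for (int i = failed.size() - 1; i >= 0; i--) {
            buffer.addFirst(failed.get(i));
        }
    }

    int buffered() {
        return buffer.size();
    }
}
```

A concrete sink in this model only supplies {{submitBatch}} (the destination-specific call), which is the point of the FLIP: the batching and retry loop stays generic.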
[jira] [Updated] (FLINK-24905) KDS implementation of Async Sink Table API
[ https://issues.apache.org/jira/browse/FLINK-24905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24905: --- Description: h2. Motivation *User stories:* As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams sink. *Scope:* * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors. The module and package name can be anything reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and {{org.apache.flink.connector.aws.kinesis}} for the package name. Side-note: There will be additional work later to move these implementations somewhere else. * The implementation must allow users to configure the Kinesis Client, with reasonable default settings. * Implement an asynchronous sink writer for KDS by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests (PutRecordsRequestEntry objects) to KDS for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story. * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We already use this in {{KinesisTableApiITCase}}. * Java / code-level docs. * End to end testing: add tests that hit a real AWS instance. (How to best donate resources to the Flink project to allow this to happen?) h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] was: h2. Motivation *User stories:* As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data pipeline. *Scope:* * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors. 
The module and package name can be anything reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and {{org.apache.flink.connector.aws.kinesis}} for the package name. * The implementation must use [the Kinesis Java Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html]. * The implementation must allow users to configure the Kinesis Client, with reasonable default settings. * Implement an asynchronous sink writer for KDS by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests (PutRecordsRequestEntry objects) to KDS for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story. * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We already use this in {{KinesisTableApiITCase}}. * Java / code-level docs. * End to end testing: add tests that hit a real AWS instance. (How to best donate resources to the Flink project to allow this to happen?) h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] > KDS implementation of Async Sink Table API > -- > > Key: FLINK-24905 > URL: https://issues.apache.org/jira/browse/FLINK-24905 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > *User stories:* > As a Flink user, I’d like to use the Table API for the new Kinesis Data > Streams sink. > *Scope:* > * Implement an asynchronous sink for Kinesis Data Streams (KDS) by > inheriting the AsyncSinkBase class. The implementation can for now reside in > its own module in flink-connectors. The module and package name can be > anything reasonable e.g. 
{{flink-connector-aws-kinesis}} for the module name > and {{org.apache.flink.connector.aws.kinesis}} for the package name. > Side-note: There will be additional work later to move these implementations > somewhere else. > * The implementation must allow users to configure the Kinesis Client, with > reasonable default settings. > * Implement an asynchronous sink writer for KDS by extending the > AsyncSinkWriter. The implementation must deal with failed requests and retry > them using the {{requeueFailedRequestEntry}} method. If possible, the > implementation should batch multiple requests (PutRecordsRequestEntry > objects) to KDS for increased throughput. The implemented Sink Writer will be > used by the Sink class th
[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17448591#comment-17448591 ] Zichen Liu commented on FLINK-24229: hey [~Gusev] Hope you're well. Was wondering how the DynamoDB sink was going and if you wanted to discuss anything about it? > [FLIP-171] DynamoDB implementation of Async Sink > > > Key: FLINK-24229 > URL: https://issues.apache.org/jira/browse/FLINK-24229 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > *User stories:* > As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline. > *Scope:* > * Implement an asynchronous sink for DynamoDB by inheriting the > AsyncSinkBase class. The implementation can for now reside in its own module > in flink-connectors. > * Implement an asynchronous sink writer for DynamoDB by extending the > AsyncSinkWriter. The implementation must deal with failed requests and retry > them using the {{requeueFailedRequestEntry}} method. If possible, the > implementation should batch multiple requests (PutRecordsRequestEntry > objects) to DynamoDB for increased throughput. The implemented Sink Writer > will be used by the Sink class that will be created as part of this story. > * Java / code-level docs. > * End to end testing: add tests that hit a real AWS instance. (How to best > donate resources to the Flink project to allow this to happen?) > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25944) Intermittent Failures on KDF AZP
[ https://issues.apache.org/jira/browse/FLINK-25944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25944: --- Priority: Critical (was: Major) > Intermittent Failures on KDF AZP > > > Key: FLINK-25944 > URL: https://issues.apache.org/jira/browse/FLINK-25944 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Critical > Labels: pull-request-available > Fix For: 1.15.0 > > > Problem: ci fails on [#18553|https://github.com/apache/flink/pull/18553] and > issue is not reproducible locally. Furthermore the failure is intermittent, > frequency c. 1 in 5(?). Resolution: unknown - theory is to use HTTP1_1 as > protocol. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25924) KDF Integration tests intermittently fails
[ https://issues.apache.org/jira/browse/FLINK-25924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17488002#comment-17488002 ] Zichen Liu commented on FLINK-25924: Superseded by FLINK-25944 > KDF Integration tests intermittently fails > -- > > Key: FLINK-25924 > URL: https://issues.apache.org/jira/browse/FLINK-25924 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.15.0 > > > Intermittent failures introduced as part of merge (PR#18314: > [FLINK-24228[connectors/firehose] - Unified Async Sink for Kinesis > Firehose|https://github.com/apache/flink/pull/18314]): > # Failures are intermittent and affecting c. 1 in 7 of builds- on > {{flink-ci.flink}} and {{flink-ci.flink-master-mirror}} . > # The issue looks identical to the KinesaliteContainer startup issue > (Appendix 1). > # I have managed to reproduce the issue locally - if I start some parallel > containers and keep them running - and then run {{KinesisFirehoseSinkITCase}} > then c. 1 in 6 gives the error. > # The errors have a slightly different appearance on > {{flink-ci.flink-master-mirror}} vs {{flink-ci.flink}} which has the same > appearance as local. I only hope it is a difference in logging/killing > environment variables. (and that there aren’t 2 distinct issues) > Appendix 1: > {code:java} > org.testcontainers.containers.ContainerLaunchException: Container startup > failed > at > org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:336) > at > org.testcontainers.containers.GenericContainer.start(GenericContainer.java:317) > at > org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1066) > at > ... 
11 more > Caused by: org.testcontainers.containers.ContainerLaunchException: Could not > create/start container > at > org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:525) > at > org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:331) > at > org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81) > ... 12 more > Caused by: org.rnorth.ducttape.TimeoutException: Timeout waiting for result > with exception > at > org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:54) > at > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-25924) KDF Integration tests intermittently fails
[ https://issues.apache.org/jira/browse/FLINK-25924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu closed FLINK-25924. -- Release Note: Superseded by FLINK-25944 Resolution: Duplicate > KDF Integration tests intermittently fails > -- > > Key: FLINK-25924 > URL: https://issues.apache.org/jira/browse/FLINK-25924 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.15.0 > > > Intermittent failures introduced as part of merge (PR#18314: > [FLINK-24228[connectors/firehose] - Unified Async Sink for Kinesis > Firehose|https://github.com/apache/flink/pull/18314]): > # Failures are intermittent and affecting c. 1 in 7 of builds- on > {{flink-ci.flink}} and {{flink-ci.flink-master-mirror}} . > # The issue looks identical to the KinesaliteContainer startup issue > (Appendix 1). > # I have managed to reproduce the issue locally - if I start some parallel > containers and keep them running - and then run {{KinesisFirehoseSinkITCase}} > then c. 1 in 6 gives the error. > # The errors have a slightly different appearance on > {{flink-ci.flink-master-mirror}} vs {{flink-ci.flink}} which has the same > appearance as local. I only hope it is a difference in logging/killing > environment variables. (and that there aren’t 2 distinct issues) > Appendix 1: > {code:java} > org.testcontainers.containers.ContainerLaunchException: Container startup > failed > at > org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:336) > at > org.testcontainers.containers.GenericContainer.start(GenericContainer.java:317) > at > org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1066) > at > ... 
11 more > Caused by: org.testcontainers.containers.ContainerLaunchException: Could not > create/start container > at > org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:525) > at > org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:331) > at > org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81) > ... 12 more > Caused by: org.rnorth.ducttape.TimeoutException: Timeout waiting for result > with exception > at > org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:54) > at > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25944) Intermittent Failures on KDF AZP
[ https://issues.apache.org/jira/browse/FLINK-25944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17488003#comment-17488003 ] Zichen Liu commented on FLINK-25944: Fix is coming out with https://github.com/apache/flink/pull/18553 > Intermittent Failures on KDF AZP > > > Key: FLINK-25944 > URL: https://issues.apache.org/jira/browse/FLINK-25944 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Critical > Labels: pull-request-available > Fix For: 1.15.0 > > > Problem: ci fails on [#18553|https://github.com/apache/flink/pull/18553] and > issue is not reproducible locally. Furthermore the failure is intermittent, > frequency c. 1 in 5(?). Resolution: unknown - theory is to use HTTP1_1 as > protocol. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs
Zichen Liu created FLINK-25976: -- Summary: Update the KDS and KDF Sink's defaults & update the docs Key: FLINK-25976 URL: https://issues.apache.org/jira/browse/FLINK-25976 Project: Flink Issue Type: Bug Components: Connectors / Kinesis Reporter: Zichen Liu Assignee: Ahmed Hamdy Fix For: 1.15.0 h2. Bug: Async Sink Base is being flushed too frequently, resulting in backpressure even when the buffer is near empty *Cause:* During a write(), flushIfAble() is called, which checks if the number of buffered elements is greater than a batch size, and if so, insists that the sink flushes immediately, even if the number of inFlightRequests is greater than the maximum allowed number of inFlightRequests, resulting in a yield of the current mailbox thread, and hence blocks. Notice that this can occur even if the buffer is near empty, so the blocking behaviour is unnecessary and undesirable, since we would like the element to be written to the buffer and no blocking to occur. -- This message was sent by Atlassian Jira (v8.20.1#820001)
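The flush decision described in the bug can be modelled in a few lines. The predicates below are an illustrative reconstruction of the described behaviour, not the actual AsyncSinkBase code; only the names flushIfAble, batch size, and in-flight requests come from the ticket.

```java
/**
 * Minimal model of the flushIfAble() decision described in the ticket.
 * The buggy behaviour flushes as soon as the buffer reaches the batch
 * size, so if the in-flight request limit is already saturated the flush
 * blocks the mailbox thread; the adjusted decision defers the flush
 * instead of blocking. Illustrative reconstruction, not Flink's code.
 */
class FlushDecision {
    final int batchSize;
    final int maxInFlightRequests;

    FlushDecision(int batchSize, int maxInFlightRequests) {
        this.batchSize = batchSize;
        this.maxInFlightRequests = maxInFlightRequests;
    }

    // Described behaviour: a write() blocks whenever a batch's worth is
    // buffered but no in-flight slot is free, even if the buffer itself
    // has plenty of room for the new element.
    boolean blocksOnWrite(int buffered, int inFlight) {
        return buffered >= batchSize && inFlight >= maxInFlightRequests;
    }

    // Adjusted decision: only trigger a flush when it can proceed without
    // blocking; otherwise just buffer the element and return.
    boolean shouldFlush(int buffered, int inFlight) {
        return buffered >= batchSize && inFlight < maxInFlightRequests;
    }
}
```

The point of the ticket is visible in the first predicate: blocking depends only on the batch size and the in-flight limit, not on how full the buffer actually is.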
[jira] [Updated] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs
[ https://issues.apache.org/jira/browse/FLINK-25976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25976: --- Priority: Minor (was: Major) > Update the KDS and KDF Sink's defaults & update the docs > > > Key: FLINK-25976 > URL: https://issues.apache.org/jira/browse/FLINK-25976 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Minor > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Update: > DEFAULT_MAX_IN_FLIGHT_REQUESTS=50 > to match with the default threads in the AWS SDK v2 HTTP Client default. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs
[ https://issues.apache.org/jira/browse/FLINK-25976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25976: --- Description: h2. Update: DEFAULT_MAX_IN_FLIGHT_REQUESTS=50 to match with the default threads in the AWS SDK v2 HTTP Client default. was: h2. Bug: Async Sink Base is being flushed too frequently, resulting in backpressure even when the buffer is near empty *Cause:* During a write(), flushIfAble() is called, which checks if the number of buffered elements is greater than a batch size, and if so, insists that the sink flushes immediately, even if the number of inFlightRequests is greater than the maximum allowed number of inFlightRequests, resulting in a yield of the current mailbox thread, and hence blocks. Notice that this can occur even if the buffer is near empty, so the blocking behaviour is unnecessary and undesirable, since we would like the element to be written to the buffer and no blocking to occur. > Update the KDS and KDF Sink's defaults & update the docs > > > Key: FLINK-25976 > URL: https://issues.apache.org/jira/browse/FLINK-25976 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Update: > DEFAULT_MAX_IN_FLIGHT_REQUESTS=50 > to match with the default threads in the AWS SDK v2 HTTP Client default. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs
[ https://issues.apache.org/jira/browse/FLINK-25976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25976: --- Issue Type: Improvement (was: Bug) > Update the KDS and KDF Sink's defaults & update the docs > > > Key: FLINK-25976 > URL: https://issues.apache.org/jira/browse/FLINK-25976 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Update: > DEFAULT_MAX_IN_FLIGHT_REQUESTS=50 > to match with the default threads in the AWS SDK v2 HTTP Client default. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25977) Close sink client and sink http client for KDS/KDF Sinks
Zichen Liu created FLINK-25977: -- Summary: Close sink client and sink http client for KDS/KDF Sinks Key: FLINK-25977 URL: https://issues.apache.org/jira/browse/FLINK-25977 Project: Flink Issue Type: Improvement Components: Connectors / Kinesis Reporter: Zichen Liu Assignee: Ahmed Hamdy Fix For: 1.15.0 h2. Update: DEFAULT_MAX_IN_FLIGHT_REQUESTS=50 to match with the default threads in the AWS SDK v2 HTTP Client default. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs
[ https://issues.apache.org/jira/browse/FLINK-25976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25976: --- Description: h2. Update: DEFAULT_MAX_IN_FLIGHT_REQUESTS=50 to match with the default threads in the AWS SDK v2 HTTP Client default. was: h2. Update: DEFAULT_MAX_IN_FLIGHT_REQUESTS=50 to match with the default threads in the AWS SDK v2 HTTP Client default. > Update the KDS and KDF Sink's defaults & update the docs > > > Key: FLINK-25976 > URL: https://issues.apache.org/jira/browse/FLINK-25976 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Minor > > h2. Update: > DEFAULT_MAX_IN_FLIGHT_REQUESTS=50 > > to match with the default threads in the AWS SDK v2 HTTP Client default. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25945) Intermittent Failures on KDF AZP 2
[ https://issues.apache.org/jira/browse/FLINK-25945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25945: --- Priority: Blocker (was: Major) > Intermittent Failures on KDF AZP 2 > -- > > Key: FLINK-25945 > URL: https://issues.apache.org/jira/browse/FLINK-25945 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Blocker > Labels: pull-request-available > Fix For: 1.15.0 > > > Problem: ci fails on [PR#18606|https://github.com/apache/flink/pull/18606] - > unrelated to (1), this issue reads Unable to execute HTTP request: Trying to > access closed classloader and seems unrelated to the code change (effectively > a 1 line Thread.sleep ). This means it is an intermittent issue affecting the > KDF feature more widely, and has not yet been picked up by the 20+ runs on > the ci. Not reproducible locally. Intermittently failing on ci, frequency > unknown, likely more than 20 : 1. Resolution: unknown. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25944) Intermittent Failures on KDF AZP
[ https://issues.apache.org/jira/browse/FLINK-25944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25944: --- Priority: Blocker (was: Critical) > Intermittent Failures on KDF AZP > > > Key: FLINK-25944 > URL: https://issues.apache.org/jira/browse/FLINK-25944 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Blocker > Labels: pull-request-available > Fix For: 1.15.0 > > > Problem: ci fails on [#18553|https://github.com/apache/flink/pull/18553] and > issue is not reproducible locally. Furthermore the failure is intermittent, > frequency c. 1 in 5(?). Resolution: unknown - theory is to use HTTP1_1 as > protocol. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25945) Intermittent Failures on KDF AZP 2
[ https://issues.apache.org/jira/browse/FLINK-25945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25945: --- Issue Type: Bug (was: New Feature) > Intermittent Failures on KDF AZP 2 > -- > > Key: FLINK-25945 > URL: https://issues.apache.org/jira/browse/FLINK-25945 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > Problem: ci fails on [PR#18606|https://github.com/apache/flink/pull/18606] - > unrelated to (1), this issue reads Unable to execute HTTP request: Trying to > access closed classloader and seems unrelated to the code change (effectively > a 1 line Thread.sleep ). This means it is an intermittent issue affecting the > KDF feature more widely, and has not yet been picked up by the 20+ runs on > the ci. Not reproducible locally. Intermittently failing on ci, frequency > unknown, likely more than 20 : 1. Resolution: unknown. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs
[ https://issues.apache.org/jira/browse/FLINK-25976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25976: --- Priority: Major (was: Minor) > Update the KDS and KDF Sink's defaults & update the docs > > > Key: FLINK-25976 > URL: https://issues.apache.org/jira/browse/FLINK-25976 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > > h2. Update: > DEFAULT_MAX_IN_FLIGHT_REQUESTS=50 > > to match with the default threads in the AWS SDK v2 HTTP Client default. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs
[ https://issues.apache.org/jira/browse/FLINK-25976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25976: --- Issue Type: Bug (was: Improvement) > Update the KDS and KDF Sink's defaults & update the docs > > > Key: FLINK-25976 > URL: https://issues.apache.org/jira/browse/FLINK-25976 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Minor > Labels: pull-request-available > > h2. Update: > DEFAULT_MAX_IN_FLIGHT_REQUESTS=50 > > to match with the default threads in the AWS SDK v2 HTTP Client default. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-25977) Close sink client and sink http client for KDS/KDF Sinks
[ https://issues.apache.org/jira/browse/FLINK-25977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu closed FLINK-25977. -- Release Note: Superseded by FLINK-26006 Resolution: Duplicate > Close sink client and sink http client for KDS/KDF Sinks > > > Key: FLINK-25977 > URL: https://issues.apache.org/jira/browse/FLINK-25977 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Critical > Fix For: 1.15.0 > > > We are not closing the AWS SDK and HTTP clients for the new KDS/KDF async > sink. The close method needs to be invoked when the operator stops. -- This message was sent by Atlassian Jira (v8.20.1#820001)
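The missing teardown described above (invoking close when the operator stops, covering both the AWS SDK client and its HTTP client) can be sketched with stand-in {{AutoCloseable}} clients. The class below is an illustrative model, not the actual sink writer; the real KDS/KDF sinks hold concrete SDK v2 client types.

```java
/**
 * Sketch of the fix described in the ticket: the sink writer owns the AWS
 * SDK client and the HTTP client underneath it, and must close both when
 * the operator stops. The client types here are illustrative stand-ins
 * for the real SDK v2 async client and its Netty HTTP client.
 */
class SinkWriterResources implements AutoCloseable {
    private final AutoCloseable sdkClient;
    private final AutoCloseable httpClient;
    private boolean closed;

    SinkWriterResources(AutoCloseable sdkClient, AutoCloseable httpClient) {
        this.sdkClient = sdkClient;
        this.httpClient = httpClient;
    }

    @Override
    public void close() throws Exception {
        if (closed) {
            return; // idempotent: teardown may be invoked more than once
        }
        closed = true;
        // Close the SDK client first, then the HTTP transport it used.
        sdkClient.close();
        httpClient.close();
    }
}
```

Skipping either close() leaks the client's threads past the job's lifetime, which is what the later classloader-leak tickets in this thread trace back to.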
[jira] [Commented] (FLINK-26008) [FLIP-171] Update Kinesalite docker container reference
[ https://issues.apache.org/jira/browse/FLINK-26008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17488970#comment-17488970 ] Zichen Liu commented on FLINK-26008: I'm afraid it seems latest is the only tag for this image... [https://hub.docker.com/r/instructure/kinesalite/tags] Other popular Kinesalite images also have only 1 tag. Any suggestions? > [FLIP-171] Update Kinesalite docker container reference > > > Key: FLINK-26008 > URL: https://issues.apache.org/jira/browse/FLINK-26008 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kinesis >Reporter: Danny Cranmer >Assignee: Zichen Liu >Priority: Major > Fix For: 1.15.0 > > > We are currently referencing {{:latest}} Kinesalite image for tests. Update > this to the more recent version to avoid updates failing tests: > - https://github.com/apache/flink/pull/18661/files#r801531468 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-24227) [FLIP-171] KDS implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu closed FLINK-24227. -- Resolution: Fixed Feature implemented, test coverage done and docs written. Only bugfixes remaining as of 2022-02-14. > [FLIP-171] KDS implementation of Async Sink > --- > > Key: FLINK-24227 > URL: https://issues.apache.org/jira/browse/FLINK-24227 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > *User stories:* > As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data > pipeline. > *Scope:* > * Implement an asynchronous sink for Kinesis Data Streams (KDS) by > inheriting the AsyncSinkBase class. The implementation can for now reside in > its own module in flink-connectors. The module and package name can be > anything reasonable e.g. {{flink-connector-aws-kinesis}} for the module name > and {{org.apache.flink.connector.aws.kinesis}} for the package name. > * The implementation must use [the Kinesis Java > Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html]. > * The implementation must allow users to configure the Kinesis Client, with > reasonable default settings. > * Implement an asynchronous sink writer for KDS by extending the > AsyncSinkWriter. The implementation must deal with failed requests and retry > them using the {{requeueFailedRequestEntry}} method. If possible, the > implementation should batch multiple requests (PutRecordsRequestEntry > objects) to KDS for increased throughput. The implemented Sink Writer will be > used by the Sink class that will be created as part of this story. > * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). > We already use this in {{KinesisTableApiITCase}}. > * Java / code-level docs. 
> * End to end testing: add tests that hit a real AWS instance. (How to best > donate resources to the Flink project to allow this to happen?) > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-25944) Intermittent Failures on KDF AZP
[ https://issues.apache.org/jira/browse/FLINK-25944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu closed FLINK-25944. -- Resolution: Duplicate Appearance of this bug is caused by not closing AWS resources properly in FLINK-25846 > Intermittent Failures on KDF AZP > > > Key: FLINK-25944 > URL: https://issues.apache.org/jira/browse/FLINK-25944 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Blocker > Labels: pull-request-available > Fix For: 1.15.0 > > > Problem: ci fails on [#18553|https://github.com/apache/flink/pull/18553] and > issue is not reproducible locally. Furthermore the failure is intermittent, > frequency c. 1 in 5(?). Resolution: unknown - theory is to use HTTP1_1 as > protocol. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-24370) [FLIP-171] Documentation for Generic AsyncSinkBase
[ https://issues.apache.org/jira/browse/FLINK-24370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24370: --- Fix Version/s: (was: 1.15.0) > [FLIP-171] Documentation for Generic AsyncSinkBase > -- > > Key: FLINK-24370 > URL: https://issues.apache.org/jira/browse/FLINK-24370 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available, stale-assigned > > h2. Motivation > To write documentation for FLIP-171 Async Sink Base. This will help sink > implementers get acquainted with the necessary information to write their > concrete sinks. > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
[ https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zichen Liu updated FLINK-28007:
-------------------------------
    Priority: Minor (was: Major)

> Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
>
>                 Key: FLINK-28007
>                 URL: https://issues.apache.org/jira/browse/FLINK-28007
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>            Reporter: Zichen Liu
>            Assignee: Zichen Liu
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.16.0
[jira] [Created] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
Zichen Liu created FLINK-28007:
----------------------------------

             Summary: Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
                 Key: FLINK-28007
                 URL: https://issues.apache.org/jira/browse/FLINK-28007
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kinesis
            Reporter: Zichen Liu
            Assignee: Zichen Liu
             Fix For: 1.16.0

h3. Background

AWS SDK v2 async clients use a Netty async client for the Kinesis Data Streams/Firehose sinks and the Kinesis Data Streams EFO consumer. The SDK creates a shared thread pool for Netty to use for network operations when one is not configured. The thread pool is managed by a shared ELG (event loop group), which is stored in a static field. We do not configure this for the AWS connectors in the Flink codebase.

When threads are spawned within the ELG, they inherit the context classloader from the current thread. If the ELG is created from a shared classloader, for instance the Flink parent classloader or the MiniCluster parent classloader, multiple Flink jobs can share the same ELG. When an ELG thread is spawned from a Flink job, it inherits the Flink user classloader. When that job completes or fails, the classloader is destroyed; however, the Netty thread still references it, which leads to the exception below.

h3. Impact

This issue *does not* impact jobs deployed to a TM when AWS SDK v2 is loaded via the Flink user classloader. This is expected to be the standard deployment configuration.

This issue is known to impact:
 - Flink mini cluster, for example in integration tests (FLINK-26064)
 - Flink clusters loading AWS SDK v2 via the parent classloader

h3. Suggested solution

There are a few possible solutions, as discussed in https://github.com/apache/flink/pull/18733:
 1. Create a separate ELG per Flink job
 2. Create a separate ELG per subtask
 3. Attach the correct classloader to ELG-spawned threads

h3. Error Stack

(shortened stack trace, as the full trace is too large)
{noformat}
Feb 09 20:05:04 java.util.concurrent.ExecutionException: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04 	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
Feb 09 20:05:04 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
(...)
Feb 09 20:05:04 Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04 	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
Feb 09 20:05:04 	at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
(...)
Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
Feb 09 20:05:04 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
Feb 09 20:05:04 	at java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:348)
Feb 09 20:05:04 	at java.util.ServiceLoader$La
{noformat}
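The leak mechanism described above can be illustrated with a small plain-Python analogy (this is NOT Flink or Netty code; all names here are invented stand-ins): a lazily initialised, statically shared resource captures the context of the first job that creates it, and keeps referencing that context after the job is gone.

```python
# Plain-Python analogy (NOT Flink/Netty code) of the classloader leak:
# a lazily created, shared worker captures the context of whichever job
# first triggers its creation.

class ClassLoader:
    """Stand-in for a Flink user classloader."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    def load(self, cls):
        if self.closed:
            raise RuntimeError("Trying to access closed classloader")
        return f"{self.name}:{cls}"


_shared_worker_loader = None  # analogue of the SDK's static ELG field


def get_shared_loader(current_job_loader):
    # The first caller's loader is captured for the life of the process,
    # mirroring ELG threads inheriting the spawning thread's context classloader.
    global _shared_worker_loader
    if _shared_worker_loader is None:
        _shared_worker_loader = current_job_loader
    return _shared_worker_loader


job1 = ClassLoader("job1-user-classloader")
get_shared_loader(job1)  # job 1 lazily creates the shared worker
job1.closed = True       # job 1 finishes; its classloader is destroyed

# A later job reuses the shared worker, which still references job 1's loader:
try:
    get_shared_loader(ClassLoader("job2-user-classloader")).load("SomeClass")
except RuntimeError as err:
    print(err)  # prints: Trying to access closed classloader
```

This also shows why suggested solutions 1 and 2 above work: giving each job (or subtask) its own worker group means no worker outlives the classloader it inherited.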
[jira] [Updated] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
[ https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zichen Liu updated FLINK-28007:
-------------------------------
    Affects Version/s: 1.15.0
                       1.15.1
                       1.15.2

> Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
>
>                 Key: FLINK-28007
>                 URL: https://issues.apache.org/jira/browse/FLINK-28007
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.15.0, 1.15.1, 1.15.2
>            Reporter: Zichen Liu
>            Assignee: Zichen Liu
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.16.0
[jira] [Updated] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
[ https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zichen Liu updated FLINK-28007:
-------------------------------
    Component/s: Connectors / Common

> Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
>
>                 Key: FLINK-28007
>                 URL: https://issues.apache.org/jira/browse/FLINK-28007
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common, Connectors / Kinesis
>    Affects Versions: 1.15.0, 1.15.1, 1.15.2
>            Reporter: Zichen Liu
>            Assignee: Zichen Liu
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.16.0
[jira] [Updated] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
[ https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zichen Liu updated FLINK-28007:
-------------------------------
    Labels: (was: pull-request-available)

> Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
>
>                 Key: FLINK-28007
>                 URL: https://issues.apache.org/jira/browse/FLINK-28007
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common, Connectors / Kinesis
>    Affects Versions: 1.15.0, 1.15.1, 1.15.2
>            Reporter: Zichen Liu
>            Assignee: Zichen Liu
>            Priority: Minor
>             Fix For: 1.16.0
[jira] [Updated] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
[ https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zichen Liu updated FLINK-28007:
-------------------------------
    Description:
h3. Background
The unit & integration tests for the AWS connectors in the Flink repository create clients using static helper methods in flink-connector-aws-base, in the AWSServicesTestUtils class.
These static helper methods create the asynchronous flavour of the clients required by the AWS connectors.

*Task*
 * Change these to the synchronous version for each AWS client.

    was:
h3. Background
AWS SDK v2 async clients use a Netty async client for Kinesis Data Streams/Firehose sink and Kinesis Data Streams EFO consumer. (...)
[jira] [Updated] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
[ https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zichen Liu updated FLINK-28007:
-------------------------------
    Labels: pull-request-available (was: )

> Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
>
>                 Key: FLINK-28007
>                 URL: https://issues.apache.org/jira/browse/FLINK-28007
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common, Connectors / Kinesis
>    Affects Versions: 1.15.0, 1.15.1, 1.15.2
>            Reporter: Zichen Liu
>            Assignee: Zichen Liu
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.16.0

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
[jira] [Created] (FLINK-28027) Initialise Async Sink maximum number of in flight messages to low number for rate limiting strategy
Zichen Liu created FLINK-28027:
----------------------------------

             Summary: Initialise Async Sink maximum number of in flight messages to low number for rate limiting strategy
                 Key: FLINK-28027
                 URL: https://issues.apache.org/jira/browse/FLINK-28027
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Common, Connectors / Kinesis
    Affects Versions: 1.15.0
            Reporter: Zichen Liu
             Fix For: 1.16.0

*Background*

In the AsyncSinkWriter, we implement a rate limiting strategy. The initial value for the maximum number of in-flight messages is set extremely high ({{maxBatchSize * maxInFlightRequests}}). However, the TCP implementation of congestion control, which uses the same AIMD strategy, has found that [starting with a small value is better|https://en.wikipedia.org/wiki/TCP_congestion_control#Slow_start].

*Suggestion*

A better default might be:
 * maxBatchSize
 * maxBatchSize / parallelism
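The AIMD behaviour referred to above can be sketched as follows. This is an illustrative, minimal sketch: the class and parameter names are invented here and do not mirror the actual AsyncSinkWriter internals.

```python
# Minimal AIMD (additive increase / multiplicative decrease) sketch of the
# rate limiting idea. Names and constants are illustrative only.

class AimdRateLimiter:
    """Tracks the maximum number of in-flight messages allowed."""

    def __init__(self, initial_limit, max_limit, increase_step=10, decrease_factor=0.5):
        self.limit = initial_limit      # start small, as TCP slow start suggests
        self.max_limit = max_limit      # hard cap, e.g. maxBatchSize * maxInFlightRequests
        self.increase_step = increase_step
        self.decrease_factor = decrease_factor

    def on_success(self):
        # Additive increase: grow the window linearly while requests succeed.
        self.limit = min(self.max_limit, self.limit + self.increase_step)

    def on_throttle(self):
        # Multiplicative decrease: back off sharply when the destination throttles.
        self.limit = max(1, int(self.limit * self.decrease_factor))


limiter = AimdRateLimiter(initial_limit=10, max_limit=1000)
limiter.on_success()   # 10 -> 20
limiter.on_throttle()  # 20 -> 10
```

The point of the ticket is the constructor's first argument: starting at a small `initial_limit` and letting additive increase discover the destination's capacity is gentler than starting at the cap and relying on multiplicative decrease to recover from an initial flood.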
[jira] [Closed] (FLINK-27670) Python wrappers for Kinesis Sinks
[ https://issues.apache.org/jira/browse/FLINK-27670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zichen Liu closed FLINK-27670.
------------------------------
    Resolution: Duplicate

Completed in FLINK-21966.

> Python wrappers for Kinesis Sinks
>
>                 Key: FLINK-27670
>                 URL: https://issues.apache.org/jira/browse/FLINK-27670
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python, Connectors / Kinesis
>            Reporter: Zichen Liu
>            Priority: Major
>             Fix For: 1.16.0
>
> Create Python wrappers for the new Kinesis Streams sink and the Kinesis Firehose sink.
> An example implementation for the old Kinesis sink may be found at
> [https://github.com/apache/flink/pull/15491/files].
[jira] [Created] (FLINK-28148) Unable to load jar connector to a Python Table API app
Zichen Liu created FLINK-28148: -- Summary: Unable to load jar connector to a Python Table API app Key: FLINK-28148 URL: https://issues.apache.org/jira/browse/FLINK-28148 Project: Flink Issue Type: Bug Components: API / Python, Connectors / Common, Table SQL / API Affects Versions: 1.16.0 Reporter: Zichen Liu Reproduction steps: # Clone the latest Flink from the master branch. # Follow the Flink [recommended steps](https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/) to build Flink & install PyFlink. Notes: Tutorial recommended Maven 3.2.x, Python 3.6-3.9, actual: Maven 3.2.5, Python 3.7. # Create a new Python Table API app that loads in a jar, similar to: {code:java} from pyflink.table import TableEnvironment, StreamTableEnvironment, EnvironmentSettings env_settings = EnvironmentSettings.in_streaming_mode() t_env = StreamTableEnvironment.create(environment_settings=env_settings) t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar") {code} The jar loaded here can be any jar, and the following message will appear: {code:java} Traceback (most recent call last): File "pyflink_table_api_firehose.py", line 48, in log_processing() File "pyflink_table_api_firehose.py", line 14, in log_processing t_env.get_config().set("pipeline.classpaths", "file:///home/YOUR_USER/pyflink-table-api/flink/flink-connectors/flink-sql-connector-aws-kinesis-firehose/target/flink-sql-connector-aws-kinesis-firehose-1.16-SNAPSHOT.jar") File "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/table/table_config.py", line 109, in set add_jars_to_context_class_loader(value.split(";")) File "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/java_utils.py", line 169, in add_jars_to_context_class_loader addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url])) File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1322, in __call__ answer, self.gateway_client, self.target_id, self.name) File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco return f(*a, **kw) File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o45.invoke. : java.lang.IllegalArgumentException: object is not an instance of declaring class at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:829) {code} Reproduced on Mac and Amazon Linux 2. Next do: {code:java} pip uninstall apache-flink pip install apache-flink{code} to downgrade to the 1.15 release. Loading the jar should then succeed, even when loading the same connector built from master (reproduced with Kafka and Kinesis Firehose). -- This message was sent by Atlassian Jira (v8.20.7#820007)
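The `java.lang.IllegalArgumentException: object is not an instance of declaring class` in the trace is what reflective invocation throws when the receiver object is not an instance of the method's declaring class — here, `URLClassLoader.addURL` being invoked on a class loader that is not a `URLClassLoader`. As a hypothetical diagnostic (not part of the report; `ClassLoaderCheck` is an illustrative name), the following shows why reflective `addURL` tricks can fail on Java 9+, where the system class loader is no longer a `java.net.URLClassLoader`:

```java
// Hypothetical diagnostic, not from the ticket: inspect the system class loader.
// On Java 8 it is a URLClassLoader subclass; on Java 9+ it is
// jdk.internal.loader.ClassLoaders$AppClassLoader, which is NOT a URLClassLoader,
// so URLClassLoader.addURL cannot legally be invoked on it via reflection.
public class ClassLoaderCheck {
    public static void main(String[] args) {
        ClassLoader app = ClassLoader.getSystemClassLoader();
        System.out.println(app.getClass().getName());
        // Prints false on Java 9 and later.
        System.out.println(app instanceof java.net.URLClassLoader);
    }
}
```

Whether this particular class-loader mismatch is the root cause on master would need confirmation against the 1.16 class-loading changes; the check only demonstrates the failure mode the trace is consistent with.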
[jira] [Updated] (FLINK-28148) Unable to load jar connector to a Python Table API app
[ https://issues.apache.org/jira/browse/FLINK-28148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-28148: --- Labels: connector jar python table-api (was: )
[jira] [Updated] (FLINK-28148) Unable to load jar connector to a Python Table API app
[ https://issues.apache.org/jira/browse/FLINK-28148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-28148: --- Description: h2. Background Users are currently unable to build & install the latest PyFlink and then load jars. The jar loading mechanism was introduced in FLINK-16943. h2. Reproduction steps * Clone the latest Flink from the master branch. * Follow the Flink [recommended steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/] to build Flink & install PyFlink. Notes: Tutorial recommended Maven 3.2.x, Python 3.6-3.9, reproduced with: Maven 3.2.5, Python 3.7. * Create a new Python Table API app that loads in a jar, similar to: {code:java} from pyflink.table import TableEnvironment, StreamTableEnvironment, EnvironmentSettings env_settings = EnvironmentSettings.in_streaming_mode() t_env = StreamTableEnvironment.create(environment_settings=env_settings) t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar") {code} * The following alternative way of loading jars produces the same issue: {code:java} table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///path/to/your/jar.jar") {code} * The jar loaded here can be any jar, and the following message will appear: {code:java} Traceback (most recent call last): File "pyflink_table_api_firehose.py", line 48, in log_processing() File "pyflink_table_api_firehose.py", line 14, in log_processing t_env.get_config().set("pipeline.classpaths", "file:///home/YOUR_USER/pyflink-table-api/flink/flink-connectors/flink-sql-connector-aws-kinesis-firehose/target/flink-sql-connector-aws-kinesis-firehose-1.16-SNAPSHOT.jar") File "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/table/table_config.py", line 109, in set add_jars_to_context_class_loader(value.split(";")) File "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/java_utils.py", line 169, in add_jars_to_context_class_loader 
addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url])) File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1322, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco return f(*a, **kw) File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o45.invoke. : java.lang.IllegalArgumentException: object is not an instance of declaring class at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:829) {code} * Next do: 
{code:bash} pip uninstall apache-flink pip install apache-flink{code} ...to downgrade to the 1.15 release. The jar should then load successfully, even when loading the same connector built from master (reproduced with Kafka and Kinesis Firehose). Reproduced on Mac and Amazon Linux 2. was: h2. Background Users are currently unable to build and install the latest PyFlink and then load jars. The jar loading mechanism was introduced in FLINK-16943. h2. Reproduction steps # Clone the latest Flink from the master branch. # Follow the Flink [recommended steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/] to build Flink and install PyFlink. Notes: the tutorial recommends Maven 3.2.x and Python 3.6-3.9; reproduced with Maven 3.2.5 and Python 3.7. # Create a new Python Table API app that loads a jar, sim
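For readers unfamiliar with the error in the stack trace above: `object is not an instance of declaring class` is what `Method.invoke` raises when the receiver object is not an instance of the method's declaring class (here, `addURL` is declared on `URLClassLoader`, and on newer JDKs the system class loader is not a `URLClassLoader`). A minimal, self-contained illustration of that mechanism, using hypothetical stand-in classes rather than the real class loaders:

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ReflectionMismatchDemo {
    // Hypothetical stand-ins: one class declares an addURL-like method,
    // the other is unrelated (like a class loader that is not a URLClassLoader).
    static class UrlLoaderLike {
        public void addURL(String url) { /* no-op for the demo */ }
    }

    static class OtherLoader { }

    // Invoking a method reflectively against an instance of an unrelated class
    // raises IllegalArgumentException: "object is not an instance of declaring class".
    public static boolean reproduces()
            throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
        Method addURL = UrlLoaderLike.class.getMethod("addURL", String.class);
        try {
            addURL.invoke(new OtherLoader(), "file:///tmp/example.jar");
            return false; // only reachable if the mismatched invoke somehow succeeded
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(reproduces()); // prints: true
    }
}
```

This only demonstrates the generic reflection failure mode; the actual PyFlink fix landed under FLINK-28002, as noted in the comments below.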
[jira] [Commented] (FLINK-28148) Unable to load jar connector to a Python Table API app
[ https://issues.apache.org/jira/browse/FLINK-28148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17559081#comment-17559081 ] Zichen Liu commented on FLINK-28148: [~dianfu] [~hxbks2ks] Apologies for the late reply, I was off for a few days. Thank you for making the fix! This is a duplicate of FLINK-28002. > Unable to load jar connector to a Python Table API app > -- > > Key: FLINK-28148 > URL: https://issues.apache.org/jira/browse/FLINK-28148 > Project: Flink > Issue Type: Bug > Components: API / Python, Connectors / Common, Table SQL / API >Affects Versions: 1.16.0 >Reporter: Zichen Liu >Priority: Major > Labels: connector, jar, python, table-api > > h2. Background > Users are currently unable to build and install the latest PyFlink and then load > jars. The jar loading mechanism was introduced in FLINK-16943. > h2. Reproduction steps > * Clone the latest Flink from the master branch. > * Follow the Flink [recommended > steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/] > to build Flink and install PyFlink. Notes: the tutorial recommends Maven 3.2.x and > Python 3.6-3.9; reproduced with Maven 3.2.5 and Python 3.7. 
> * Create a new Python Table API app that loads in a jar, similar to: > {code:java} > from pyflink.table import TableEnvironment, StreamTableEnvironment, > EnvironmentSettings > env_settings = EnvironmentSettings.in_streaming_mode() > t_env = StreamTableEnvironment.create(environment_settings=env_settings) > t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar") > {code} > > * The following alternative way of loading jars produce a similar issue: > {code:java} > table_env.get_config().get_configuration().set_string("pipeline.jars", > "file:///path/to/your/jar.jar") {code} > > * The jar loaded here can be any jar, and the following message will appear: > {code:java} > Traceback (most recent call last): > File "pyflink_table_api_firehose.py", line 48, in > log_processing() > File "pyflink_table_api_firehose.py", line 14, in log_processing > t_env.get_config().set("pipeline.classpaths", > "file:///home/YOUR_USER/pyflink-table-api/flink/flink-connectors/flink-sql-connector-aws-kinesis-firehose/target/flink-sql-connector-aws-kinesis-firehose-1.16-SNAPSHOT.jar") > File > "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/table/table_config.py", > line 109, in set > add_jars_to_context_class_loader(value.split(";")) > File > "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/java_utils.py", > line 169, in add_jars_to_context_class_loader > addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url])) > File > "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/java_gateway.py", > line 1322, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/exceptions.py", > line 146, in deco > return f(*a, **kw) > File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/protocol.py", > line 328, in get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling o45.invoke. 
> : java.lang.IllegalArgumentException: object is not an instance of declaring > class >at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) >at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >at java.base/java.lang.reflect.Method.invoke(Method.java:566) >at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) >at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >at java.base/java.lang.reflect.Method.invoke(Method.java:566) >at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) >at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) >at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) >at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) >at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) >at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) >at java.base/java.lang.Thread.run(Thread.java:829) {code} > > * Next do:
[jira] [Commented] (FLINK-25735) Chinese Translation - Add documentation for KDS Async Sink
[ https://issues.apache.org/jira/browse/FLINK-25735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17559107#comment-17559107 ] Zichen Liu commented on FLINK-25735: Hi, sorry for the late response; I have been away. I do not have permission to assign issues, but I will find someone to assign this to you. Thank you for offering! > Chinese Translation - Add documentation for KDS Async Sink > -- > > Key: FLINK-25735 > URL: https://issues.apache.org/jira/browse/FLINK-25735 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.15.0 >Reporter: Zichen Liu >Priority: Major > Labels: chinese-translation > > h2. Translate: > * Connectors/Kinesis page to deprecate old sink > (docs/content/docs/connectors/datastream/kinesis.md) > * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) - > only the section about Kinesis > into Chinese. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-25735) Chinese Translation - Add documentation for KDS Async Sink
[ https://issues.apache.org/jira/browse/FLINK-25735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17559121#comment-17559121 ] Zichen Liu commented on FLINK-25735: Apologies again for the delay. Would [~martijnvisser] mind performing the assignment? > Chinese Translation - Add documentation for KDS Async Sink > -- > > Key: FLINK-25735 > URL: https://issues.apache.org/jira/browse/FLINK-25735 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.15.0 >Reporter: Zichen Liu >Priority: Major > Labels: chinese-translation > > h2. Translate: > * Connectors/Kinesis page to deprecate old sink > (docs/content/docs/connectors/datastream/kinesis.md) > * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) - > only the section about Kinesis > into Chinese. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-25859) Add documentation for DynamoDB Async Sink
[ https://issues.apache.org/jira/browse/FLINK-25859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17583106#comment-17583106 ] Zichen Liu commented on FLINK-25859: Hi [~Gusev], thanks again for contributing the DynamoDB sink. Would you like to pick this one up, since it goes along with that work? I believe I am only assigned to this task because the Jira was cloned; if you are happy to pick it up, a committer (perhaps [~dannycranmer]) will be happy to assign it to you. > Add documentation for DynamoDB Async Sink > - > > Key: FLINK-25859 > URL: https://issues.apache.org/jira/browse/FLINK-25859 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kinesis, Documentation >Reporter: Yuri Gusev >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available, stale-assigned > > h2. Motivation > FLINK-24229 _introduces a new sink for DynamoDB_ > *Scope:* > * Create documentation for the new connector > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-24370) [FLIP-171] Documentation for Generic AsyncSinkBase
[ https://issues.apache.org/jira/browse/FLINK-24370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568966#comment-17568966 ] Zichen Liu commented on FLINK-24370: Done in linked PR#517 > [FLIP-171] Documentation for Generic AsyncSinkBase > -- > > Key: FLINK-24370 > URL: https://issues.apache.org/jira/browse/FLINK-24370 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available, stale-assigned > > h2. Motivation > To write documentation for FLIP-171 Async Sink Base. This will help sink > implementers get acquainted with the necessary information to write their > concrete sinks. > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-24370) [FLIP-171] Documentation for Generic AsyncSinkBase
[ https://issues.apache.org/jira/browse/FLINK-24370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568966#comment-17568966 ] Zichen Liu edited comment on FLINK-24370 at 7/20/22 10:09 AM: -- Done in linked PR#517 Please complete this and FLINK-24278 projects [~chesnay] was (Author: crynetlogistics): Done in linked PR#517 > [FLIP-171] Documentation for Generic AsyncSinkBase > -- > > Key: FLINK-24370 > URL: https://issues.apache.org/jira/browse/FLINK-24370 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available, stale-assigned > > h2. Motivation > To write documentation for FLIP-171 Async Sink Base. This will help sink > implementers get acquainted with the necessary information to write their > concrete sinks. > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-24370) [FLIP-171] Documentation for Generic AsyncSinkBase
[ https://issues.apache.org/jira/browse/FLINK-24370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568966#comment-17568966 ] Zichen Liu edited comment on FLINK-24370 at 7/20/22 10:20 AM: -- Done in linked PR#517 Please mark this ticket and FLINK-24278 as Resolved [~chesnay] was (Author: crynetlogistics): Done in linked PR#517 Please complete this and FLINK-24278 projects [~chesnay] > [FLIP-171] Documentation for Generic AsyncSinkBase > -- > > Key: FLINK-24370 > URL: https://issues.apache.org/jira/browse/FLINK-24370 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available, stale-assigned > > h2. Motivation > To write documentation for FLIP-171 Async Sink Base. This will help sink > implementers get acquainted with the necessary information to write their > concrete sinks. > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-27537) Remove requirement for Async Sink's RequestEntryT to be serializable
[ https://issues.apache.org/jira/browse/FLINK-27537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569349#comment-17569349 ] Zichen Liu commented on FLINK-27537: Hello [~DavidLiu001], thank you for offering; I'm delighted you want to make this change. [~dannycranmer], would you mind assigning David to this Jira? > Remove requirement for Async Sink's RequestEntryT to be serializable > > > Key: FLINK-27537 > URL: https://issues.apache.org/jira/browse/FLINK-27537 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.15.0 >Reporter: Zichen Liu >Priority: Major > > Currently, in AsyncSinkBase and its dependent classes, e.g. the sink writer, > element converter etc., the RequestEntryT generic type is required to be > serializable. > However, this requirement no longer holds; nothing in the code actually > requires it. > Proposed approach: > * Remove the `extends Serializable` bound from the generic type RequestEntryT -- This message was sent by Atlassian Jira (v8.20.10#820010)
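To make the proposed change concrete, here is a minimal sketch using mock classes (not the real AsyncSinkBase/AsyncSinkWriter API) of what dropping the `extends Serializable` bound buys: a non-serializable request type can then be used as `RequestEntryT` directly.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class BoundRemovalSketch {
    // Before (simplified mock): entries had to be Serializable.
    abstract static class OldWriter<RequestEntryT extends Serializable> { }

    // After: no bound, so client types such as AWS SDK request objects
    // (which are not Serializable) can be buffered as RequestEntryT directly.
    abstract static class NewWriter<RequestEntryT> {
        final List<RequestEntryT> buffer = new ArrayList<>();

        void write(RequestEntryT entry) {
            buffer.add(entry);
        }
    }

    // A request type that does NOT implement Serializable.
    static class NonSerializableRequest {
        final String payload;

        NonSerializableRequest(String payload) {
            this.payload = payload;
        }
    }

    public static int demo() {
        // NewWriter<NonSerializableRequest> compiles only because the bound is gone;
        // OldWriter<NonSerializableRequest> would be a compile error.
        NewWriter<NonSerializableRequest> writer = new NewWriter<NonSerializableRequest>() { };
        writer.write(new NonSerializableRequest("item-1"));
        return writer.buffer.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints: 1
    }
}
```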
[jira] [Commented] (FLINK-27536) Rename method parameter in AsyncSinkWriter
[ https://issues.apache.org/jira/browse/FLINK-27536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569350#comment-17569350 ] Zichen Liu commented on FLINK-27536: Hi [~DavidLiu001], awesome. [~dannycranmer] would you mind assigning, please? > Rename method parameter in AsyncSinkWriter > -- > > Key: FLINK-27536 > URL: https://issues.apache.org/jira/browse/FLINK-27536 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.15.0 >Reporter: Zichen Liu >Priority: Minor > > Change the abstract method's parameter name in AsyncSinkWriter. > From > Consumer<List<RequestEntryT>> requestResult > to > Consumer<List<RequestEntryT>> requestToRetry > or something similar. > > This is because the consumer here is supposed to accept a list of requests > that need to be retried. -- This message was sent by Atlassian Jira (v8.20.10#820010)
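A sketch of the rename on a simplified mock of the abstract method (hypothetical class and method names; the real AsyncSinkWriter signature differs in detail):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class RenameSketch {
    // Mock of the abstract method. The parameter previously named "requestResult"
    // is renamed "requestsToRetry": the consumer accepts the subset of entries
    // that failed and must be retried, not a generic "result".
    abstract static class Writer<RequestEntryT> {
        protected abstract void submitRequestEntries(
                List<RequestEntryT> entries, Consumer<List<RequestEntryT>> requestsToRetry);
    }

    public static List<String> failedAfterSubmit() {
        List<String> retried = new ArrayList<>();
        Writer<String> writer = new Writer<String>() {
            @Override
            protected void submitRequestEntries(
                    List<String> entries, Consumer<List<String>> requestsToRetry) {
                // Pretend every entry containing "fail" was rejected by the destination.
                List<String> failed = new ArrayList<>();
                for (String e : entries) {
                    if (e.contains("fail")) {
                        failed.add(e);
                    }
                }
                requestsToRetry.accept(failed);
            }
        };
        writer.submitRequestEntries(Arrays.asList("ok-1", "fail-2", "ok-3"), retried::addAll);
        return retried;
    }

    public static void main(String[] args) {
        System.out.println(failedAfterSubmit()); // prints: [fail-2]
    }
}
```

Under this reading, the new name documents the callback's contract at the call site, which is the whole point of the rename.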
[jira] [Commented] (FLINK-28169) GlueSchemaRegistryJsonKinesisITCase fails on JDK11 due to NoSuchMethodError
[ https://issues.apache.org/jira/browse/FLINK-28169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17561036#comment-17561036 ] Zichen Liu commented on FLINK-28169: I have been coordinating with [~chalixar] in the background and I have found a fix and can push a PR, please reassign to me ([~chalixar] please confirm you are happy with this) > GlueSchemaRegistryJsonKinesisITCase fails on JDK11 due to NoSuchMethodError > --- > > Key: FLINK-28169 > URL: https://issues.apache.org/jira/browse/FLINK-28169 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.16.0 >Reporter: Martijn Visser >Assignee: Ahmed Hamdy >Priority: Critical > Labels: pull-request-available, test-stability > > {code:java} > Jun 21 03:06:27 Caused by: > org.testcontainers.containers.ContainerLaunchException: Could not > create/start container > Jun 21 03:06:27 at > org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:537) > Jun 21 03:06:27 at > org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:340) > Jun 21 03:06:27 at > org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81) > Jun 21 03:06:27 ... 
8 more > Jun 21 03:06:27 Caused by: java.lang.RuntimeException: > java.lang.NoSuchMethodError: > 'org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient > org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient()' > Jun 21 03:06:27 at > org.rnorth.ducttape.timeouts.Timeouts.callFuture(Timeouts.java:68) > Jun 21 03:06:27 at > org.rnorth.ducttape.timeouts.Timeouts.getWithTimeout(Timeouts.java:43) > Jun 21 03:06:27 at > org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:40) > Jun 21 03:06:27 at > org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.retryUntilSuccessRunner(KinesaliteContainer.java:150) > Jun 21 03:06:27 at > org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.waitUntilReady(KinesaliteContainer.java:146) > Jun 21 03:06:27 at > org.testcontainers.containers.wait.strategy.AbstractWaitStrategy.waitUntilReady(AbstractWaitStrategy.java:51) > Jun 21 03:06:27 at > org.testcontainers.containers.GenericContainer.waitUntilContainerStarted(GenericContainer.java:926) > Jun 21 03:06:27 at > org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:480) > Jun 21 03:06:27 ... 
10 more > Jun 21 03:06:27 Caused by: java.lang.NoSuchMethodError: > 'org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient > org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient()' > Jun 21 03:06:27 at > org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.list(KinesaliteContainer.java:157) > Jun 21 03:06:27 at > org.rnorth.ducttape.ratelimits.RateLimiter.getWhenReady(RateLimiter.java:51) > Jun 21 03:06:27 at > org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.lambda$retryUntilSuccessRunner$0(KinesaliteContainer.java:153) > Jun 21 03:06:27 at > org.rnorth.ducttape.unreliables.Unreliables.lambda$retryUntilSuccess$0(Unreliables.java:43) > Jun 21 03:06:27 at > java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > Jun 21 03:06:27 at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > Jun 21 03:06:27 at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > Jun 21 03:06:27 ... 1 more > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36979&view=logs&j=6e8542d7-de38-5a33-4aca-458d6c87066d&t=5846934b-7a4f-545b-e5b0-eb4d8bda32e1&l=16659 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28169) GlueSchemaRegistryJsonKinesisITCase fails on JDK11 due to NoSuchMethodError
[ https://issues.apache.org/jira/browse/FLINK-28169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17561112#comment-17561112 ] Zichen Liu commented on FLINK-28169: Thanks for assigning it to me [~martijnvisser] , I have pushed the PR and passed CI. Thanks [~chalixar] for approving and creating the related follow ups FLINK-28332 and FLINK-28333. > GlueSchemaRegistryJsonKinesisITCase fails on JDK11 due to NoSuchMethodError > --- > > Key: FLINK-28169 > URL: https://issues.apache.org/jira/browse/FLINK-28169 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.16.0 >Reporter: Martijn Visser >Assignee: Zichen Liu >Priority: Critical > Labels: pull-request-available, test-stability > > {code:java} > Jun 21 03:06:27 Caused by: > org.testcontainers.containers.ContainerLaunchException: Could not > create/start container > Jun 21 03:06:27 at > org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:537) > Jun 21 03:06:27 at > org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:340) > Jun 21 03:06:27 at > org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81) > Jun 21 03:06:27 ... 
8 more > Jun 21 03:06:27 Caused by: java.lang.RuntimeException: > java.lang.NoSuchMethodError: > 'org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient > org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient()' > Jun 21 03:06:27 at > org.rnorth.ducttape.timeouts.Timeouts.callFuture(Timeouts.java:68) > Jun 21 03:06:27 at > org.rnorth.ducttape.timeouts.Timeouts.getWithTimeout(Timeouts.java:43) > Jun 21 03:06:27 at > org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:40) > Jun 21 03:06:27 at > org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.retryUntilSuccessRunner(KinesaliteContainer.java:150) > Jun 21 03:06:27 at > org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.waitUntilReady(KinesaliteContainer.java:146) > Jun 21 03:06:27 at > org.testcontainers.containers.wait.strategy.AbstractWaitStrategy.waitUntilReady(AbstractWaitStrategy.java:51) > Jun 21 03:06:27 at > org.testcontainers.containers.GenericContainer.waitUntilContainerStarted(GenericContainer.java:926) > Jun 21 03:06:27 at > org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:480) > Jun 21 03:06:27 ... 
10 more > Jun 21 03:06:27 Caused by: java.lang.NoSuchMethodError: > 'org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient > org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient()' > Jun 21 03:06:27 at > org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.list(KinesaliteContainer.java:157) > Jun 21 03:06:27 at > org.rnorth.ducttape.ratelimits.RateLimiter.getWhenReady(RateLimiter.java:51) > Jun 21 03:06:27 at > org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.lambda$retryUntilSuccessRunner$0(KinesaliteContainer.java:153) > Jun 21 03:06:27 at > org.rnorth.ducttape.unreliables.Unreliables.lambda$retryUntilSuccess$0(Unreliables.java:43) > Jun 21 03:06:27 at > java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > Jun 21 03:06:27 at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > Jun 21 03:06:27 at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > Jun 21 03:06:27 ... 1 more > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36979&view=logs&j=6e8542d7-de38-5a33-4aca-458d6c87066d&t=5846934b-7a4f-545b-e5b0-eb4d8bda32e1&l=16659 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28169) GlueSchemaRegistryJsonKinesisITCase fails on JDK11 due to NoSuchMethodError
[ https://issues.apache.org/jira/browse/FLINK-28169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17561431#comment-17561431 ] Zichen Liu commented on FLINK-28169: Thanks [~martijnvisser] much appreciated. > GlueSchemaRegistryJsonKinesisITCase fails on JDK11 due to NoSuchMethodError > --- > > Key: FLINK-28169 > URL: https://issues.apache.org/jira/browse/FLINK-28169 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.16.0 >Reporter: Martijn Visser >Assignee: Zichen Liu >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.16.0 > > > {code:java} > Jun 21 03:06:27 Caused by: > org.testcontainers.containers.ContainerLaunchException: Could not > create/start container > Jun 21 03:06:27 at > org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:537) > Jun 21 03:06:27 at > org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:340) > Jun 21 03:06:27 at > org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81) > Jun 21 03:06:27 ... 
8 more > Jun 21 03:06:27 Caused by: java.lang.RuntimeException: > java.lang.NoSuchMethodError: > 'org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient > org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient()' > Jun 21 03:06:27 at > org.rnorth.ducttape.timeouts.Timeouts.callFuture(Timeouts.java:68) > Jun 21 03:06:27 at > org.rnorth.ducttape.timeouts.Timeouts.getWithTimeout(Timeouts.java:43) > Jun 21 03:06:27 at > org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:40) > Jun 21 03:06:27 at > org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.retryUntilSuccessRunner(KinesaliteContainer.java:150) > Jun 21 03:06:27 at > org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.waitUntilReady(KinesaliteContainer.java:146) > Jun 21 03:06:27 at > org.testcontainers.containers.wait.strategy.AbstractWaitStrategy.waitUntilReady(AbstractWaitStrategy.java:51) > Jun 21 03:06:27 at > org.testcontainers.containers.GenericContainer.waitUntilContainerStarted(GenericContainer.java:926) > Jun 21 03:06:27 at > org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:480) > Jun 21 03:06:27 ... 
10 more > Jun 21 03:06:27 Caused by: java.lang.NoSuchMethodError: > 'org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient > org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient()' > Jun 21 03:06:27 at > org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.list(KinesaliteContainer.java:157) > Jun 21 03:06:27 at > org.rnorth.ducttape.ratelimits.RateLimiter.getWhenReady(RateLimiter.java:51) > Jun 21 03:06:27 at > org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.lambda$retryUntilSuccessRunner$0(KinesaliteContainer.java:153) > Jun 21 03:06:27 at > org.rnorth.ducttape.unreliables.Unreliables.lambda$retryUntilSuccess$0(Unreliables.java:43) > Jun 21 03:06:27 at > java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > Jun 21 03:06:27 at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > Jun 21 03:06:27 at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > Jun 21 03:06:27 ... 1 more > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36979&view=logs&j=6e8542d7-de38-5a33-4aca-458d6c87066d&t=5846934b-7a4f-545b-e5b0-eb4d8bda32e1&l=16659 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-25729) Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on the AWS SDK for Java 2.x
Zichen Liu created FLINK-25729: -- Summary: Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on the AWS SDK for Java 2.x Key: FLINK-25729 URL: https://issues.apache.org/jira/browse/FLINK-25729 Project: Flink Issue Type: Improvement Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Zichen Liu Fix For: 1.15.0 h2. Motivation *User stories:* As a Flink user, I’d like to configure a custom fatal exception handler for the AsyncSinkWriter *Scope:* * Create a new fatal exception handler * Users of the AsyncSink should be able to create an implementation and pass it to the sink -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25729) Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on the AWS SDK for Java 2.x
[ https://issues.apache.org/jira/browse/FLINK-25729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25729: --- Description: h2. Motivation *User stories:* As a Flink user, I’d like to use a Kinesis Consumer based on the AWS SDK for Java 2.x rather than the Kinesis Client Library. * Maintain all the features of the current consumer at `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` in the new source * The new source should live in the module `flink-connector-aws-kinesis-streams` (old: `flink-connector-kinesis`) was: h2. Motivation *User stories:* As a Flink user, I’d like to configure a custom fatal exception handler for the AsyncSinkWriter *Scope:* * Create a new fatal exception handler * Users of the AsyncSink should be able to create an implementation and pass it to the sink > Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on > the AWS SDK for Java 2.x > - > > Key: FLINK-25729 > URL: https://issues.apache.org/jira/browse/FLINK-25729 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.15.0 >Reporter: Zichen Liu >Priority: Major > Fix For: 1.15.0 > > > h2. Motivation > *User stories:* > As a Flink user, I’d like to use a Kinesis Consumer based on the AWS SDK for > Java 2.x rather than the Kinesis Client Library. > > * Maintain all the features of the current consumer at > `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` in the > new source > * The new source should live in the module > `flink-connector-aws-kinesis-streams` (old: `flink-connector-kinesis`) -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25729) Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on the AWS SDK for Java 2.x
[ https://issues.apache.org/jira/browse/FLINK-25729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25729: --- Component/s: Connectors / Kinesis (was: Connectors / Common) > Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on > the AWS SDK for Java 2.x > - > > Key: FLINK-25729 > URL: https://issues.apache.org/jira/browse/FLINK-25729 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Affects Versions: 1.15.0 >Reporter: Zichen Liu >Priority: Major > Fix For: 1.15.0 > > > h2. Motivation > *User stories:* > As a Flink user, I’d like to use a Kinesis Consumer based on the AWS SDK for > Java 2.x rather than the Kinesis Client Library. > > * Maintain all the features of the current consumer at > `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` in the > new source > * The new source should live in the module > `flink-connector-aws-kinesis-streams` (old: `flink-connector-kinesis`) -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25731) Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated
Zichen Liu created FLINK-25731: -- Summary: Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated Key: FLINK-25731 URL: https://issues.apache.org/jira/browse/FLINK-25731 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 h2. Motivation *User stories:* As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline. *Scope:* * Implement an asynchronous sink for DynamoDB by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors. * Implement an asynchronous sink writer for DynamoDB by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests to DynamoDB for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story. * Java / code-level docs. * End to end testing: add tests that hit a real AWS instance. (How to best donate resources to the Flink project to allow this to happen?) h2. References More details to be found at [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
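The buffer-batch-requeue pattern described in the scope above can be sketched as follows. This is a simplified, self-contained illustration of the AsyncSinkWriter contract, not the actual Flink API: all class and method names here are invented for illustration, apart from the `requeueFailedRequestEntry` idea the ticket mentions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Simplified sketch of the async sink writer pattern: buffer request entries,
// submit them in batches via an async client callback, and re-queue entries
// the destination reports as failed so they are retried on a later flush.
// Names are illustrative; this is not the real Flink interface.
class SketchAsyncWriter<T> {
    private final int batchSize;
    private final Deque<T> buffer = new ArrayDeque<>();
    // Destination client: receives a batch plus a callback that takes the
    // sub-list of failed entries to be re-queued for retry.
    private final BiConsumer<List<T>, Consumer<List<T>>> client;

    SketchAsyncWriter(int batchSize, BiConsumer<List<T>, Consumer<List<T>>> client) {
        this.batchSize = batchSize;
        this.client = client;
    }

    void write(T entry) {
        buffer.addLast(entry);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    void flush() {
        List<T> batch = new ArrayList<>();
        while (!buffer.isEmpty() && batch.size() < batchSize) {
            batch.add(buffer.pollFirst());
        }
        client.accept(batch, this::requeueFailedRequestEntries);
    }

    // Failed entries go back to the head of the buffer, preserving their
    // original order, mirroring the requeueFailedRequestEntry idea above.
    private void requeueFailedRequestEntries(List<T> failed) {
        for (int i = failed.size() - 1; i >= 0; i--) {
            buffer.addFirst(failed.get(i));
        }
    }

    int buffered() {
        return buffer.size();
    }
}
```

A destination-specific shim (DynamoDB, Firehose, KDS) would then only supply the client callback and the entry type, which is the lightweight-shim idea of FLIP-171.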
[jira] [Updated] (FLINK-25731) Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated
[ https://issues.apache.org/jira/browse/FLINK-25731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25731: --- Description: The consumer based on the Kinesis Consumer Library (KCL) has been deprecated; a Jira to create an equivalent consumer based on the AWS SDK for Java 2.x has been raised in FLINK-25729 and planned for Flink 1.16. The producer based on the Kinesis Producer Library (KPL) has been superseded. The new sink can be found in the module flink-connectors/flink-connector-aws-kinesis-data-streams and package org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink. It is based on the AWS SDK for Java 2.x. The work to replace this sink was carried out in FLINK-24227. was: h2. Motivation *User stories:* As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline. *Scope:* * Implement an asynchronous sink for DynamoDB by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors. * Implement an asynchronous sink writer for DynamoDB by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests to DynamoDB for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story. * Java / code-level docs. * End to end testing: add tests that hit a real AWS instance. (How to best donate resources to the Flink project to allow this to happen?) h2.
References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] > Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated > > > Key: FLINK-25731 > URL: https://issues.apache.org/jira/browse/FLINK-25731 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > This consumer based on the Kinesis Consumer Library (KCL) has been > deprecated, a Jira to create an equivalent consumer based on AWS SDK for Java > 2.x has been raised in FLINK-25729 and planned for Flink 1.16. > This producer based on the Kinesis Producer Library KPL has been superseded. > The new sink can be found in the module > flink-connectors/flink-connector-aws-kinesis-data-streams and package > org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink. It is based > on the AWS SDK for Java 2.x. The work to replace this sink was carried out in > FLINK-24227. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25735) Chinese Translation - Add documentation for KDS Async Sink
Zichen Liu created FLINK-25735: -- Summary: Chinese Translation - Add documentation for KDS Async Sink Key: FLINK-25735 URL: https://issues.apache.org/jira/browse/FLINK-25735 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.15.0 Reporter: Zichen Liu Fix For: 1.15.0 h2. Translate: * Connectors/Kinesis page to deprecate old sink (docs/content/docs/connectors/datastream/kinesis.md) * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) - only the section about Kinesis into Chinese. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-25408) Chinese Translation - Add documentation for KDS Async Sink
[ https://issues.apache.org/jira/browse/FLINK-25408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu closed FLINK-25408. -- Resolution: Duplicate Duplicate of https://issues.apache.org/jira/browse/FLINK-25735 > Chinese Translation - Add documentation for KDS Async Sink > -- > > Key: FLINK-25408 > URL: https://issues.apache.org/jira/browse/FLINK-25408 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kinesis, Documentation >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Fix For: 1.15.0 > > > h2. Translate: > * Connectors/Kinesis page to deprecate old sink > (docs/content/docs/connectors/datastream/kinesis.md) > * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) > into Chinese. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25736) Chinese Translation - Update documentation for Kinesis Table Api
Zichen Liu created FLINK-25736: -- Summary: Chinese Translation - Update documentation for Kinesis Table Api Key: FLINK-25736 URL: https://issues.apache.org/jira/browse/FLINK-25736 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.15.0 Reporter: Zichen Liu Fix For: 1.15.0 h2. Translate: * Connectors/Kinesis page to deprecate old sink (docs/content.zh/docs/connectors/table/kinesis.md) into Chinese. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25737) Chinese Translation - Add documentation for Firehose Async Sink
Zichen Liu created FLINK-25737: -- Summary: Chinese Translation - Add documentation for Firehose Async Sink Key: FLINK-25737 URL: https://issues.apache.org/jira/browse/FLINK-25737 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.15.0 Reporter: Zichen Liu Fix For: 1.15.0 h2. Translate: * Connectors/Kinesis page to deprecate old sink (docs/content/docs/connectors/datastream/kinesis.md) * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) - only the section about Kinesis into Chinese. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25737) Chinese Translation - Add documentation for Firehose Async Sink
[ https://issues.apache.org/jira/browse/FLINK-25737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25737: --- Description: h2. Translate: * Connectors/Firehose page (docs/content.zh/docs/connectors/datastream/firehose.md) into Chinese. was: h2. Translate: * Connectors/Kinesis page to deprecate old sink (docs/content/docs/connectors/datastream/kinesis.md) * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) - only the section about Kinesis into Chinese. > Chinese Translation - Add documentation for Firehose Async Sink > --- > > Key: FLINK-25737 > URL: https://issues.apache.org/jira/browse/FLINK-25737 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.15.0 >Reporter: Zichen Liu >Priority: Major > Fix For: 1.15.0 > > > h2. Translate: > * Connectors/Firehose page > (docs/content.zh/docs/connectors/datastream/firehose.md) > into Chinese. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25792) Async Sink Base is being flushed too frequently, resulting in backpressure even when the buffer is near empty
Zichen Liu created FLINK-25792: -- Summary: Async Sink Base is being flushed too frequently, resulting in backpressure even when the buffer is near empty Key: FLINK-25792 URL: https://issues.apache.org/jira/browse/FLINK-25792 Project: Flink Issue Type: New Feature Components: Connectors / Kinesis Reporter: Zichen Liu Assignee: Ahmed Hamdy Fix For: 1.15.0 h2. Motivation *User stories:* As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams sink. *Scope:* * Introduce {{AsyncDynamicTableSink}} that enables Sinking Tables into Async Implementations. * Implement a new {{KinesisDynamicTableSink}} that uses {{KinesisDataStreamSink}} Async Implementation and implements {{{}AsyncDynamicTableSink{}}}. * The implementation introduces Async Sink configurations as optional options in the table definition, with default values derived from the {{KinesisDataStream}} default values. * Unit/Integration testing. Modify KinesisTableAPI tests for the new implementation, add unit tests for {{AsyncDynamicTableSink}} and {{KinesisDynamicTableSink}} and {{{}KinesisDynamicTableSinkFactory{}}}. * Java / code-level docs. h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25792) Async Sink Base is being flushed too frequently, resulting in backpressure even when the buffer is near empty
[ https://issues.apache.org/jira/browse/FLINK-25792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25792: --- Description: h2. Bug: was: h2. Motivation *User stories:* As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams sink. *Scope:* * Introduce {{AsyncDynamicTableSink}} that enables Sinking Tables into Async Implementations. * Implement a new {{KinesisDynamicTableSink}} that uses {{KinesisDataStreamSink}} Async Implementation and implements {{{}AsyncDynamicTableSink{}}}. * The implementation introduces Async Sink configurations as optional options in the table definition, with default values derived from the {{KinesisDataStream}} default values. * Unit/Integration testing. modify KinesisTableAPI tests for the new implementation, add unit tests for {{AsyncDynamicTableSink}} and {{KinesisDynamicTableSink}} and {{{}KinesisDynamicTableSinkFactory{}}}. * Java / code-level docs. h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] > Async Sink Base is too being flushed too frequently resulting in backpressure > even when buffer is near empty > > > Key: FLINK-25792 > URL: https://issues.apache.org/jira/browse/FLINK-25792 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Bug: > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25792) Async Sink Base is being flushed too frequently, resulting in backpressure even when the buffer is near empty
[ https://issues.apache.org/jira/browse/FLINK-25792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25792: --- Description: h2. Bug: Async Sink Base is being flushed too frequently, resulting in backpressure even when the buffer is near empty *Cause:* During a write(), flushIfAble() is called, which checks whether the number of buffered elements is greater than the batch size and, if so, forces the sink to flush immediately, even if the number of inFlightRequests is greater than the maximum allowed number of inFlightRequests. This causes the current mailbox thread to yield, and hence blocks. Notice that this can occur even when the buffer is near empty, so the blocking behaviour is unnecessary and undesirable: we would like the element to be written to the buffer with no blocking. was: h2. Bug: > Async Sink Base is being flushed too frequently, resulting in backpressure > even when the buffer is near empty > > > Key: FLINK-25792 > URL: https://issues.apache.org/jira/browse/FLINK-25792 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Bug: > Async Sink Base is being flushed too frequently, resulting in backpressure > even when the buffer is near empty > *Cause:* > During a write(), flushIfAble() is called, which checks whether the number of > buffered elements is greater than the batch size and, if so, forces the sink > to flush immediately, even if the number of inFlightRequests is greater than > the maximum allowed number of inFlightRequests. This causes the current > mailbox thread to yield, and hence blocks. > Notice that this can occur even when the buffer is near empty, so the > blocking behaviour is unnecessary and undesirable: we would like the element > to be written to the buffer with no blocking. 
-- This message was sent by Atlassian Jira (v8.20.1#820001)
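The cause described in FLINK-25792 boils down to a flush-decision policy. The sketch below contrasts the two conditions: flushing (and potentially blocking on the in-flight request limit) as soon as one batch worth of elements is buffered, versus blocking only when the buffer itself can no longer absorb writes. Class, method, and field names are illustrative assumptions, not the actual AsyncSinkWriter internals.

```java
// Sketch of the flush decision discussed in FLINK-25792.
// Names and fields are illustrative, not the real implementation.
class FlushPolicy {
    private final int batchSize;           // entries per downstream batch
    private final int maxBufferedRequests; // hard capacity of the buffer

    FlushPolicy(int batchSize, int maxBufferedRequests) {
        this.batchSize = batchSize;
        this.maxBufferedRequests = maxBufferedRequests;
    }

    // Behaviour described in the ticket: as soon as one batch worth of
    // elements is buffered, insist on flushing -- which can block on the
    // in-flight request limit even though the buffer is nearly empty.
    boolean flushesEagerly(int buffered) {
        return buffered >= batchSize;
    }

    // Non-blocking alternative implied by the ticket: only force a blocking
    // flush when the buffer genuinely cannot accept the incoming element.
    boolean mustBlock(int buffered) {
        return buffered >= maxBufferedRequests;
    }
}
```

With, say, a batch size of 10 and a buffer capacity of 100, ten buffered elements trigger the eager path even though the buffer is 90% empty, which is exactly the unnecessary backpressure the ticket describes.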
[jira] [Updated] (FLINK-25792) Async Sink Base is being flushed too frequently, resulting in backpressure even when the buffer is near empty
[ https://issues.apache.org/jira/browse/FLINK-25792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25792: --- Issue Type: Bug (was: New Feature) > Async Sink Base is too being flushed too frequently resulting in backpressure > even when buffer is near empty > > > Key: FLINK-25792 > URL: https://issues.apache.org/jira/browse/FLINK-25792 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Bug: > Async Sink Base is too being flushed too frequently resulting in backpressure > even when buffer is near empty > *Cause:* > During a write(), flushIfAble() is called, which checks if the number of > buffered elements is greater than a batch size, and if so, insists that the > sink flushes immediately, even if the number of inFlightRequests is greater > than the maximum allowed number of inFlightRequests, resulting in a yield of > the current mailbox thread, and hence blocks. > Notice that this can occur even if the buffer is near empty, so the blocking > behaviour is unnecessary and undesirable, since we would like the element to > be written to the buffer and no blocking to occur. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25793) Kinesis Data Streams sink is not limiting throughput to the destination and therefore exceeding rate limits
Zichen Liu created FLINK-25793: -- Summary: Kinesis Data Streams sink is not limiting throughput to the destination and therefore exceeding rate limits Key: FLINK-25793 URL: https://issues.apache.org/jira/browse/FLINK-25793 Project: Flink Issue Type: Bug Components: Connectors / Kinesis Reporter: Zichen Liu Assignee: Ahmed Hamdy Fix For: 1.15.0 h2. Bug: Async Sink Base is being flushed too frequently, resulting in backpressure even when the buffer is near empty *Cause:* During a write(), flushIfAble() is called, which checks whether the number of buffered elements is greater than the batch size and, if so, forces the sink to flush immediately, even if the number of inFlightRequests is greater than the maximum allowed number of inFlightRequests. This causes the current mailbox thread to yield, and hence blocks. Notice that this can occur even when the buffer is near empty, so the blocking behaviour is unnecessary and undesirable: we would like the element to be written to the buffer with no blocking. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25793) Kinesis Data Streams sink is not limiting throughput to the destination and therefore exceeding rate limits
[ https://issues.apache.org/jira/browse/FLINK-25793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-25793: --- Description: h2. Bug: Kinesis Data Streams sink is not limiting throughput to the destination and therefore exceeding rate limits *Cause:* We are not throttling our requests downstream at all. We should monitor for requests that have failed with ThroughputExceeded exceptions and reduce the throughput accordingly. was: h2. Bug: Async Sink Base is too being flushed too frequently resulting in backpressure even when buffer is near empty *Cause:* During a write(), flushIfAble() is called, which checks if the number of buffered elements is greater than a batch size, and if so, insists that the sink flushes immediately, even if the number of inFlightRequests is greater than the maximum allowed number of inFlightRequests, resulting in a yield of the current mailbox thread, and hence blocks. Notice that this can occur even if the buffer is near empty, so the blocking behaviour is unnecessary and undesirable, since we would like the element to be written to the buffer and no blocking to occur. > Kinesis Data Streams sink is not limiting throughput to the destination and > therefore exceeding rate limits > --- > > Key: FLINK-25793 > URL: https://issues.apache.org/jira/browse/FLINK-25793 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Reporter: Zichen Liu >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Bug: > Kinesis Data Streams sink is not limiting throughput to the destination and > therefore exceeding rate limits > *Cause:* > We are not throttling our requests downstream at all. > We should monitor for requests that have failed with ThroughputExceeded > exceptions and reduce the throughput accordingly. -- This message was sent by Atlassian Jira (v8.20.1#820001)
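The monitor-and-reduce idea in FLINK-25793 is commonly realised as an AIMD (additive-increase, multiplicative-decrease) scheme. Below is a hedged sketch of that technique; the ticket does not prescribe a concrete mechanism, so every name and parameter here is an assumption, not the eventual Flink implementation.

```java
// Illustrative AIMD rate limiter for the behaviour described in FLINK-25793:
// ramp throughput up additively while requests succeed, and cut it
// multiplicatively whenever the destination reports a throughput-exceeded
// failure. All names and defaults are assumptions for illustration.
class AimdRateLimiter {
    private double ratePerSecond;
    private final double increment;     // additive step on success
    private final double backoffFactor; // multiplicative cut on throttling
    private final double minRate;       // never back off below this floor

    AimdRateLimiter(double initialRate, double increment,
                    double backoffFactor, double minRate) {
        this.ratePerSecond = initialRate;
        this.increment = increment;
        this.backoffFactor = backoffFactor;
        this.minRate = minRate;
    }

    // Called for each successfully completed request batch.
    void onSuccess() {
        ratePerSecond += increment;
    }

    // Called when a request fails with a throughput-exceeded style exception.
    void onThroughputExceeded() {
        ratePerSecond = Math.max(minRate, ratePerSecond * backoffFactor);
    }

    double currentRate() {
        return ratePerSecond;
    }
}
```

The sink would consult `currentRate()` before dispatching each batch and feed request outcomes back into `onSuccess()` / `onThroughputExceeded()`, so sustained throttling drives the send rate down quickly while success lets it recover gradually.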
[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17481801#comment-17481801 ] Zichen Liu commented on FLINK-24229: Hi [~Gusev] Thank you so much for the contribution. I will be available and ready to help with & review the pull request. Can't wait. I really hope we can make it for the release. > [FLIP-171] DynamoDB implementation of Async Sink > > > Key: FLINK-24229 > URL: https://issues.apache.org/jira/browse/FLINK-24229 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > *User stories:* > As a Flink user, I’d like to use DynamoDB as sink for my data pipeline. > *Scope:* > * Implement an asynchronous sink for DynamoDB by inheriting the > AsyncSinkBase class. The implementation can for now reside in its own module > in flink-connectors. > * Implement an asynchornous sink writer for DynamoDB by extending the > AsyncSinkWriter. The implementation must deal with failed requests and retry > them using the {{requeueFailedRequestEntry}} method. If possible, the > implementation should batch multiple requests (PutRecordsRequestEntry > objects) to Firehose for increased throughput. The implemented Sink Writer > will be used by the Sink class that will be created as part of this story. > * Java / code-level docs. > * End to end testing: add tests that hits a real AWS instance. (How to best > donate resources to the Flink project to allow this to happen?) > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482452#comment-17482452 ] Zichen Liu commented on FLINK-24229: (y) > [FLIP-171] DynamoDB implementation of Async Sink > > > Key: FLINK-24229 > URL: https://issues.apache.org/jira/browse/FLINK-24229 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > h2. Motivation > *User stories:* > As a Flink user, I’d like to use DynamoDB as sink for my data pipeline. > *Scope:* > * Implement an asynchronous sink for DynamoDB by inheriting the > AsyncSinkBase class. The implementation can for now reside in its own module > in flink-connectors. > * Implement an asynchornous sink writer for DynamoDB by extending the > AsyncSinkWriter. The implementation must deal with failed requests and retry > them using the {{requeueFailedRequestEntry}} method. If possible, the > implementation should batch multiple requests (PutRecordsRequestEntry > objects) to Firehose for increased throughput. The implemented Sink Writer > will be used by the Sink class that will be created as part of this story. > * Java / code-level docs. > * End to end testing: add tests that hits a real AWS instance. (How to best > donate resources to the Flink project to allow this to happen?) > h2. References > More details to be found > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-25731) Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated
[ https://issues.apache.org/jira/browse/FLINK-25731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu closed FLINK-25731. -- Resolution: Fixed > Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated > > > Key: FLINK-25731 > URL: https://issues.apache.org/jira/browse/FLINK-25731 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Reporter: Zichen Liu >Assignee: Zichen Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.15.0 > > > This consumer based on the Kinesis Consumer Library (KCL) has been > deprecated, a Jira to create an equivalent consumer based on AWS SDK for Java > 2.x has been raised in FLINK-25729 and planned for Flink 1.16. > This producer based on the Kinesis Producer Library KPL has been superseded. > The new sink can be found in the module > flink-connectors/flink-connector-aws-kinesis-data-streams and package > org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink. It is based > on the AWS SDK for Java 2.x. The work to replace this sink was carried out in > FLINK-24227. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-24905) [FLIP-171] KDS implementation of Async Sink Table API
[ https://issues.apache.org/jira/browse/FLINK-24905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24905: --- Description: h2. Motivation *User stories:* As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams sink. *Scope:* * Introduce {{AsyncDynamicTableSink}} that enables Sinking Tables into Async Implementations. * Implement a new {{KinesisDynamicTableSink}} that uses {{KinesisDataStreamSink}} Async Implementation and implements {{{}AsyncDynamicTableSink{}}}. * The implementation introduces Async Sink configurations as optional options in the table definition, with default values derived from the {{KinesisDataStream}} default values. * Unit/Integration testing. Modify KinesisTableAPI tests for the new implementation, add unit tests for {{AsyncDynamicTableSink}} and {{KinesisDynamicTableSink}} and {{{}KinesisDynamicTableSinkFactory{}}}. * Java / code-level docs. h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] *Update:* ^^ Status Update ^^ __List of all work outstanding for 1.15 release__ [Merged] https://github.com/apache/flink/pull/18165 - KDS DataStream Docs [Merged] https://github.com/apache/flink/pull/18396 - [hotfix] for infinite loop if not flushing during commit [Merged] https://github.com/apache/flink/pull/18421 - Mark Kinesis Producer as deprecated (Prod: FLINK-24227) [Merged] https://github.com/apache/flink/pull/18348 - KDS Table API Sink & Docs [Merged] https://github.com/apache/flink/pull/18488 - base sink retry entries in order not in reverse [Merged] https://github.com/apache/flink/pull/18512 - changing failed requests handler to accept List in AsyncSinkWriter [Merged] https://github.com/apache/flink/pull/18483 - Do not expose the element converter [Merged] https://github.com/apache/flink/pull/18468 - Adding Kinesis data streams sql uber-jar Ready for review: [SUCCESS ] https://github.com/apache/flink/pull/18314 - 
KDF DataStream Sink & Docs [BLOCKED on ^^ ] https://github.com/apache/flink/pull/18426 - rename flink-connector-aws-kinesis-data-* to flink-connector-aws-kinesis-* (module names) and KinesisData*Sink to Kinesis*Sink (class names) Pending PR: * Firehose Table API Sink & Docs * KDF Table API SQL jar TBD: * FLINK-25846: Not shutting down * FLINK-25848: Validation during start up * FLINK-25792: flush() bug * FLINK-25793: throughput exceeded * Update the defaults of KDS sink and update the docs + do the same for KDF * add an `AsyncSinkCommonConfig` class (to wrap the 6 params) to the `flink-connector-base` and propagate it to the two connectors - feature freeze * KDS performance testing * KDF performance testing * Clone the new docs to .../content.zh/... and add the location to the corresponding Chinese translation jira - KDS - * Rename [Amazon AWS Kinesis Streams] to [Amazon Kinesis Data Streams] in docs (legacy issue) - Flink 1.15 release * KDS end to end sanity test - hits aws apis rather than local docker images * KDS Python wrappers * FLINK-25733 - Create a migration guide for Kinesis Table API connector - can happen after 1.15 * If `endpoint` is provided, `region` should not be required like it currently is * Test if Localstack container requires the 1ms timeout * Adaptive level of logging (in discussion) FYI: * FLINK-25661 - Add Custom Fatal Exception handler in AsyncSinkWriter - https://github.com/apache/flink/pull/18449 * https://issues.apache.org/jira/browse/FLINK-24229 DDB Sink Chinese translation: https://issues.apache.org/jira/browse/FLINK-25735 - KDS DataStream Sink https://issues.apache.org/jira/browse/FLINK-25736 - KDS Table API Sink https://issues.apache.org/jira/browse/FLINK-25737 - KDF DataStream Sink was: h2. Motivation *User stories:* As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams sink. *Scope:* * Introduce {{AsyncDynamicTableSink}} that enables Sinking Tables into Async Implementations. 
* Implement a new {{KinesisDynamicTableSink}} that uses {{KinesisDataStreamSink}} Async Implementation and implements {{{}AsyncDynamicTableSink{}}}. * The implementation introduces Async Sink configurations as optional options in the table definition, with default values derived from the {{KinesisDataStream}} default values. * Unit/Integration testing. modify KinesisTableAPI tests for the new implementation, add unit tests for {{AsyncDynamicTableSink}} and {{KinesisDynamicTableSink}} and {{{}KinesisDynamicTableSinkFactory{}}}. * Java / code-level docs. h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] > [FLIP-171] KDS implementation of Async Sink Table API > - > > Key: FLINK-24905 > URL: https://issues.apache.org/jira/browse/FLINK-
[jira] [Updated] (FLINK-24228) [FLIP-171] Firehose implementation of Async Sink
[ https://issues.apache.org/jira/browse/FLINK-24228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zichen Liu updated FLINK-24228: --- Description: h2. Motivation *User stories:* As a Flink user, I’d like to use Kinesis Firehose as a sink for my data pipeline. *Scope:* * Implement an asynchronous sink for Kinesis Firehose by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors. The module and package name can be anything reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and {{org.apache.flink.connector.aws.kinesis}} for the package name. * The implementation must use [the Kinesis Java Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html]. * The implementation must allow users to configure the Kinesis Client, with reasonable default settings. * Implement an asynchronous sink writer for Firehose by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests (PutRecordsRequestEntry objects) to Firehose for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story. * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We already use this in {{{}KinesisTableApiITCase{}}}. * Java / code-level docs. * End to end testing: add tests that hit a real AWS instance. (How to best donate resources to the Flink project to allow this to happen?) h2. 
References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] *Update:* ^^ Status Update ^^ __List of all work outstanding for 1.15 release__ [Merged] [https://github.com/apache/flink/pull/18165] - KDS DataStream Docs [Merged] [https://github.com/apache/flink/pull/18396] - [hotfix] for infinite loop if not flushing during commit [Merged] [https://github.com/apache/flink/pull/18421] - Mark Kinesis Producer as deprecated (Prod: FLINK-24227) [Merged] [https://github.com/apache/flink/pull/18348] - KDS Table API Sink & Docs [Merged] [https://github.com/apache/flink/pull/18488] - base sink retry entries in order not in reverse [Merged] [https://github.com/apache/flink/pull/18512] - changing failed requests handler to accept List in AsyncSinkWriter [Merged] [https://github.com/apache/flink/pull/18483] - Do not expose the element converter [Merged] [https://github.com/apache/flink/pull/18468] - Adding Kinesis data streams sql uber-jar Ready for review: [SUCCESS ] [https://github.com/apache/flink/pull/18314] - KDF DataStream Sink & Docs [BLOCKED on ^^ ] [https://github.com/apache/flink/pull/18426] - rename flink-connector-aws-kinesis-data-* to flink-connector-aws-kinesis-* (module names) and KinesisData*Sink to Kinesis*Sink (class names) Pending PR: * Firehose Table API Sink & Docs * KDF Table API SQL jar TBD: * FLINK-25846: Not shutting down * FLINK-25848: Validation during start up * FLINK-25792: flush() bug * FLINK-25793: throughput exceeded * Update the defaults of KDS sink and update the docs + do the same for KDF * add an `AsyncSinkCommonConfig` class (to wrap the 6 params) to the `flink-connector-base` and propagate it to the two connectors - feature freeze * KDS performance testing * KDF performance testing * Clone the new docs to .../content.zh/... 
and add the location to the corresponding Chinese translation jira - KDS - * Rename [Amazon AWS Kinesis Streams] to [Amazon Kinesis Data Streams] in docs (legacy issue) - Flink 1.15 release * KDS end to end sanity test - hits aws apis rather than local docker images * KDS Python wrappers * FLINK-25733 - Create a migration guide for Kinesis Table API connector - can happen after 1.15 * If `endpoint` is provided, `region` should not be required like it currently is * Test if Localstack container requires the 1ms timeout * Adaptive level of logging (in discussion) FYI: * FLINK-25661 - Add Custom Fatal Exception handler in AsyncSinkWriter - [https://github.com/apache/flink/pull/18449] * https://issues.apache.org/jira/browse/FLINK-24229 DDB Sink Chinese translation: https://issues.apache.org/jira/browse/FLINK-25735 - KDS DataStream Sink https://issues.apache.org/jira/browse/FLINK-25736 - KDS Table API Sink https://issues.apache.org/jira/browse/FLINK-25737 - KDF DataStream Sink was: h2. Motivation *User stories:* As a Flink user, I’d like to use Kinesis Firehose as a sink for my data pipeline. *Scope:* * Implement an asynchronous sink for
[jira] [Updated] (FLINK-25793) Introduce rate limiting mechanism to Async Sink Base
[ https://issues.apache.org/jira/browse/FLINK-25793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zichen Liu updated FLINK-25793:
-------------------------------
Description:

h2. Bug:
The Async Sink Base is not limiting throughput to the destination and is therefore exceeding rate limits.

*Cause:*
We are not throttling our requests downstream at all. We should monitor for requests that have failed with ThroughputExceeded exceptions and reduce the throughput accordingly.

was:

h2. Bug:
The Kinesis Data Streams sink is not limiting throughput to the destination and is therefore exceeding rate limits.

*Cause:*
We are not throttling our requests downstream at all. We should monitor for requests that have failed with ThroughputExceeded exceptions and reduce the throughput accordingly.

> Introduce rate limiting mechanism to Async Sink Base
> ----------------------------------------------------
>
>                 Key: FLINK-25793
>                 URL: https://issues.apache.org/jira/browse/FLINK-25793
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>            Reporter: Zichen Liu
>            Assignee: Ahmed Hamdy
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
> h2. Bug:
> The Async Sink Base is not limiting throughput to the destination and is therefore exceeding rate limits.
> *Cause:*
> We are not throttling our requests downstream at all. We should monitor for requests that have failed with ThroughputExceeded exceptions and reduce the throughput accordingly.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
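The mechanism described above, backing off when the destination reports ThroughputExceeded-style failures and cautiously ramping back up, is commonly implemented as AIMD (additive increase, multiplicative decrease), the same feedback scheme TCP uses for congestion control. A minimal sketch of that idea, not the actual code merged for this issue:

```java
/**
 * Minimal AIMD (additive-increase / multiplicative-decrease) rate limiter
 * sketch: grow the allowed send rate slowly on success, cut it sharply when
 * the destination signals throttling (e.g. a ThroughputExceeded failure).
 * Illustrative only; class and method names are assumptions.
 */
public class AimdRateLimiter {
    private final int increment;         // additive step applied on success
    private final double decreaseFactor; // multiplicative cut on throttling
    private final int minRate;           // floor so we never stall completely
    private final int maxRate;           // ceiling on the send rate
    private int currentRate;             // current allowed messages in flight

    public AimdRateLimiter(
            int initialRate, int minRate, int maxRate,
            int increment, double decreaseFactor) {
        this.currentRate = initialRate;
        this.minRate = minRate;
        this.maxRate = maxRate;
        this.increment = increment;
        this.decreaseFactor = decreaseFactor;
    }

    /** Called when a batch completes without throttling errors. */
    public void registerSuccess() {
        currentRate = Math.min(maxRate, currentRate + increment);
    }

    /** Called when the destination reports a ThroughputExceeded failure. */
    public void registerThrottling() {
        currentRate = Math.max(minRate, (int) (currentRate * decreaseFactor));
    }

    /** The writer consults this before dispatching the next batch. */
    public int getRateLimit() {
        return currentRate;
    }
}
```

The writer would call `registerThrottling()` whenever a batch comes back with a throughput-exceeded error and `registerSuccess()` otherwise, capping the number of in-flight messages at `getRateLimit()`.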
[jira] [Updated] (FLINK-25793) Introduce rate limiting mechanism to Async Sink Base
[ https://issues.apache.org/jira/browse/FLINK-25793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zichen Liu updated FLINK-25793:
-------------------------------
Summary: Introduce rate limiting mechanism to Async Sink Base (was: Kinesis Data Streams sink is not limiting throughput to the destination and therefore exceeding rate limits)

> Introduce rate limiting mechanism to Async Sink Base
> ----------------------------------------------------
>
>                 Key: FLINK-25793
>                 URL: https://issues.apache.org/jira/browse/FLINK-25793
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>            Reporter: Zichen Liu
>            Assignee: Ahmed Hamdy
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
> h2. Bug:
> The Kinesis Data Streams sink is not limiting throughput to the destination and is therefore exceeding rate limits.
> *Cause:*
> We are not throttling our requests downstream at all. We should monitor for requests that have failed with ThroughputExceeded exceptions and reduce the throughput accordingly.
[jira] [Closed] (FLINK-25692) Documentation for Kinesis Firehose Sink
[ https://issues.apache.org/jira/browse/FLINK-25692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zichen Liu closed FLINK-25692.
------------------------------
Resolution: Fixed

> Documentation for Kinesis Firehose Sink
> ---------------------------------------
>
>                 Key: FLINK-25692
>                 URL: https://issues.apache.org/jira/browse/FLINK-25692
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Kinesis
>            Reporter: Zichen Liu
>            Assignee: Ahmed Hamdy
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
> h2. Motivation
> _FLINK-24228 introduces a new sink for Kinesis Data Streams that supersedes the existing one based on the KPL._
> *Scope:*
> * Deprecate the current section in the docs for the Kinesis KPL sink and write documentation and a usage guide for the new sink.
> h2. References
> More details to be found at
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]