[jira] [Created] (FLINK-24041) [FLIP-171] Generic AsyncSinkBase

2021-08-29 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24041:
--

 Summary: [FLIP-171] Generic AsyncSinkBase
 Key: FLINK-24041
 URL: https://issues.apache.org/jira/browse/FLINK-24041
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu


h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user-defined buffering hints, sign requests, and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focuses on extensibility and broad support of 
destinations. The core of the sink is kept generic and free of any 
connector-specific dependencies. The sink is designed to participate in 
checkpointing to provide at-least-once semantics, but it is limited to 
destinations that provide a client that supports async requests. 
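
The division of labour described above can be sketched in plain Java. This is an illustrative model only, not the FLIP-171 API: all class and method names below are invented for the sketch. The generic core keeps the buffering and flushing logic, and pushes the destination-specific call behind a single abstract method.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Illustrative model (not the real FLIP-171 API): the generic core buffers
// request entries, flushes them in batches, and re-queues entries that the
// destination-specific client reports as failed.
abstract class GenericAsyncSink<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private final int maxBatchSize;

    GenericAsyncSink(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // The only destination-specific part: submit a batch asynchronously and
    // hand any failed entries back for re-queueing.
    protected abstract void submitRequestEntries(List<T> batch, Consumer<List<T>> requeueFailed);

    void write(T entry) {
        buffer.add(entry);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Called when the buffer fills up and, in a real sink, at checkpoint time.
    void flush() {
        List<T> batch = new ArrayList<>();
        while (!buffer.isEmpty() && batch.size() < maxBatchSize) {
            batch.add(buffer.poll());
        }
        if (!batch.isEmpty()) {
            submitRequestEntries(batch, buffer::addAll);
        }
    }

    int pendingEntries() {
        return buffer.size();
    }
}

// A trivial shim for demonstration: "sends" batches by collecting them.
class CollectingSink extends GenericAsyncSink<String> {
    final List<List<String>> batches = new ArrayList<>();

    CollectingSink(int maxBatchSize) {
        super(maxBatchSize);
    }

    @Override
    protected void submitRequestEntries(List<String> batch, Consumer<List<String>> requeueFailed) {
        batches.add(batch); // pretend every entry succeeded
    }
}
```

A real shim would replace {{CollectingSink}} with a class that calls the destination's async client; that is the sense in which adding a new destination becomes a lightweight exercise.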
h2. References

More details can be found at 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24227) [FLIP-171] KDS implementation of Async Sink

2021-09-09 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24227:
--

 Summary: [FLIP-171] KDS implementation of Async Sink
 Key: FLINK-24227
 URL: https://issues.apache.org/jira/browse/FLINK-24227
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user-defined buffering hints, sign requests, and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focuses on extensibility and broad support of 
destinations. The core of the sink is kept generic and free of any 
connector-specific dependencies. The sink is designed to participate in 
checkpointing to provide at-least-once semantics, but it is limited to 
destinations that provide a client that supports async requests. 
h2. References

More details can be found at 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink





[jira] [Updated] (FLINK-24227) [FLIP-171] KDS implementation of Async Sink

2021-09-09 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24227:
---
Description: 
h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How best to 
donate resources to the Flink project to allow this to happen?)
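
As a self-contained illustration of the retry contract in the bullets above (a mock, not the actual AsyncSinkWriter interface): the client is simulated by a function that returns the rejected entries, standing in for a PutRecords-style call, and only the rejected entries are re-queued.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Illustrative sketch (not the real AsyncSinkWriter API): the writer hands a
// batch to a PutRecords-style call that may reject individual entries, and
// re-queues only the rejected ones for the next attempt, mirroring the
// requeueFailedRequestEntry contract described above.
class RetryingBatchWriter {
    private final Deque<String> pending = new ArrayDeque<>();
    // Stand-in for the async Kinesis client: takes a batch, returns the
    // entries the destination rejected.
    private final Function<List<String>, List<String>> putRecords;

    RetryingBatchWriter(Function<List<String>, List<String>> putRecords) {
        this.putRecords = putRecords;
    }

    void buffer(String entry) {
        pending.add(entry);
    }

    // Submit everything currently buffered; rejected entries go back on the
    // queue so a later attempt (or checkpoint flush) retries them.
    void submitPending() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        pending.addAll(putRecords.apply(batch));
    }

    List<String> pendingEntries() {
        return new ArrayList<>(pending);
    }
}
```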

h2. References

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

  was:
h2. Motivation

*User stories:*
As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
pipeline.

*Scope:*


 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name. Side-note: 
There will be additional work later to move these implementations somewhere 
else (see the [ongoing 
discussion|http://mail-archives.apache.org/mod_mbox/flink-dev/202106.mbox/%3ccagznd0ebmgud327_j4gvdyaoygaewxmjz9kzn33fv0v+j8g...@mail.gmail.com%3e]).
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How best to 
donate resources to the Flink project to allow this to happen?)

h2. References

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]


> [FLIP-171] KDS implementation of Async Sink
> ---
>
> Key: FLINK-24227
> URL: https://issues.apache.org/jira/browse/FLINK-24227
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
>  As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
> pipeline.
> *Scope:*
>  * Implement an asynchronous sink for Kinesis Data Streams (KDS) by 
> inheriting the AsyncSinkBase class. The implementation can for now reside in 
> its own module in flink-connectors. The module and package name can be 
> anything reasonable e.g. {{flink-connector-aws-kinesis}} for the module name 
> and {{org.apache.flink.connector.aws.kinesis}} for the package name.
>  * The implementation must use [the Kinesis Java 
> Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
>  * The implementation must allow users to configure the Kinesis Client, with 
> reasonable default setting

[jira] [Updated] (FLINK-24227) [FLIP-171] KDS implementation of Async Sink

2021-09-09 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24227:
---
Description: 
h2. Motivation

*User stories:*
As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
pipeline.

*Scope:*


 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name. Side-note: 
There will be additional work later to move these implementations somewhere 
else (see the [ongoing 
discussion|http://mail-archives.apache.org/mod_mbox/flink-dev/202106.mbox/%3ccagznd0ebmgud327_j4gvdyaoygaewxmjz9kzn33fv0v+j8g...@mail.gmail.com%3e]).
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How best to 
donate resources to the Flink project to allow this to happen?)

h2. References

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

  was:
h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user defined buffering hints, sign requests and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focuses on extensibility and broad support of 
destinations. The core of the sink is kept generic and free of any 
connector-specific dependencies. The sink is designed to participate in 
checkpointing to provide at-least-once semantics, but it is limited to 
destinations that provide a client that supports async requests. 
h2. References

More details can be found at 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink


> [FLIP-171] KDS implementation of Async Sink
> ---
>
> Key: FLINK-24227
> URL: https://issues.apache.org/jira/browse/FLINK-24227
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
> As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
> pipeline.
> *Scope:*
>  * Implement an asynchronous sink for Kinesis Data Streams (KDS) by 
> inheriting the AsyncSinkBase class. The implementation can for now reside in 
> its own module in flink-connectors. The module and package name can be 
> anything reasonable e.g. {{flink-connector-aws-kinesis}} for the module name 
> and {{org.apache.flink.connector.aws.kinesis}} for the package name. 
> Side-note: There will be additional work later to move these implementations 
> somewhere else (see the[

[jira] [Created] (FLINK-24228) [FLIP-171] Firehose implementation of Async Sink

2021-09-09 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24228:
--

 Summary: [FLIP-171] Firehose implementation of Async Sink
 Key: FLINK-24228
 URL: https://issues.apache.org/jira/browse/FLINK-24228
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How best to 
donate resources to the Flink project to allow this to happen?)

h2. References

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Updated] (FLINK-24228) [FLIP-171] Firehose implementation of Async Sink

2021-09-09 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24228:
---
Description: 
h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Firehose as a sink for my data pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Firehose by inheriting the 
AsyncSinkBase class. The implementation can for now reside in its own module in 
flink-connectors. The module and package name can be anything reasonable e.g. 
{{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for Firehose by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to Firehose for increased throughput. The implemented Sink Writer will be used 
by the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How best to 
donate resources to the Flink project to allow this to happen?)

h2. References

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

  was:
h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How best to 
donate resources to the Flink project to allow this to happen?)

h2. References

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]


> [FLIP-171] Firehose implementation of Async Sink
> 
>
> Key: FLINK-24228
> URL: https://issues.apache.org/jira/browse/FLINK-24228
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
>  As a Flink user, I’d like to use Kinesis Firehose as a sink for my data 
> pipeline.
> *Scope:*
>  * Implement an asynchronous sink for Kinesis Firehose by inheriting the 
> AsyncSinkBase class. The implementation can for now reside in its own module 
> in flink-connectors. The module and package name can be anything reasonable 
> e.g. {{flink-connector-aws-kinesis}} for the module name and 
> {{org.apache.flink.connector.aws.kinesis}} for the package name.
>  * The implementation must use [the Kinesis Java 
> Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
>  * The implementation must allow users to configure the Kinesis Client, with 
> reasonable default settings.
>  * Implement an asynchronous sink writer for Firehose by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple requ

[jira] [Created] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2021-09-09 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24229:
--

 Summary: [FLIP-171] DynamoDB implementation of Async Sink
 Key: FLINK-24229
 URL: https://issues.apache.org/jira/browse/FLINK-24229
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Firehose as a sink for my data pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Firehose by inheriting the 
AsyncSinkBase class. The implementation can for now reside in its own module in 
flink-connectors. The module and package name can be anything reasonable e.g. 
{{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for Firehose by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to Firehose for increased throughput. The implemented Sink Writer will be used 
by the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How best to 
donate resources to the Flink project to allow this to happen?)

h2. References

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Updated] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2021-09-09 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24229:
---
Description: 
h2. Motivation

*User stories:*
 As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.

*Scope:*
 * Implement an asynchronous sink for DynamoDB by inheriting the AsyncSinkBase 
class. The implementation can for now reside in its own module in 
flink-connectors.
 * Implement an asynchronous sink writer for DynamoDB by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to Firehose for increased throughput. The implemented Sink Writer will be used 
by the Sink class that will be created as part of this story.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How best to 
donate resources to the Flink project to allow this to happen?)

h2. References

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

  was:
h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Firehose as a sink for my data pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Firehose by inheriting the 
AsyncSinkBase class. The implementation can for now reside in its own module in 
flink-connectors. The module and package name can be anything reasonable e.g. 
{{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for Firehose by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to Firehose for increased throughput. The implemented Sink Writer will be used 
by the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How best to 
donate resources to the Flink project to allow this to happen?)

h2. References

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]


> [FLIP-171] DynamoDB implementation of Async Sink
> 
>
> Key: FLINK-24229
> URL: https://issues.apache.org/jira/browse/FLINK-24229
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
>  As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.
> *Scope:*
>  * Implement an asynchronous sink for DynamoDB by inheriting the 
> AsyncSinkBase class. The implementation can for now reside in its own module 
> in flink-connectors.
>  * Implement an asynchronous sink writer for DynamoDB by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple requests (PutRecordsRequestEntry 
> objects) to Firehose for increased throughput. The implemented Sink Writer 
> will be used by the Sink class that will be created as part of this story.
>  * Java / code-level docs.
>  * End-to-end testing: add tests that hit a real AWS instance. (How best to 
> donate resources to the Flink project to allow this to happen?)
> h2. References
> More details can be found at 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Created] (FLINK-24234) [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase

2021-09-09 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24234:
--

 Summary: [FLIP-171] Byte Based & Time Based Flushing for 
AsyncSinkBase
 Key: FLINK-24234
 URL: https://issues.apache.org/jira/browse/FLINK-24234
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user-defined buffering hints, sign requests, and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focuses on extensibility and broad support of 
destinations. The core of the sink is kept generic and free of any 
connector-specific dependencies. The sink is designed to participate in 
checkpointing to provide at-least-once semantics, but it is limited to 
destinations that provide a client that supports async requests. 
h2. References

More details can be found at 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink





[jira] [Updated] (FLINK-24234) [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase

2021-09-09 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24234:
---
Description: 
*User stories*

 * As a Sink user, I’d like to configure the number of items to send to the 
destination at once (e.g. “flush once there are x items in the batch”)
 * As a Sink user, I’d like to configure the batching logic so that I can flush 
the batch of requests based on a time period (e.g. “flush every 2 seconds”)
 * As a Sink user, I’d like to specify the number of bytes for a batch of 
requests to be flushed (e.g. “submit the batch once the total number of bytes 
in it exceeds 1 KB”)
 * As a Sink developer, I’d like to use the configuration mechanism provided to 
allow Sink users to configure my Sink implementation

*Scope*
 * Allow Sink developers and users to pass batch size config to the 
AsyncSinkWriter
 * Add support for time-based flushing (e.g. “flush after x milliseconds”) using 
the ProcessingTimeService which is part of the Sink interface
 * Add support for byte-based flushing
 * Consider the interaction of time-based and byte-based flushing: if a 
time-triggered batch contains more bytes than configured, the last few items 
(however many are necessary) should be deferred to the next batch to satisfy 
the byte-size requirement.
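
The count-based and byte-based triggers, and the byte-overflow rule from the last bullet, can be sketched as follows. The names and the split between {{flushDue}} (the trigger check) and {{nextBatch}} (batch assembly that defers oversized tails) are invented for illustration and are not the proposed API; a time-based trigger would simply call {{flushDue}} from a processing-time timer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative sketch of the combined flush conditions: flush when the
// buffered entry count or byte total crosses a threshold. When assembling a
// batch, entries that would push it over the byte limit stay in the buffer
// for the next batch.
class FlushPolicyBuffer {
    private final Deque<byte[]> buffer = new ArrayDeque<>();
    private long bufferedBytes = 0;
    private final int maxBatchCount;
    private final long maxBatchBytes;

    FlushPolicyBuffer(int maxBatchCount, long maxBatchBytes) {
        this.maxBatchCount = maxBatchCount;
        this.maxBatchBytes = maxBatchBytes;
    }

    void add(byte[] entry) {
        buffer.add(entry);
        bufferedBytes += entry.length;
    }

    // True once either the count-based or the byte-based threshold is hit;
    // a processing-time timer would also trigger a flush independently.
    boolean flushDue() {
        return buffer.size() >= maxBatchCount || bufferedBytes >= maxBatchBytes;
    }

    // Drain a batch, stopping before either limit is exceeded; remaining
    // entries stay buffered for the next batch (the overflow rule above).
    List<byte[]> nextBatch() {
        List<byte[]> batch = new ArrayList<>();
        long batchBytes = 0;
        while (!buffer.isEmpty()
                && batch.size() < maxBatchCount
                && batchBytes + buffer.peek().length <= maxBatchBytes) {
            byte[] e = buffer.poll();
            batchBytes += e.length;
            bufferedBytes -= e.length;
            batch.add(e);
        }
        return batch;
    }

    int buffered() {
        return buffer.size();
    }
}
```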

*References*

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

  was:
h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user-defined buffering hints, sign requests and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focuses on extensibility and broad support of 
destinations. The core of the sink is kept generic and free of any connector 
specific dependencies. The sink is designed to participate in checkpointing to 
provide at-least-once semantics, but it is limited to destinations that provide 
a client that supports async requests. 
h2. References

More details to be found 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink


> [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase
> -
>
> Key: FLINK-24234
> URL: https://issues.apache.org/jira/browse/FLINK-24234
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> *User stories*
>  * As a Sink user, I’d like to configure the batch size for items to send to 
> the destination at once (e.g. “flush if there are x items in the 
> batch”)
>  * As a Sink user, I’d like to configure the batching logic so that I can 
> flush the batch of requests based on a time period (e.g. “flush every 2 
> seconds”)
>  * As a Sink user, I’d like to specify the number of bytes for the batch of 
> requests to be flushed (e.g. “submit the batch once the total number of 
> bytes in it exceeds 1 KB”)
>  * As a Sink developer, I’d like to use the configuration mechanism provided 
> to allow Sink users to configure my Sink implementation
> *Scope*
>  * Allow Sink developers and users to pass batch size configuration to the 
> AsyncSinkWriter
>  * Add support for time-based flushing (e.g. “flush after x milliseconds”) 
> using the ProcessingTimeService, which is part of the Sink interface
>  * Add support for byte-based flushing
>  * Consider the combination of time-based and byte-based flushing: if there 
> are more bytes in the batch than configured, then the last few (however many 
> necessary) items should go into the next batch to satisfy the byte-size 
> requirement.

[jira] [Updated] (FLINK-24234) [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase

2021-09-09 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24234:
---
Description: 
*User stories*
 * As a Sink user, I’d like to configure the batch size for items to send to 
the destination at once (e.g. “flush if there are x items in the 
batch”)
 * As a Sink user, I’d like to configure the batching logic so that I can flush 
the batch of requests based on a time period (e.g. “flush every 2 seconds”)
 * As a Sink user, I’d like to specify the number of bytes for the batch of 
requests to be flushed (e.g. “submit the batch once the total number of bytes 
in it exceeds 1 KB”)
 * As a Sink developer, I’d like to use the configuration mechanism provided to 
allow Sink users to configure my Sink implementation

*Scope*
 * Allow Sink developers and users to pass batch size configuration to the 
AsyncSinkWriter
 * Add support for time-based flushing (e.g. “flush after x milliseconds”) using 
the ProcessingTimeService, which is part of the Sink interface
 * Add support for byte-based flushing
 * Consider the combination of time-based and byte-based flushing: if there are 
more bytes in the batch than configured, then the last few (however many 
necessary) items should go into the next batch to satisfy the byte-size 
requirement.

*References*

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

  was:
*User stories*

 * As a Sink user, I’d like to configure the batch size for items to send to 
the destination at once (e.g. “flush if there are x items in the 
batch”)
 * As a Sink user, I’d like to configure the batching logic so that I can flush 
the batch of requests based on a time period (e.g. “flush every 2 seconds”)
 * As a Sink user, I’d like to specify the number of bytes for the batch of 
requests to be flushed (e.g. “submit the batch once the total number of bytes 
in it exceeds 1 KB”)
 * As a Sink developer, I’d like to use the configuration mechanism provided to 
allow Sink users to configure my Sink implementation

*Scope*
 * Allow Sink developers and users to pass batch size configuration to the 
AsyncSinkWriter
 * Add support for time-based flushing (e.g. “flush after x milliseconds”) using 
the ProcessingTimeService, which is part of the Sink interface
 * Add support for byte-based flushing
 * Consider the combination of time-based and byte-based flushing: if there are 
more bytes in the batch than configured, then the last few (however many 
necessary) items should go into the next batch to satisfy the byte-size 
requirement.

*References*

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]


> [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase
> -
>
> Key: FLINK-24234
> URL: https://issues.apache.org/jira/browse/FLINK-24234
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> *User stories*
>  * As a Sink user, I’d like to configure the batch size for items to send to 
> the destination at once (e.g. “flush if there are x items in the 
> batch”)
>  * As a Sink user, I’d like to configure the batching logic so that I can 
> flush the batch of requests based on a time period (e.g. “flush every 2 
> seconds”)
>  * As a Sink user, I’d like to specify the number of bytes for the batch of 
> requests to be flushed (e.g. “submit the batch once the total number of 
> bytes in it exceeds 1 KB”)
>  * As a Sink developer, I’d like to use the configuration mechanism provided 
> to allow Sink users to configure my Sink implementation
> *Scope*
>  * Allow Sink developers and users to pass batch size configuration to the 
> AsyncSinkWriter
>  * Add support for time-based flushing (e.g. “flush after x milliseconds”) 
> using the ProcessingTimeService, which is part of the Sink interface
>  * Add support for byte-based flushing
>  * Consider the combination of time-based and byte-based flushing: if there 
> are more bytes in the batch than configured, then the last few (however many 
> necessary) items should go into the next batch to satisfy the byte-size 
> requirement.
> *References*
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Updated] (FLINK-24278) [FLIP-171] Async Sink Base Sink Developer Guide for Documentation

2021-09-14 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24278:
---
Description: 
As an Async Sink developer, I’d like to have a step-by-step guide to 
implementing new Async Sinks.

*Scope:*
 * A markdown file in the async sink package guiding developers through the 
steps to create new async sink implementations. We could generate PDFs and HTML 
pages from this file later, to share it in other places if needed.

  was:
*User stories*
 * As a Sink user, I’d like to configure the batch size for items to send to 
the destination at once (e.g. “flush if there are x items in the 
batch”)
 * As a Sink user, I’d like to configure the batching logic so that I can flush 
the batch of requests based on a time period (e.g. “flush every 2 seconds”)
 * As a Sink user, I’d like to specify the number of bytes for the batch of 
requests to be flushed (e.g. “submit the batch once the total number of bytes 
in it exceeds 1 KB”)
 * As a Sink developer, I’d like to use the configuration mechanism provided to 
allow Sink users to configure my Sink implementation

*Scope*
 * Allow Sink developers and users to pass batch size configuration to the 
AsyncSinkWriter
 * Add support for time-based flushing (e.g. “flush after x milliseconds”) using 
the ProcessingTimeService, which is part of the Sink interface
 * Add support for byte-based flushing
 * Consider the combination of time-based and byte-based flushing: if there are 
more bytes in the batch than configured, then the last few (however many 
necessary) items should go into the next batch to satisfy the byte-size 
requirement.

*References*

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]


> [FLIP-171] Async Sink Base Sink Developer Guide for Documentation
> -
>
> Key: FLINK-24278
> URL: https://issues.apache.org/jira/browse/FLINK-24278
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> As an Async Sink developer, I’d like to have a step-by-step guide to 
> implementing new Async Sinks.
> *Scope:*
>  * A markdown file in the async sink package guiding developers through the 
> steps to create new async sink implementations. We could generate PDFs and 
> HTML pages from this file later, to share it in other places if needed.





[jira] [Created] (FLINK-24278) [FLIP-171] Async Sink Base Sink Developer Guide for Documentation

2021-09-14 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24278:
--

 Summary: [FLIP-171] Async Sink Base Sink Developer Guide for 
Documentation
 Key: FLINK-24278
 URL: https://issues.apache.org/jira/browse/FLINK-24278
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


*User stories*
 * As a Sink user, I’d like to configure the batch size for items to send to 
the destination at once (e.g. “flush if there are x items in the 
batch”)
 * As a Sink user, I’d like to configure the batching logic so that I can flush 
the batch of requests based on a time period (e.g. “flush every 2 seconds”)
 * As a Sink user, I’d like to specify the number of bytes for the batch of 
requests to be flushed (e.g. “submit the batch once the total number of bytes 
in it exceeds 1 KB”)
 * As a Sink developer, I’d like to use the configuration mechanism provided to 
allow Sink users to configure my Sink implementation

*Scope*
 * Allow Sink developers and users to pass batch size configuration to the 
AsyncSinkWriter
 * Add support for time-based flushing (e.g. “flush after x milliseconds”) using 
the ProcessingTimeService, which is part of the Sink interface
 * Add support for byte-based flushing
 * Consider the combination of time-based and byte-based flushing: if there are 
more bytes in the batch than configured, then the last few (however many 
necessary) items should go into the next batch to satisfy the byte-size 
requirement.

*References*

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Created] (FLINK-24370) [FLIP-171] Documentation for Generic AsyncSinkBase

2021-09-24 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24370:
--

 Summary: [FLIP-171] Documentation for Generic AsyncSinkBase
 Key: FLINK-24370
 URL: https://issues.apache.org/jira/browse/FLINK-24370
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user-defined buffering hints, sign requests and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focuses on extensibility and broad support of 
destinations. The core of the sink is kept generic and free of any connector 
specific dependencies. The sink is designed to participate in checkpointing to 
provide at-least-once semantics, but it is limited to destinations that provide 
a client that supports async requests. 
h2. References

More details to be found 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink





[jira] [Updated] (FLINK-24370) [FLIP-171] Documentation for Generic AsyncSinkBase

2021-09-24 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24370:
---
Description: 
h2. Motivation

Write documentation for the FLIP-171 Async Sink Base. This will help sink 
implementers get acquainted with the information they need to write their 
concrete sinks.
h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

  was:
h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user-defined buffering hints, sign requests and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focuses on extensibility and broad support of 
destinations. The core of the sink is kept generic and free of any connector 
specific dependencies. The sink is designed to participate in checkpointing to 
provide at-least-once semantics, but it is limited to destinations that provide 
a client that supports async requests. 
h2. References

More details to be found 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink


> [FLIP-171] Documentation for Generic AsyncSinkBase
> --
>
> Key: FLINK-24370
> URL: https://issues.apache.org/jira/browse/FLINK-24370
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> Write documentation for the FLIP-171 Async Sink Base. This will help sink 
> implementers get acquainted with the information they need to write their 
> concrete sinks.
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2021-11-04 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17438824#comment-17438824
 ] 

Zichen Liu commented on FLINK-24229:


Hey [~Gusev]

That's great, thanks for letting me know. Hope you had a good vacation!! 

I'm going to try to get the changes for the base sink in by tomorrow, so that 
the API won't have moved by the time you start work. 
[https://github.com/apache/flink/pull/17687] I'm close to getting it in, as the 
reviews for this change have mostly taken place on a separate PR page.

> [FLIP-171] DynamoDB implementation of Async Sink
> 
>
> Key: FLINK-24229
> URL: https://issues.apache.org/jira/browse/FLINK-24229
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
>  As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.
> *Scope:*
>  * Implement an asynchronous sink for DynamoDB by inheriting the 
> AsyncSinkBase class. The implementation can for now reside in its own module 
> in flink-connectors.
>  * Implement an asynchronous sink writer for DynamoDB by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple requests to DynamoDB for increased 
> throughput. The implemented Sink Writer will be used by the Sink class that 
> will be created as part of this story.
>  * Java / code-level docs.
>  * End-to-end testing: add tests that hit a real AWS instance. (How to best 
> donate resources to the Flink project to allow this to happen?)
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2021-11-05 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439185#comment-17439185
 ] 

Zichen Liu commented on FLINK-24229:


Hi [~Gusev] 

Sadly it may be possible that I won't be able to get the PR merged today... 
sorry about that. If you need a reference for what the changes will look like 
(mostly minor), please use the file changes in the PR as a guide.

 

> [FLIP-171] DynamoDB implementation of Async Sink
> 
>
> Key: FLINK-24229
> URL: https://issues.apache.org/jira/browse/FLINK-24229
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
>  As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.
> *Scope:*
>  * Implement an asynchronous sink for DynamoDB by inheriting the 
> AsyncSinkBase class. The implementation can for now reside in its own module 
> in flink-connectors.
>  * Implement an asynchronous sink writer for DynamoDB by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple requests to DynamoDB for increased 
> throughput. The implemented Sink Writer will be used by the Sink class that 
> will be created as part of this story.
>  * Java / code-level docs.
>  * End-to-end testing: add tests that hit a real AWS instance. (How to best 
> donate resources to the Flink project to allow this to happen?)
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2021-11-10 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17441623#comment-17441623
 ] 

Zichen Liu commented on FLINK-24229:


Hi [~Gusev] 

Sorry for the delay, everything related to the base sink has been merged.

> [FLIP-171] DynamoDB implementation of Async Sink
> 
>
> Key: FLINK-24229
> URL: https://issues.apache.org/jira/browse/FLINK-24229
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
>  As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.
> *Scope:*
>  * Implement an asynchronous sink for DynamoDB by inheriting the 
> AsyncSinkBase class. The implementation can for now reside in its own module 
> in flink-connectors.
>  * Implement an asynchronous sink writer for DynamoDB by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple requests to DynamoDB for increased 
> throughput. The implemented Sink Writer will be used by the Sink class that 
> will be created as part of this story.
>  * Java / code-level docs.
>  * End-to-end testing: add tests that hit a real AWS instance. (How to best 
> donate resources to the Flink project to allow this to happen?)
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2021-11-12 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442731#comment-17442731
 ] 

Zichen Liu commented on FLINK-24229:


Hey [~nir.tsruya] 

Glad to hear!

I guess users can currently customize fatal-exception behaviour in the async 
callback of `submitRequestEntries`, but this improvement would refactor that 
handling out into a separate method. I imagine this is the main advantage 
you're interested in.

Also, it would give the user the possibility of handling these exceptions in 
the mailbox thread (though I believe the behaviour would end up being the same).
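To illustrate the idea being discussed, here is a minimal sketch of pulling fatal-exception classification out of the submit callback into a separate, overridable hook. Everything here is hypothetical (the class and method names are not the Flink API), and the "I/O errors are retryable" rule is an assumption for the sketch only:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: fatal-vs-retryable classification as a dedicated hook,
// instead of being decided inline in the submitRequestEntries callback.
class ErrorHandlingSketch {
    /** Hypothetical hook: subclasses decide which errors are fatal. */
    protected boolean isFatal(Throwable error) {
        // Assumption for the sketch: I/O errors are transient, everything else fatal.
        return !(error instanceof IOException);
    }

    /** Route an async failure either to job failure or to a retry path. */
    void handleAsyncError(Throwable error, Runnable failJob, Runnable retry) {
        if (isFatal(error)) {
            failJob.run(); // e.g. surfaced on the mailbox thread
        } else {
            retry.run();   // transient: requeue the affected entries
        }
    }
}
```

Separating the hook keeps the classification policy testable on its own, independent of the async submission path.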

 

 

> [FLIP-171] DynamoDB implementation of Async Sink
> 
>
> Key: FLINK-24229
> URL: https://issues.apache.org/jira/browse/FLINK-24229
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
>  As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.
> *Scope:*
>  * Implement an asynchronous sink for DynamoDB by inheriting the 
> AsyncSinkBase class. The implementation can for now reside in its own module 
> in flink-connectors.
>  * Implement an asynchronous sink writer for DynamoDB by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple requests to DynamoDB for increased 
> throughput. The implemented Sink Writer will be used by the Sink class that 
> will be created as part of this story.
>  * Java / code-level docs.
>  * End-to-end testing: add tests that hit a real AWS instance. (How to best 
> donate resources to the Flink project to allow this to happen?)
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Created] (FLINK-24904) Add documentation for KDS Async Sink

2021-11-15 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24904:
--

 Summary: Add documentation for KDS Async Sink
 Key: FLINK-24904
 URL: https://issues.apache.org/jira/browse/FLINK-24904
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How to best 
donate resources to the Flink project to allow this to happen?)
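The "deal with failed requests and retry them" bullet can be sketched with a self-contained model of the requeue pattern. The names here (RequeueingSubmitter, submitBatch, the Function-based client) are stand-ins, not the Flink or AWS APIs; they only mirror what requeueFailedRequestEntry is used for in the real writer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hedged sketch: submit buffered entries as one batch and requeue only the
// subset the destination reports as failed, so they retry in the next batch.
class RequeueingSubmitter<T> {
    private final List<T> pending = new ArrayList<>();
    private final Function<List<T>, List<T>> client; // returns the failed subset

    RequeueingSubmitter(Function<List<T>, List<T>> client) {
        this.client = client;
    }

    void enqueue(T entry) {
        pending.add(entry);
    }

    /** Submits all pending entries; returns how many succeeded. */
    int submitBatch() {
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        List<T> failed = client.apply(batch);
        pending.addAll(0, failed); // failed entries go to the front of the queue
        return batch.size() - failed.size();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Requeueing at the front preserves ordering of retried entries ahead of newly enqueued ones; whether that matters depends on the destination's ordering guarantees.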

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Updated] (FLINK-24904) Add documentation for KDS Async Sink

2021-11-15 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24904:
---
Description: 
h2. Motivation

*Scope:*
 * With FLINK-

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

  was:
h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How to best 
donate resources to the Flink project to allow this to happen?)

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]


> Add documentation for KDS Async Sink
> 
>
> Key: FLINK-24904
> URL: https://issues.apache.org/jira/browse/FLINK-24904
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *Scope:*
>  * With FLINK-
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Updated] (FLINK-24904) Add documentation for KDS Async Sink

2021-11-15 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24904:
---
Description: 
h2. Motivation

*Scope:*
 * FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes 
the existing one based on KPL.
 * Deprecate the current section in the docs for the Kinesis KPL sink, and write 
documentation and a usage guide for the new sink.

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

  was:
h2. Motivation

*Scope:*
 * With FLINK-

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]


> Add documentation for KDS Async Sink
> 
>
> Key: FLINK-24904
> URL: https://issues.apache.org/jira/browse/FLINK-24904
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *Scope:*
>  * FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes 
> the existing one based on KPL.
>  * Deprecate the current section in the docs for the Kinesis KPL sink, and 
> write documentation and a usage guide for the new sink.
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Updated] (FLINK-24904) Add documentation for KDS Async Sink

2021-11-15 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24904:
---
Description: 
h2. Motivation

_FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes the 
existing one based on KPL._

*Scope:*
 * Deprecate the current section in the docs for the Kinesis KPL sink and write 
documentation and usage guide for the new sink.

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

  was:
h2. Motivation

*Scope:*
 * FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes 
the existing one based on KPL.
 * Deprecate the current section in the docs for the Kinesis KPL sink and write 
documentation and usage guide for the new sink.

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]


> Add documentation for KDS Async Sink
> 
>
> Key: FLINK-24904
> URL: https://issues.apache.org/jira/browse/FLINK-24904
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> _FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes 
> the existing one based on KPL._
> *Scope:*
>  * Deprecate the current section in the docs for the Kinesis KPL sink and 
> write documentation and usage guide for the new sink.
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Updated] (FLINK-24904) Add documentation for KDS Async Sink

2021-11-15 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24904:
---
Component/s: Connectors / Kinesis
 (was: Connectors / Common)

> Add documentation for KDS Async Sink
> 
>
> Key: FLINK-24904
> URL: https://issues.apache.org/jira/browse/FLINK-24904
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> _FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes 
> the existing one based on KPL._
> *Scope:*
>  * Deprecate the current section in the docs for the Kinesis KPL sink and 
> write documentation and usage guide for the new sink.
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Created] (FLINK-24905) KDS implementation of Async Sink Table API

2021-11-15 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24905:
--

 Summary: KDS implementation of Async Sink Table API
 Key: FLINK-24905
 URL: https://issues.apache.org/jira/browse/FLINK-24905
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End to end testing: add tests that hit a real AWS instance. (How best to 
donate resources to the Flink project to allow this to happen?)
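The buffer-batch-requeue pattern described in the bullets above can be sketched in plain Java. This is a minimal illustration only: {{SketchSinkWriter}} and the injected {{submitBatch}} function are invented stand-ins for Flink's AsyncSinkWriter and the async client call, not the real API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Illustrative stand-in for the AsyncSinkWriter requeue pattern; not Flink API.
class SketchSinkWriter<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final int maxBatchSize;
    // Stand-in for the async client call: returns the entries that failed
    // and must be retried (e.g. failed PutRecordsRequestEntry objects).
    private final Function<List<T>, List<T>> submitBatch;

    SketchSinkWriter(int maxBatchSize, Function<List<T>, List<T>> submitBatch) {
        this.maxBatchSize = maxBatchSize;
        this.submitBatch = submitBatch;
    }

    void write(T element) {
        buffer.addLast(element);
    }

    // Drain the buffer in batches, re-queueing failed entries at the head so
    // they are retried before newer elements (the requeueFailedRequestEntry idea).
    void flush() {
        while (!buffer.isEmpty()) {
            List<T> batch = new ArrayList<>();
            while (batch.size() < maxBatchSize && !buffer.isEmpty()) {
                batch.add(buffer.pollFirst());
            }
            List<T> failed = submitBatch.apply(batch);
            // Re-insert in original order by walking the failed list backwards.
            for (int i = failed.size() - 1; i >= 0; i--) {
                buffer.addFirst(failed.get(i));
            }
        }
    }
}
```

A real implementation would submit batches asynchronously and cap the number of in-flight requests; here flush() simply loops until every entry has been accepted.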

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Updated] (FLINK-24905) KDS implementation of Async Sink Table API

2021-11-15 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24905:
---
Description: 
h2. Motivation

*User stories:*
As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams 
sink.

*Scope:*
 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name. Side-note: 
There will be additional work later to move these implementations somewhere 
else.
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End to end testing: add tests that hit a real AWS instance. (How best to 
donate resources to the Flink project to allow this to happen?)

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

  was:
h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End to end testing: add tests that hit a real AWS instance. (How best to 
donate resources to the Flink project to allow this to happen?)

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]


> KDS implementation of Async Sink Table API
> --
>
> Key: FLINK-24905
> URL: https://issues.apache.org/jira/browse/FLINK-24905
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
> As a Flink user, I’d like to use the Table API for the new Kinesis Data 
> Streams sink.
> *Scope:*
>  * Implement an asynchronous sink for Kinesis Data Streams (KDS) by 
> inheriting the AsyncSinkBase class. The implementation can for now reside in 
> its own module in flink-connectors. The module and package name can be 
> anything reasonable e.g. {{flink-connector-aws-kinesis}} for the module name 
> and {{org.apache.flink.connector.aws.kinesis}} for the package name. 
> Side-note: There will be additional work later to move these implementations 
> somewhere else.
>  * The implementation must allow users to configure the Kinesis Client, with 
> reasonable default settings.
>  * Implement an asynchronous sink writer for KDS by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple requests (PutRecordsRequestEntry 
> objects) to KDS for increased throughput. The implemented Sink Writer will be 
> used by the Sink class that will be created as part of this story.

[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2021-11-24 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17448591#comment-17448591
 ] 

Zichen Liu commented on FLINK-24229:


hey [~Gusev] 

Hope you're well. Was wondering how the DynamoDB sink was going and if you 
wanted to discuss anything about it?

> [FLIP-171] DynamoDB implementation of Async Sink
> 
>
> Key: FLINK-24229
> URL: https://issues.apache.org/jira/browse/FLINK-24229
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
> As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.
> *Scope:*
>  * Implement an asynchronous sink for DynamoDB by inheriting the 
> AsyncSinkBase class. The implementation can for now reside in its own module 
> in flink-connectors.
>  * Implement an asynchronous sink writer for DynamoDB by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple requests (PutRecordsRequestEntry 
> objects) to DynamoDB for increased throughput. The implemented Sink Writer 
> will be used by the Sink class that will be created as part of this story.
>  * Java / code-level docs.
>  * End to end testing: add tests that hit a real AWS instance. (How best to 
> donate resources to the Flink project to allow this to happen?)
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Updated] (FLINK-25944) Intermittent Failures on KDF AZP

2022-02-07 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25944:
---
Priority: Critical  (was: Major)

> Intermittent Failures on KDF AZP
> 
>
> Key: FLINK-25944
> URL: https://issues.apache.org/jira/browse/FLINK-25944
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Problem: CI fails on [#18553|https://github.com/apache/flink/pull/18553] and 
> the issue is not reproducible locally. Furthermore, the failure is 
> intermittent, frequency c. 1 in 5(?). Resolution: unknown - the theory is to 
> use HTTP1_1 as the protocol.





[jira] [Commented] (FLINK-25924) KDF Integration tests intermittently fails

2022-02-07 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17488002#comment-17488002
 ] 

Zichen Liu commented on FLINK-25924:


Superseded by FLINK-25944

> KDF Integration tests intermittently fails
> --
>
> Key: FLINK-25924
> URL: https://issues.apache.org/jira/browse/FLINK-25924
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.15.0
>
>
> Intermittent failures introduced as part of merge (PR#18314: 
> [FLINK-24228[connectors/firehose] - Unified Async Sink for Kinesis 
> Firehose|https://github.com/apache/flink/pull/18314]):
>  # Failures are intermittent, affecting c. 1 in 7 builds on 
> {{flink-ci.flink}} and {{flink-ci.flink-master-mirror}}.
>  # The issue looks identical to the KinesaliteContainer startup issue 
> (Appendix 1).
>  # I have managed to reproduce the issue locally - if I start some parallel 
> containers and keep them running - and then run {{KinesisFirehoseSinkITCase}} 
>  then c. 1 in 6 gives the error.
>  # The errors have a slightly different appearance on 
> {{flink-ci.flink-master-mirror}} vs {{flink-ci.flink}} which has the same 
> appearance as local. I only hope it is a difference in logging/killing 
> environment variables. (and that there aren’t 2 distinct issues)
> Appendix 1:
> {code:java}
> org.testcontainers.containers.ContainerLaunchException: Container startup 
> failed
> at 
> org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:336)
> at 
> org.testcontainers.containers.GenericContainer.start(GenericContainer.java:317)
> at 
> org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1066)
> at 
> ... 11 more
> Caused by: org.testcontainers.containers.ContainerLaunchException: Could not 
> create/start container
> at 
> org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:525)
> at 
> org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:331)
> at 
> org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81)
> ... 12 more
> Caused by: org.rnorth.ducttape.TimeoutException: Timeout waiting for result 
> with exception
> at 
> org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:54)
> at
> {code}





[jira] [Closed] (FLINK-25924) KDF Integration tests intermittently fails

2022-02-07 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu closed FLINK-25924.
--
Release Note: Superseded by FLINK-25944
  Resolution: Duplicate

> KDF Integration tests intermittently fails
> --
>
> Key: FLINK-25924
> URL: https://issues.apache.org/jira/browse/FLINK-25924
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.15.0
>
>
> Intermittent failures introduced as part of merge (PR#18314: 
> [FLINK-24228[connectors/firehose] - Unified Async Sink for Kinesis 
> Firehose|https://github.com/apache/flink/pull/18314]):
>  # Failures are intermittent, affecting c. 1 in 7 builds on 
> {{flink-ci.flink}} and {{flink-ci.flink-master-mirror}}.
>  # The issue looks identical to the KinesaliteContainer startup issue 
> (Appendix 1).
>  # I have managed to reproduce the issue locally - if I start some parallel 
> containers and keep them running - and then run {{KinesisFirehoseSinkITCase}} 
>  then c. 1 in 6 gives the error.
>  # The errors have a slightly different appearance on 
> {{flink-ci.flink-master-mirror}} vs {{flink-ci.flink}} which has the same 
> appearance as local. I only hope it is a difference in logging/killing 
> environment variables. (and that there aren’t 2 distinct issues)
> Appendix 1:
> {code:java}
> org.testcontainers.containers.ContainerLaunchException: Container startup 
> failed
> at 
> org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:336)
> at 
> org.testcontainers.containers.GenericContainer.start(GenericContainer.java:317)
> at 
> org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1066)
> at 
> ... 11 more
> Caused by: org.testcontainers.containers.ContainerLaunchException: Could not 
> create/start container
> at 
> org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:525)
> at 
> org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:331)
> at 
> org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81)
> ... 12 more
> Caused by: org.rnorth.ducttape.TimeoutException: Timeout waiting for result 
> with exception
> at 
> org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:54)
> at
> {code}





[jira] [Commented] (FLINK-25944) Intermittent Failures on KDF AZP

2022-02-07 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17488003#comment-17488003
 ] 

Zichen Liu commented on FLINK-25944:


Fix is coming out with https://github.com/apache/flink/pull/18553

> Intermittent Failures on KDF AZP
> 
>
> Key: FLINK-25944
> URL: https://issues.apache.org/jira/browse/FLINK-25944
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Problem: CI fails on [#18553|https://github.com/apache/flink/pull/18553] and 
> the issue is not reproducible locally. Furthermore, the failure is 
> intermittent, frequency c. 1 in 5(?). Resolution: unknown - the theory is to 
> use HTTP1_1 as the protocol.





[jira] [Created] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs

2022-02-07 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25976:
--

 Summary: Update the KDS and KDF Sink's defaults & update the docs
 Key: FLINK-25976
 URL: https://issues.apache.org/jira/browse/FLINK-25976
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Bug:

Async Sink Base is being flushed too frequently, resulting in backpressure 
even when the buffer is near empty.

*Cause:*

During a write(), flushIfAble() is called, which checks whether the number of 
buffered elements is greater than the batch size and, if so, insists that the 
sink flushes immediately, even if the number of inFlightRequests is already at 
the maximum allowed. This yields the current mailbox thread and hence blocks.

Notice that this can occur even if the buffer is near empty, so the blocking 
behaviour is unnecessary and undesirable: we would like the element to be 
written to the buffer without any blocking.
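The flush condition at fault can be captured in a few lines of plain Java. This is a hypothetical simplification of the decision described above; {{FlushPolicy}} and its method names are invented for illustration and are not the real AsyncSinkWriter code.

```java
// Hypothetical simplification of the flush decision described above; the class
// and method names are invented for illustration and are not Flink API.
class FlushPolicy {
    final int batchSize;
    final int maxInFlightRequests;

    FlushPolicy(int batchSize, int maxInFlightRequests) {
        this.batchSize = batchSize;
        this.maxInFlightRequests = maxInFlightRequests;
    }

    // Buggy behaviour: write() insists on a flush as soon as a full batch is
    // buffered, even when the in-flight cap means the flush must block.
    boolean writeBlocks(int buffered, int inFlight) {
        return buffered >= batchSize && inFlight >= maxInFlightRequests;
    }

    // Non-blocking alternative: flush only while below the in-flight cap;
    // otherwise just buffer the element and return immediately.
    boolean shouldFlushWithoutBlocking(int buffered, int inFlight) {
        return buffered >= batchSize && inFlight < maxInFlightRequests;
    }
}
```

The difference between the two predicates is exactly the case the report describes: a full batch while at the in-flight cap, where writeBlocks() stalls the mailbox thread but shouldFlushWithoutBlocking() simply defers the flush.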





[jira] [Updated] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs

2022-02-07 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25976:
---
Priority: Minor  (was: Major)

> Update the KDS and KDF Sink's defaults & update the docs
> 
>
> Key: FLINK-25976
> URL: https://issues.apache.org/jira/browse/FLINK-25976
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Update:
> DEFAULT_MAX_IN_FLIGHT_REQUESTS=50
> to match with the default threads in the AWS SDK v2 HTTP Client default.





[jira] [Updated] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs

2022-02-07 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25976:
---
Description: 
h2. Update:

DEFAULT_MAX_IN_FLIGHT_REQUESTS=50

to match with the default threads in the AWS SDK v2 HTTP Client default.
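The effect of this constant is to cap how many async requests the sink keeps in flight, so it does not queue more work than the HTTP client's thread pool can serve. A Semaphore models that cap; {{InFlightLimiter}} is a hypothetical helper written for illustration, not Flink or AWS SDK API.

```java
import java.util.concurrent.Semaphore;

// Models the effect of DEFAULT_MAX_IN_FLIGHT_REQUESTS: a cap on concurrent
// async requests. InFlightLimiter is a hypothetical helper, not Flink API.
class InFlightLimiter {
    static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Try to start a request; false means the cap is reached and the caller must wait. */
    boolean tryStartRequest() {
        return permits.tryAcquire();
    }

    /** Invoked from the async completion callback to free a slot. */
    void completeRequest() {
        permits.release();
    }
}
```

Matching the cap to the client's thread count means a permit is normally available whenever a thread is free, so the sink rarely blocks on this limit.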

  was:
h2. Bug:

Async Sink Base is being flushed too frequently, resulting in backpressure 
even when the buffer is near empty.

*Cause:*

During a write(), flushIfAble() is called, which checks whether the number of 
buffered elements is greater than the batch size and, if so, insists that the 
sink flushes immediately, even if the number of inFlightRequests is already at 
the maximum allowed. This yields the current mailbox thread and hence blocks.

Notice that this can occur even if the buffer is near empty, so the blocking 
behaviour is unnecessary and undesirable: we would like the element to be 
written to the buffer without any blocking.


> Update the KDS and KDF Sink's defaults & update the docs
> 
>
> Key: FLINK-25976
> URL: https://issues.apache.org/jira/browse/FLINK-25976
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Update:
> DEFAULT_MAX_IN_FLIGHT_REQUESTS=50
> to match with the default threads in the AWS SDK v2 HTTP Client default.





[jira] [Updated] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs

2022-02-07 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25976:
---
Issue Type: Improvement  (was: Bug)

> Update the KDS and KDF Sink's defaults & update the docs
> 
>
> Key: FLINK-25976
> URL: https://issues.apache.org/jira/browse/FLINK-25976
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Update:
> DEFAULT_MAX_IN_FLIGHT_REQUESTS=50
> to match with the default threads in the AWS SDK v2 HTTP Client default.





[jira] [Created] (FLINK-25977) Close sink client and sink http client for KDS/KDF Sinks

2022-02-07 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25977:
--

 Summary: Close sink client and sink http client for KDS/KDF Sinks
 Key: FLINK-25977
 URL: https://issues.apache.org/jira/browse/FLINK-25977
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Update:

DEFAULT_MAX_IN_FLIGHT_REQUESTS=50

to match with the default threads in the AWS SDK v2 HTTP Client default.





[jira] [Updated] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs

2022-02-07 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25976:
---
Description: 
h2. Update:

DEFAULT_MAX_IN_FLIGHT_REQUESTS=50

 

to match with the default threads in the AWS SDK v2 HTTP Client default.

  was:
h2. Update:

DEFAULT_MAX_IN_FLIGHT_REQUESTS=50

to match with the default threads in the AWS SDK v2 HTTP Client default.


> Update the KDS and KDF Sink's defaults & update the docs
> 
>
> Key: FLINK-25976
> URL: https://issues.apache.org/jira/browse/FLINK-25976
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Minor
>
> h2. Update:
> DEFAULT_MAX_IN_FLIGHT_REQUESTS=50
>  
> to match with the default threads in the AWS SDK v2 HTTP Client default.





[jira] [Updated] (FLINK-25945) Intermittent Failures on KDF AZP 2

2022-02-08 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25945:
---
Priority: Blocker  (was: Major)

> Intermittent Failures on KDF AZP 2
> --
>
> Key: FLINK-25945
> URL: https://issues.apache.org/jira/browse/FLINK-25945
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Problem: CI fails on [PR#18606|https://github.com/apache/flink/pull/18606] - 
> unrelated to (1), this issue reads "Unable to execute HTTP request: Trying to 
> access closed classloader" and seems unrelated to the code change (effectively 
> a one-line {{Thread.sleep}}). This means it is an intermittent issue affecting 
> the KDF feature more widely, and it has not yet been picked up by the 20+ runs 
> on CI. Not reproducible locally. Intermittently failing on CI, frequency 
> unknown, likely more than 20 : 1. Resolution: unknown.





[jira] [Updated] (FLINK-25944) Intermittent Failures on KDF AZP

2022-02-08 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25944:
---
Priority: Blocker  (was: Critical)

> Intermittent Failures on KDF AZP
> 
>
> Key: FLINK-25944
> URL: https://issues.apache.org/jira/browse/FLINK-25944
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Problem: CI fails on [#18553|https://github.com/apache/flink/pull/18553] and 
> the issue is not reproducible locally. Furthermore, the failure is 
> intermittent, frequency c. 1 in 5(?). Resolution: unknown - the theory is to 
> use HTTP1_1 as the protocol.





[jira] [Updated] (FLINK-25945) Intermittent Failures on KDF AZP 2

2022-02-08 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25945:
---
Issue Type: Bug  (was: New Feature)

> Intermittent Failures on KDF AZP 2
> --
>
> Key: FLINK-25945
> URL: https://issues.apache.org/jira/browse/FLINK-25945
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Problem: CI fails on [PR#18606|https://github.com/apache/flink/pull/18606] - 
> unrelated to (1), this issue reads "Unable to execute HTTP request: Trying to 
> access closed classloader" and seems unrelated to the code change (effectively 
> a one-line {{Thread.sleep}}). This means it is an intermittent issue affecting 
> the KDF feature more widely, and it has not yet been picked up by the 20+ runs 
> on CI. Not reproducible locally. Intermittently failing on CI, frequency 
> unknown, likely more than 20 : 1. Resolution: unknown.





[jira] [Updated] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs

2022-02-08 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25976:
---
Priority: Major  (was: Minor)

> Update the KDS and KDF Sink's defaults & update the docs
> 
>
> Key: FLINK-25976
> URL: https://issues.apache.org/jira/browse/FLINK-25976
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
>
> h2. Update:
> DEFAULT_MAX_IN_FLIGHT_REQUESTS=50
>  
> to match with the default threads in the AWS SDK v2 HTTP Client default.





[jira] [Updated] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs

2022-02-08 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25976:
---
Issue Type: Bug  (was: Improvement)

> Update the KDS and KDF Sink's defaults & update the docs
> 
>
> Key: FLINK-25976
> URL: https://issues.apache.org/jira/browse/FLINK-25976
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Minor
>  Labels: pull-request-available
>
> h2. Update:
> DEFAULT_MAX_IN_FLIGHT_REQUESTS=50
>  
> to match with the default threads in the AWS SDK v2 HTTP Client default.





[jira] [Closed] (FLINK-25977) Close sink client and sink http client for KDS/KDF Sinks

2022-02-08 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu closed FLINK-25977.
--
Release Note: Superseded by FLINK-26006
  Resolution: Duplicate

> Close sink client and sink http client for KDS/KDF Sinks
> 
>
> Key: FLINK-25977
> URL: https://issues.apache.org/jira/browse/FLINK-25977
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Critical
> Fix For: 1.15.0
>
>
> We are not closing the AWS SDK and HTTP clients for the new KDS/KDF async 
> sink. The close method needs to be invoked when operator stops.





[jira] [Commented] (FLINK-26008) [FLIP-171] Update Kinesalite docker container reference

2022-02-08 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17488970#comment-17488970
 ] 

Zichen Liu commented on FLINK-26008:


I'm afraid it seems {{latest}} is the only tag for this image... 
[https://hub.docker.com/r/instructure/kinesalite/tags]

Other popular Kinesalite images also have only 1 tag.

Any suggestions?

> [FLIP-171] Update Kinesalite docker container reference 
> 
>
> Key: FLINK-26008
> URL: https://issues.apache.org/jira/browse/FLINK-26008
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kinesis
>Reporter: Danny Cranmer
>Assignee: Zichen Liu
>Priority: Major
> Fix For: 1.15.0
>
>
> We are currently referencing the {{:latest}} Kinesalite image for tests. Update 
> this to a pinned, more recent version so that image updates do not break tests:
> - https://github.com/apache/flink/pull/18661/files#r801531468





[jira] [Closed] (FLINK-24227) [FLIP-171] KDS implementation of Async Sink

2022-02-14 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu closed FLINK-24227.
--
Resolution: Fixed

Feature implemented, test coverage done, and docs written. Only bug fixes 
remain as of 2022-02-14.

> [FLIP-171] KDS implementation of Async Sink
> ---
>
> Key: FLINK-24227
> URL: https://issues.apache.org/jira/browse/FLINK-24227
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
>  As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
> pipeline.
> *Scope:*
>  * Implement an asynchronous sink for Kinesis Data Streams (KDS) by 
> inheriting the AsyncSinkBase class. The implementation can for now reside in 
> its own module in flink-connectors. The module and package name can be 
> anything reasonable e.g. {{flink-connector-aws-kinesis}} for the module name 
> and {{org.apache.flink.connector.aws.kinesis}} for the package name.
>  * The implementation must use [the Kinesis Java 
> Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
>  * The implementation must allow users to configure the Kinesis Client, with 
> reasonable default settings.
>  * Implement an asynchronous sink writer for KDS by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple requests (PutRecordsRequestEntry 
> objects) to KDS for increased throughput. The implemented Sink Writer will be 
> used by the Sink class that will be created as part of this story.
>  * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). 
> We already use this in {{KinesisTableApiITCase}}.
>  * Java / code-level docs.
>  * End-to-end testing: add tests that hit a real AWS instance. (How best to 
> donate resources to the Flink project to allow this to happen?)
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Closed] (FLINK-25944) Intermittent Failures on KDF AZP

2022-02-15 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu closed FLINK-25944.
--
Resolution: Duplicate

The appearance of this bug is caused by AWS resources not being closed 
properly; see FLINK-25846.

> Intermittent Failures on KDF AZP
> 
>
> Key: FLINK-25944
> URL: https://issues.apache.org/jira/browse/FLINK-25944
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Problem: CI fails on [#18553|https://github.com/apache/flink/pull/18553] and 
> the issue is not reproducible locally. Furthermore, the failure is intermittent 
> (frequency roughly 1 in 5). Resolution: unknown; the current theory is to use 
> HTTP/1.1 as the protocol.





[jira] [Updated] (FLINK-24370) [FLIP-171] Documentation for Generic AsyncSinkBase

2022-02-15 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24370:
---
Fix Version/s: (was: 1.15.0)

> [FLIP-171] Documentation for Generic AsyncSinkBase
> --
>
> Key: FLINK-24370
> URL: https://issues.apache.org/jira/browse/FLINK-24370
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> h2. Motivation
> To write documentation for FLIP-171 Async Sink Base. This will help sink 
> implementers get acquainted with the necessary information to write their 
> concrete sinks.
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Updated] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients

2022-06-10 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-28007:
---
Priority: Minor  (was: Major)

> Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
> 
>
> Key: FLINK-28007
> URL: https://issues.apache.org/jira/browse/FLINK-28007
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> h3. Background
> AWS SDK v2 async clients use a Netty async client for Kinesis Data 
> Streams/Firehose sink and Kinesis Data Streams EFO consumer. The SDK creates 
> a shared thread pool for Netty to use for network operations when one is not 
> configured. The thread pool is managed by a shared ELG (event loop group), 
> and this is stored in a static field. We do not configure this for the AWS 
> connectors in the Flink codebase. 
> When threads are spawned within the ELG, they inherit the context classloader 
> from the current thread. If the ELG is created from a shared classloader, for 
> instance Flink parent classloader, or MiniCluster parent classloader, 
> multiple Flink jobs can share the same ELG. When an ELG thread is spawned 
> from a Flink job, it will inherit the Flink user classloader. When this job 
> completes/fails, the classloader is destroyed; however, the Netty thread is 
> still referencing it, and this leads to the exception below.
> h3. Impact
> This issue *does not* impact jobs deployed to TM when AWS SDK v2 is loaded 
> via the Flink User Classloader. It is expected this is the standard 
> deployment configuration.
> This issue is known to impact:
> - Flink mini cluster, for example in integration tests (FLINK-26064)
> - Flink cluster loading AWS SDK v2 via parent classloader
> h3. Suggested solution
> There are a few possible solutions, as discussed in 
> https://github.com/apache/flink/pull/18733:
> 1. Create a separate ELG per Flink job
> 2. Create a separate ELG per subtask
> 3. Attach the correct classloader to ELG spawned threads
> h3. Error Stack
> (shortened stack trace, as full is too large)
> {noformat}
> Feb 09 20:05:04 java.util.concurrent.ExecutionException: 
> software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
> HTTP request: Trying to access closed classloader. Please check if you store 
> classloaders directly or indirectly in static fields. If the stacktrace 
> suggests that the leak occurs in a third party library and cannot be fixed 
> immediately, you can disable this check with the configuration 
> 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> Feb 09 20:05:04   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> (...)
> Feb 09 20:05:04 Caused by: 
> software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
> HTTP request: Trying to access closed classloader. Please check if you store 
> classloaders directly or indirectly in static fields. If the stacktrace 
> suggests that the leak occurs in a third party library and cannot be fixed 
> immediately, you can disable this check with the configuration 
> 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
> (...)
> Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access 
> closed classloader. Please check if you store classloaders directly or 
> indirectly in static fields. If the stacktrace suggests that the leak occurs 
> in a third party library and cannot be fixed immediately, you can disable 
> this check with the configuration 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04   at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNe

[jira] [Created] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients

2022-06-10 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-28007:
--

 Summary: Tests for AWS Connectors Using SDK v2 to use Synchronous 
Clients
 Key: FLINK-28007
 URL: https://issues.apache.org/jira/browse/FLINK-28007
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.16.0


h3. Background

AWS SDK v2 async clients use a Netty async client for Kinesis Data 
Streams/Firehose sink and Kinesis Data Streams EFO consumer. The SDK creates a 
shared thread pool for Netty to use for network operations when one is not 
configured. The thread pool is managed by a shared ELG (event loop group), and 
this is stored in a static field. We do not configure this for the AWS 
connectors in the Flink codebase. 

When threads are spawned within the ELG, they inherit the context classloader 
from the current thread. If the ELG is created from a shared classloader, for 
instance Flink parent classloader, or MiniCluster parent classloader, multiple 
Flink jobs can share the same ELG. When an ELG thread is spawned from a Flink 
job, it will inherit the Flink user classloader. When this job completes/fails, 
the classloader is destroyed; however, the Netty thread is still referencing it, 
and this leads to the exception below.
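
The classloader inheritance described above can be reproduced with plain JDK threads. The following is a minimal, self-contained sketch (no Flink or AWS SDK classes; the class name is illustrative): a thread spawned while a "job" classloader is current captures that classloader and keeps referencing it after the job is gone.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ContextClassLoaderDemo {

    // Returns true if a freshly spawned thread inherits the creating thread's
    // context classloader -- the mechanism behind the leak described above.
    static boolean spawnedThreadInheritsClassLoader() {
        // Stand-in for a per-job user classloader, as Flink creates one per job.
        ClassLoader jobClassLoader = new URLClassLoader(new URL[0]);
        ClassLoader previous = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(jobClassLoader);
        try {
            final ClassLoader[] seen = new ClassLoader[1];
            // A thread spawned now (e.g. by a shared Netty event loop group)
            // captures the current context classloader at construction time.
            Thread worker = new Thread(
                    () -> seen[0] = Thread.currentThread().getContextClassLoader());
            worker.start();
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            // The worker still referenced jobClassLoader after the "job" ended,
            // which is what prevents the classloader from being collected.
            return seen[0] == jobClassLoader;
        } finally {
            Thread.currentThread().setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        System.out.println(spawnedThreadInheritsClassLoader()); // prints "true"
    }
}
```

Because the ELG thread pool is shared, static, and long-lived, such threads keep the captured classloader alive indefinitely even after the owning job's classloader has been closed.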

h3. Impact

This issue *does not* impact jobs deployed to TM when AWS SDK v2 is loaded via 
the Flink User Classloader. It is expected this is the standard deployment 
configuration.

This issue is known to impact:
- Flink mini cluster, for example in integration tests (FLINK-26064)
- Flink cluster loading AWS SDK v2 via parent classloader

h3. Suggested solution

There are a few possible solutions, as discussed in 
https://github.com/apache/flink/pull/18733:
1. Create a separate ELG per Flink job
2. Create a separate ELG per subtask
3. Attach the correct classloader to ELG spawned threads

h3. Error Stack

(shortened stack trace, as full is too large)
{noformat}
Feb 09 20:05:04 java.util.concurrent.ExecutionException: 
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
HTTP request: Trying to access closed classloader. Please check if you store 
classloaders directly or indirectly in static fields. If the stacktrace 
suggests that the leak occurs in a third party library and cannot be fixed 
immediately, you can disable this check with the configuration 
'classloader.check-leaked-classloader'.
Feb 09 20:05:04 at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
Feb 09 20:05:04 at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
(...)
Feb 09 20:05:04 Caused by: 
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
HTTP request: Trying to access closed classloader. Please check if you store 
classloaders directly or indirectly in static fields. If the stacktrace 
suggests that the leak occurs in a third party library and cannot be fixed 
immediately, you can disable this check with the configuration 
'classloader.check-leaked-classloader'.
Feb 09 20:05:04 at 
software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
(...)
Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access 
closed classloader. Please check if you store classloaders directly or 
indirectly in static fields. If the stacktrace suggests that the leak occurs in 
a third party library and cannot be fixed immediately, you can disable this 
check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04 at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
Feb 09 20:05:04 at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
Feb 09 20:05:04 at 
java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:348)
Feb 09 20:05:04 at 
java.util.ServiceLoader$La

[jira] [Updated] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients

2022-06-10 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-28007:
---
Affects Version/s: 1.15.0
   1.15.1
   1.15.2

> Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
> 
>
> Key: FLINK-28007
> URL: https://issues.apache.org/jira/browse/FLINK-28007
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.0, 1.15.1, 1.15.2
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> h3. Background
> AWS SDK v2 async clients use a Netty async client for Kinesis Data 
> Streams/Firehose sink and Kinesis Data Streams EFO consumer. The SDK creates 
> a shared thread pool for Netty to use for network operations when one is not 
> configured. The thread pool is managed by a shared ELG (event loop group), 
> and this is stored in a static field. We do not configure this for the AWS 
> connectors in the Flink codebase. 
> When threads are spawned within the ELG, they inherit the context classloader 
> from the current thread. If the ELG is created from a shared classloader, for 
> instance Flink parent classloader, or MiniCluster parent classloader, 
> multiple Flink jobs can share the same ELG. When an ELG thread is spawned 
> from a Flink job, it will inherit the Flink user classloader. When this job 
> completes/fails, the classloader is destroyed; however, the Netty thread is 
> still referencing it, and this leads to the exception below.
> h3. Impact
> This issue *does not* impact jobs deployed to TM when AWS SDK v2 is loaded 
> via the Flink User Classloader. It is expected this is the standard 
> deployment configuration.
> This issue is known to impact:
> - Flink mini cluster, for example in integration tests (FLINK-26064)
> - Flink cluster loading AWS SDK v2 via parent classloader
> h3. Suggested solution
> There are a few possible solutions, as discussed in 
> https://github.com/apache/flink/pull/18733:
> 1. Create a separate ELG per Flink job
> 2. Create a separate ELG per subtask
> 3. Attach the correct classloader to ELG spawned threads
> h3. Error Stack
> (shortened stack trace, as full is too large)
> {noformat}
> Feb 09 20:05:04 java.util.concurrent.ExecutionException: 
> software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
> HTTP request: Trying to access closed classloader. Please check if you store 
> classloaders directly or indirectly in static fields. If the stacktrace 
> suggests that the leak occurs in a third party library and cannot be fixed 
> immediately, you can disable this check with the configuration 
> 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> Feb 09 20:05:04   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> (...)
> Feb 09 20:05:04 Caused by: 
> software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
> HTTP request: Trying to access closed classloader. Please check if you store 
> classloaders directly or indirectly in static fields. If the stacktrace 
> suggests that the leak occurs in a third party library and cannot be fixed 
> immediately, you can disable this check with the configuration 
> 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
> (...)
> Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access 
> closed classloader. Please check if you store classloaders directly or 
> indirectly in static fields. If the stacktrace suggests that the leak occurs 
> in a third party library and cannot be fixed immediately, you can disable 
> this check with the configuration 'classloader.check-leaked-classloader'.
> Feb 09 2

[jira] [Updated] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients

2022-06-10 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-28007:
---
Component/s: Connectors / Common

> Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
> 
>
> Key: FLINK-28007
> URL: https://issues.apache.org/jira/browse/FLINK-28007
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common, Connectors / Kinesis
>Affects Versions: 1.15.0, 1.15.1, 1.15.2
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> h3. Background
> AWS SDK v2 async clients use a Netty async client for Kinesis Data 
> Streams/Firehose sink and Kinesis Data Streams EFO consumer. The SDK creates 
> a shared thread pool for Netty to use for network operations when one is not 
> configured. The thread pool is managed by a shared ELG (event loop group), 
> and this is stored in a static field. We do not configure this for the AWS 
> connectors in the Flink codebase. 
> When threads are spawned within the ELG, they inherit the context classloader 
> from the current thread. If the ELG is created from a shared classloader, for 
> instance Flink parent classloader, or MiniCluster parent classloader, 
> multiple Flink jobs can share the same ELG. When an ELG thread is spawned 
> from a Flink job, it will inherit the Flink user classloader. When this job 
> completes/fails, the classloader is destroyed; however, the Netty thread is 
> still referencing it, and this leads to the exception below.
> h3. Impact
> This issue *does not* impact jobs deployed to TM when AWS SDK v2 is loaded 
> via the Flink User Classloader. It is expected this is the standard 
> deployment configuration.
> This issue is known to impact:
> - Flink mini cluster, for example in integration tests (FLINK-26064)
> - Flink cluster loading AWS SDK v2 via parent classloader
> h3. Suggested solution
> There are a few possible solutions, as discussed in 
> https://github.com/apache/flink/pull/18733:
> 1. Create a separate ELG per Flink job
> 2. Create a separate ELG per subtask
> 3. Attach the correct classloader to ELG spawned threads
> h3. Error Stack
> (shortened stack trace, as full is too large)
> {noformat}
> Feb 09 20:05:04 java.util.concurrent.ExecutionException: 
> software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
> HTTP request: Trying to access closed classloader. Please check if you store 
> classloaders directly or indirectly in static fields. If the stacktrace 
> suggests that the leak occurs in a third party library and cannot be fixed 
> immediately, you can disable this check with the configuration 
> 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> Feb 09 20:05:04   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> (...)
> Feb 09 20:05:04 Caused by: 
> software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
> HTTP request: Trying to access closed classloader. Please check if you store 
> classloaders directly or indirectly in static fields. If the stacktrace 
> suggests that the leak occurs in a third party library and cannot be fixed 
> immediately, you can disable this check with the configuration 
> 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
> (...)
> Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access 
> closed classloader. Please check if you store classloaders directly or 
> indirectly in static fields. If the stacktrace suggests that the leak occurs 
> in a third party library and cannot be fixed immediately, you can disable 
> this check with the configuration 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04   at 
> org.apache.f

[jira] [Updated] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients

2022-06-10 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-28007:
---
Labels:   (was: pull-request-available)

> Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
> 
>
> Key: FLINK-28007
> URL: https://issues.apache.org/jira/browse/FLINK-28007
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common, Connectors / Kinesis
>Affects Versions: 1.15.0, 1.15.1, 1.15.2
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Minor
> Fix For: 1.16.0
>
>
> h3. Background
> AWS SDK v2 async clients use a Netty async client for Kinesis Data 
> Streams/Firehose sink and Kinesis Data Streams EFO consumer. The SDK creates 
> a shared thread pool for Netty to use for network operations when one is not 
> configured. The thread pool is managed by a shared ELG (event loop group), 
> and this is stored in a static field. We do not configure this for the AWS 
> connectors in the Flink codebase. 
> When threads are spawned within the ELG, they inherit the context classloader 
> from the current thread. If the ELG is created from a shared classloader, for 
> instance Flink parent classloader, or MiniCluster parent classloader, 
> multiple Flink jobs can share the same ELG. When an ELG thread is spawned 
> from a Flink job, it will inherit the Flink user classloader. When this job 
> completes/fails, the classloader is destroyed; however, the Netty thread is 
> still referencing it, and this leads to the exception below.
> h3. Impact
> This issue *does not* impact jobs deployed to TM when AWS SDK v2 is loaded 
> via the Flink User Classloader. It is expected this is the standard 
> deployment configuration.
> This issue is known to impact:
> - Flink mini cluster, for example in integration tests (FLINK-26064)
> - Flink cluster loading AWS SDK v2 via parent classloader
> h3. Suggested solution
> There are a few possible solutions, as discussed in 
> https://github.com/apache/flink/pull/18733:
> 1. Create a separate ELG per Flink job
> 2. Create a separate ELG per subtask
> 3. Attach the correct classloader to ELG spawned threads
> h3. Error Stack
> (shortened stack trace, as full is too large)
> {noformat}
> Feb 09 20:05:04 java.util.concurrent.ExecutionException: 
> software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
> HTTP request: Trying to access closed classloader. Please check if you store 
> classloaders directly or indirectly in static fields. If the stacktrace 
> suggests that the leak occurs in a third party library and cannot be fixed 
> immediately, you can disable this check with the configuration 
> 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> Feb 09 20:05:04   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> (...)
> Feb 09 20:05:04 Caused by: 
> software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
> HTTP request: Trying to access closed classloader. Please check if you store 
> classloaders directly or indirectly in static fields. If the stacktrace 
> suggests that the leak occurs in a third party library and cannot be fixed 
> immediately, you can disable this check with the configuration 
> 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
> Feb 09 20:05:04   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
> (...)
> Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access 
> closed classloader. Please check if you store classloaders directly or 
> indirectly in static fields. If the stacktrace suggests that the leak occurs 
> in a third party library and cannot be fixed immediately, you can disable 
> this check with the configuration 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04   at 
> org.apache.flink.runtime.execution.librarycache.Fli

[jira] [Updated] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients

2022-06-10 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-28007:
---
Description: 
h3. Background

The unit & integration tests for the AWS connectors in the Flink repository 
create clients using static helper methods in flink-connector-aws-base, in the 
AWSServicesTestUtils class.

These static helper methods create the asynchronous flavour of the clients 
required by the AWS connectors.

*Task*

* Change these to the synchronous version for each AWS client.

  was:
h3. Background

AWS SDK v2 async clients use a Netty async client for Kinesis Data 
Streams/Firehose sink and Kinesis Data Streams EFO consumer. The SDK creates a 
shared thread pool for Netty to use for network operations when one is not 
configured. The thread pool is managed by a shared ELG (event loop group), and 
this is stored in a static field. We do not configure this for the AWS 
connectors in the Flink codebase. 

When threads are spawned within the ELG, they inherit the context classloader 
from the current thread. If the ELG is created from a shared classloader, for 
instance Flink parent classloader, or MiniCluster parent classloader, multiple 
Flink jobs can share the same ELG. When an ELG thread is spawned from a Flink 
job, it will inherit the Flink user classloader. When this job completes/fails, 
the classloader is destroyed; however, the Netty thread is still referencing it, 
and this leads to the exception below.

h3. Impact

This issue *does not* impact jobs deployed to TM when AWS SDK v2 is loaded via 
the Flink User Classloader. It is expected this is the standard deployment 
configuration.

This issue is known to impact:
- Flink mini cluster, for example in integration tests (FLINK-26064)
- Flink cluster loading AWS SDK v2 via parent classloader

h3. Suggested solution

There are a few possible solutions, as discussed in 
https://github.com/apache/flink/pull/18733:
1. Create a separate ELG per Flink job
2. Create a separate ELG per subtask
3. Attach the correct classloader to ELG spawned threads

h3. Error Stack

(shortened stack trace, as full is too large)
{noformat}
Feb 09 20:05:04 java.util.concurrent.ExecutionException: 
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
HTTP request: Trying to access closed classloader. Please check if you store 
classloaders directly or indirectly in static fields. If the stacktrace 
suggests that the leak occurs in a third party library and cannot be fixed 
immediately, you can disable this check with the configuration 
'classloader.check-leaked-classloader'.
Feb 09 20:05:04 at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
Feb 09 20:05:04 at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
(...)
Feb 09 20:05:04 Caused by: 
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
HTTP request: Trying to access closed classloader. Please check if you store 
classloaders directly or indirectly in static fields. If the stacktrace 
suggests that the leak occurs in a third party library and cannot be fixed 
immediately, you can disable this check with the configuration 
'classloader.check-leaked-classloader'.
Feb 09 20:05:04 at 
software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
(...)
Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access 
closed classloader. Please check if you store classloaders directly or 
indirectly in static fields. If the stacktrace suggests that the leak occurs in 
a third party library and cannot be fixed immediately, you can disable this 
check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04 at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
Feb 09 20:05:04 at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
Feb 09 20:05:04 

[jira] [Updated] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients

2022-06-11 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-28007:
---
Labels: pull-request-available  (was: )

> Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
> 
>
> Key: FLINK-28007
> URL: https://issues.apache.org/jira/browse/FLINK-28007
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common, Connectors / Kinesis
>Affects Versions: 1.15.0, 1.15.1, 1.15.2
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> h3. Background
> The unit and integration tests for the AWS connectors in the Flink repository 
> create clients using static helper methods in flink-connector-aws-base, in 
> the AWSServicesTestUtils class.
> These static helper methods create the asynchronous flavour of the clients 
> required by the AWS connectors.
> *Task*
> * Change these to the synchronous version of each AWS client.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28027) Initialise Async Sink maximum number of in flight messages to low number for rate limiting strategy

2022-06-13 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-28027:
--

 Summary: Initialise Async Sink maximum number of in flight 
messages to low number for rate limiting strategy
 Key: FLINK-28027
 URL: https://issues.apache.org/jira/browse/FLINK-28027
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common, Connectors / Kinesis
Affects Versions: 1.15.0
Reporter: Zichen Liu
 Fix For: 1.16.0


*Background*

In the AsyncSinkWriter, we implement a rate limiting strategy.

The initial value for the maximum number of in flight messages is set extremely 
high ({{{}maxBatchSize * maxInFlightRequests{}}}).

However, in accordance with the AIMD strategy, TCP congestion control has found 
that it is better to [start with a small 
value|https://en.wikipedia.org/wiki/TCP_congestion_control#Slow_start].

*Suggestion*

A better default might be:
 * maxBatchSize
 * maxBatchSize / parallelism
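A minimal AIMD sketch (illustrative names; this is not the actual AsyncSinkWriter code) shows why a small initial window is safer: the limit ramps up additively on success and shrinks multiplicatively on throttling, so starting at maxBatchSize instead of maxBatchSize * maxInFlightRequests avoids an initial burst that the destination may have to throttle.

```java
public class AimdInFlightLimiter {
    private double limit;
    private final int maxLimit;
    private final double additiveIncrease;
    private final double decreaseFactor;

    public AimdInFlightLimiter(
            int initialLimit, int maxLimit, double additiveIncrease, double decreaseFactor) {
        this.limit = initialLimit;
        this.maxLimit = maxLimit;
        this.additiveIncrease = additiveIncrease;
        this.decreaseFactor = decreaseFactor;
    }

    // Additive increase: grow the window slowly after each successful batch.
    public void onSuccess() {
        limit = Math.min(maxLimit, limit + additiveIncrease);
    }

    // Multiplicative decrease: shrink sharply when the destination throttles.
    public void onThrottle() {
        limit = Math.max(1.0, limit * decreaseFactor);
    }

    public int currentLimit() {
        return (int) limit;
    }

    public static void main(String[] args) {
        int maxBatchSize = 500;
        int maxInFlightRequests = 50;
        // Proposed: start at maxBatchSize; the ceiling stays at the old default.
        AimdInFlightLimiter limiter = new AimdInFlightLimiter(
                maxBatchSize, maxBatchSize * maxInFlightRequests, 10, 0.5);
        limiter.onSuccess();  // 500 -> 510
        limiter.onThrottle(); // 510 -> 255
        System.out.println(limiter.currentLimit()); // 255
    }
}
```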





[jira] [Closed] (FLINK-27670) Python wrappers for Kinesis Sinks

2022-06-20 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu closed FLINK-27670.
--
Resolution: Duplicate

Completed in FLINK-21966

> Python wrappers for Kinesis Sinks
> -
>
> Key: FLINK-27670
> URL: https://issues.apache.org/jira/browse/FLINK-27670
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Connectors / Kinesis
>Reporter: Zichen Liu
>Priority: Major
> Fix For: 1.16.0
>
>
> Create Python Wrappers for the new Kinesis Streams sink and the Kinesis 
> Firehose sink.
> An example implementation may be found here 
> [https://github.com/apache/flink/pull/15491/files] for the old Kinesis sink.





[jira] [Created] (FLINK-28148) Unable to load jar connector to a Python Table API app

2022-06-20 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-28148:
--

 Summary: Unable to load jar connector to a Python Table API app
 Key: FLINK-28148
 URL: https://issues.apache.org/jira/browse/FLINK-28148
 Project: Flink
  Issue Type: Bug
  Components: API / Python, Connectors / Common, Table SQL / API
Affects Versions: 1.16.0
Reporter: Zichen Liu


Reproduction steps:
 # Clone the latest Flink from the master branch.
 # Follow the Flink [recommended 
steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/]
 to build Flink & install PyFlink. Note: the tutorial recommends Maven 3.2.x and 
Python 3.6-3.9; reproduced with Maven 3.2.5 and Python 3.7.
 # Create a new Python Table API app that loads in a jar, similar to:

 
{code:java}
from pyflink.table import TableEnvironment, StreamTableEnvironment, 
EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar") 
{code}
The jar loaded here can be any jar, and the following message will appear:

 
{code:java}
Traceback (most recent call last):
  File "pyflink_table_api_firehose.py", line 48, in 
log_processing()
  File "pyflink_table_api_firehose.py", line 14, in log_processing
t_env.get_config().set("pipeline.classpaths", 
"file:///home/YOUR_USER/pyflink-table-api/flink/flink-connectors/flink-sql-connector-aws-kinesis-firehose/target/flink-sql-connector-aws-kinesis-firehose-1.16-SNAPSHOT.jar")
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/table/table_config.py",
 line 109, in set
add_jars_to_context_class_loader(value.split(";"))
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/java_utils.py",
 line 169, in add_jars_to_context_class_loader
addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url]))
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 
1322, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
 line 146, in deco
return f(*a, **kw)
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/protocol.py", 
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o45.invoke.
: java.lang.IllegalArgumentException: object is not an instance of declaring 
class
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
   at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
   at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
   at java.base/java.lang.Thread.run(Thread.java:829) {code}
Reproduced on Mac and Amazon Linux 2.

Next do:
{code:java}
pip uninstall apache-flink
pip install apache-flink{code}
to downgrade to the 1.15 release.

The jar should then load successfully, even when loading the same connector 
built from master (reproduced with Kafka and Kinesis Firehose).





[jira] [Updated] (FLINK-28148) Unable to load jar connector to a Python Table API app

2022-06-20 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-28148:
---
Description: 
Reproduction steps:
 # Clone the latest Flink from the master branch.
 # Follow the Flink [recommended 
steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/]
 to build Flink & install PyFlink. Notes: Tutorial recommended Maven 3.2.x, 
Python 3.6-3.9, actual: Maven 3.2.5, Python 3.7.
 # Create a new Python Table API app that loads in a jar, similar to:

 
{code:java}
from pyflink.table import TableEnvironment, StreamTableEnvironment, 
EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar") 
{code}
The jar loaded here can be any jar, and the following message will appear:

 
{code:java}
Traceback (most recent call last):
  File "pyflink_table_api_firehose.py", line 48, in 
log_processing()
  File "pyflink_table_api_firehose.py", line 14, in log_processing
t_env.get_config().set("pipeline.classpaths", 
"file:///home/YOUR_USER/pyflink-table-api/flink/flink-connectors/flink-sql-connector-aws-kinesis-firehose/target/flink-sql-connector-aws-kinesis-firehose-1.16-SNAPSHOT.jar")
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/table/table_config.py",
 line 109, in set
add_jars_to_context_class_loader(value.split(";"))
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/java_utils.py",
 line 169, in add_jars_to_context_class_loader
addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url]))
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 
1322, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
 line 146, in deco
return f(*a, **kw)
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/protocol.py", 
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o45.invoke.
: java.lang.IllegalArgumentException: object is not an instance of declaring 
class
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
   at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
   at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
   at java.base/java.lang.Thread.run(Thread.java:829) {code}
Reproduced on Mac and Amazon Linux 2.

Next do:
{code:java}
pip uninstall apache-flink
pip install apache-flink{code}
to downgrade to the 1.15 release.

The jar should then load successfully, even when loading the same connector 
built from master (reproduced with Kafka and Kinesis Firehose).

  was:
Reproduction steps:
 # Clone the latest Flink from the master branch.
 # Follow the Flink [recommended 
steps](https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/)
 to build Flink & install PyFlink. Notes: Tutorial recommended Maven 3.2.x, 
Python 3.6-3.9, actual: Maven 3.2.5, Python 3.7.
 # Create a new Python Table API app that loads in a jar, similar to:

 
{code:java}
from pyflink.table import TableEnvironment, StreamTableEnvironment, 
EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar") 
{code}
The jar loaded here can be any jar, and the following message will appear:

 
{code:java}
Traceback (most recent call last):
  File "pyflink_table_api_firehose.py", line 48, in 
log_processing()
  File "pyflink

[jira] [Updated] (FLINK-28148) Unable to load jar connector to a Python Table API app

2022-06-20 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-28148:
---
Description: 
Reproduction steps:
 # Clone the latest Flink from the master branch.
 # Follow the Flink [recommended 
steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/]
 to build Flink & install PyFlink. Notes: Tutorial recommended Maven 3.2.x, 
Python 3.6-3.9, reproduced with: Maven 3.2.5, Python 3.7.
 # Create a new Python Table API app that loads in a jar, similar to:

 
{code:java}
from pyflink.table import TableEnvironment, StreamTableEnvironment, 
EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar") 
{code}
The jar loaded here can be any jar, and the following message will appear:

 
{code:java}
Traceback (most recent call last):
  File "pyflink_table_api_firehose.py", line 48, in 
log_processing()
  File "pyflink_table_api_firehose.py", line 14, in log_processing
t_env.get_config().set("pipeline.classpaths", 
"file:///home/YOUR_USER/pyflink-table-api/flink/flink-connectors/flink-sql-connector-aws-kinesis-firehose/target/flink-sql-connector-aws-kinesis-firehose-1.16-SNAPSHOT.jar")
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/table/table_config.py",
 line 109, in set
add_jars_to_context_class_loader(value.split(";"))
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/java_utils.py",
 line 169, in add_jars_to_context_class_loader
addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url]))
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 
1322, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
 line 146, in deco
return f(*a, **kw)
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/protocol.py", 
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o45.invoke.
: java.lang.IllegalArgumentException: object is not an instance of declaring 
class
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
   at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
   at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
   at java.base/java.lang.Thread.run(Thread.java:829) {code}
Reproduced on Mac and Amazon Linux 2.

Next do:
{code:java}
pip uninstall apache-flink
pip install apache-flink{code}
to downgrade to the 1.15 release.

The jar should then load successfully, even when loading the same connector 
built from master (reproduced with Kafka and Kinesis Firehose).

  was:
Reproduction steps:
 # Clone the latest Flink from the master branch.
 # Follow the Flink [recommended 
steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/]
 to build Flink & install PyFlink. Notes: Tutorial recommended Maven 3.2.x, 
Python 3.6-3.9, actual: Maven 3.2.5, Python 3.7.
 # Create a new Python Table API app that loads in a jar, similar to:

 
{code:java}
from pyflink.table import TableEnvironment, StreamTableEnvironment, 
EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar") 
{code}
The jar loaded here can be any jar, and the following message will appear:

 
{code:java}
Traceback (most recent call last):
  File "pyflink_table_api_firehose.py", line 48, in 
log_processing()
  File 

[jira] [Updated] (FLINK-28148) Unable to load jar connector to a Python Table API app

2022-06-20 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-28148:
---
Labels: connector jar python table-api  (was: )

> Unable to load jar connector to a Python Table API app
> --
>
> Key: FLINK-28148
> URL: https://issues.apache.org/jira/browse/FLINK-28148
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Connectors / Common, Table SQL / API
>Affects Versions: 1.16.0
>Reporter: Zichen Liu
>Priority: Major
>  Labels: connector, jar, python, table-api
>
> Reproduction steps:
>  # Clone the latest Flink from the master branch.
>  # Follow the Flink [recommended 
> steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/]
>  to build Flink & install PyFlink. Notes: Tutorial recommended Maven 3.2.x, 
> Python 3.6-3.9, reproduced with: Maven 3.2.5, Python 3.7.
>  # Create a new Python Table API app that loads in a jar, similar to:
>  
> {code:java}
> from pyflink.table import TableEnvironment, StreamTableEnvironment, 
> EnvironmentSettings
> env_settings = EnvironmentSettings.in_streaming_mode()
> t_env = StreamTableEnvironment.create(environment_settings=env_settings)
> t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar") 
> {code}
> The jar loaded here can be any jar, and the following message will appear:
>  
> {code:java}
> Traceback (most recent call last):
>   File "pyflink_table_api_firehose.py", line 48, in 
> log_processing()
>   File "pyflink_table_api_firehose.py", line 14, in log_processing
> t_env.get_config().set("pipeline.classpaths", 
> "file:///home/YOUR_USER/pyflink-table-api/flink/flink-connectors/flink-sql-connector-aws-kinesis-firehose/target/flink-sql-connector-aws-kinesis-firehose-1.16-SNAPSHOT.jar")
>   File 
> "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/table/table_config.py",
>  line 109, in set
> add_jars_to_context_class_loader(value.split(";"))
>   File 
> "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/java_utils.py",
>  line 169, in add_jars_to_context_class_loader
> addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url]))
>   File 
> "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/java_gateway.py", 
> line 1322, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>  line 146, in deco
> return f(*a, **kw)
>   File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/protocol.py", 
> line 328, in get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o45.invoke.
> : java.lang.IllegalArgumentException: object is not an instance of declaring 
> class
>at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>at java.base/java.lang.Thread.run(Thread.java:829) {code}
> Reproduced on Mac and Amazon Linux 2.
> Next do:
> {code:java}
> pip uninstall apache-flink
> pip install apache-flink{code}
> To downgrade it to 1.15 release.
> The loading of the jar should be successful. Even if you try to load the same 
> connector built from master (reproduced with Kafka, Kinesis Firehose).





[jira] [Updated] (FLINK-28148) Unable to load jar connector to a Python Table API app

2022-06-20 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-28148:
---
Description: 
h2. Background

Users are currently unable to build and install the latest PyFlink and then load 
jars. The jar loading mechanism was introduced in FLINK-16943.
h2. Reproduction steps
 # Clone the latest Flink from the master branch.
 # Follow the Flink [recommended 
steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/]
 to build Flink & install PyFlink. Notes: Tutorial recommended Maven 3.2.x, 
Python 3.6-3.9, reproduced with: Maven 3.2.5, Python 3.7.
 # Create a new Python Table API app that loads in a jar, similar to:

 
{code:java}
from pyflink.table import TableEnvironment, StreamTableEnvironment, 
EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar") 
{code}
The jar loaded here can be any jar, and the following message will appear:

 
{code:java}
Traceback (most recent call last):
  File "pyflink_table_api_firehose.py", line 48, in 
log_processing()
  File "pyflink_table_api_firehose.py", line 14, in log_processing
t_env.get_config().set("pipeline.classpaths", 
"file:///home/YOUR_USER/pyflink-table-api/flink/flink-connectors/flink-sql-connector-aws-kinesis-firehose/target/flink-sql-connector-aws-kinesis-firehose-1.16-SNAPSHOT.jar")
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/table/table_config.py",
 line 109, in set
add_jars_to_context_class_loader(value.split(";"))
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/java_utils.py",
 line 169, in add_jars_to_context_class_loader
addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url]))
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 
1322, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
 line 146, in deco
return f(*a, **kw)
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/protocol.py", 
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o45.invoke.
: java.lang.IllegalArgumentException: object is not an instance of declaring 
class
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
   at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
   at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
   at java.base/java.lang.Thread.run(Thread.java:829) {code}
Reproduced on Mac and Amazon Linux 2.

Next do:
{code:java}
pip uninstall apache-flink
pip install apache-flink{code}
to downgrade to the 1.15 release.

The jar should then load successfully, even when loading the same connector 
built from master (reproduced with Kafka and Kinesis Firehose).

  was:
Reproduction steps:
 # Clone the latest Flink from the master branch.
 # Follow the Flink [recommended 
steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/]
 to build Flink & install PyFlink. Notes: Tutorial recommended Maven 3.2.x, 
Python 3.6-3.9, reproduced with: Maven 3.2.5, Python 3.7.
 # Create a new Python Table API app that loads in a jar, similar to:

 
{code:java}
from pyflink.table import TableEnvironment, StreamTableEnvironment, 
EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar") 
{code}
The jar loaded here can be any jar, and 

[jira] [Updated] (FLINK-28148) Unable to load jar connector to a Python Table API app

2022-06-20 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-28148:
---
Description: 
h2. Background

Users are currently unable to build and install the latest PyFlink and then load 
jars. The jar loading mechanism was introduced in FLINK-16943.
h2. Reproduction steps
 # Clone the latest Flink from the master branch.
 # Follow the Flink [recommended 
steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/]
 to build Flink & install PyFlink. Notes: Tutorial recommended Maven 3.2.x, 
Python 3.6-3.9, reproduced with: Maven 3.2.5, Python 3.7.
 # Create a new Python Table API app that loads in a jar, similar to:

 
{code:java}
from pyflink.table import TableEnvironment, StreamTableEnvironment, 
EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar") 
{code}
The following alternative ways of loading jars produce a similar issue:
{code:java}
table_env.get_config().get_configuration().set_string("pipeline.jars", 
"file:///path/to/your/jar.jar") {code}
The jar loaded here can be any jar, and the following message will appear:

 
{code:java}
Traceback (most recent call last):
  File "pyflink_table_api_firehose.py", line 48, in 
log_processing()
  File "pyflink_table_api_firehose.py", line 14, in log_processing
t_env.get_config().set("pipeline.classpaths", 
"file:///home/YOUR_USER/pyflink-table-api/flink/flink-connectors/flink-sql-connector-aws-kinesis-firehose/target/flink-sql-connector-aws-kinesis-firehose-1.16-SNAPSHOT.jar")
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/table/table_config.py",
 line 109, in set
add_jars_to_context_class_loader(value.split(";"))
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/java_utils.py",
 line 169, in add_jars_to_context_class_loader
addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url]))
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 
1322, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
 line 146, in deco
return f(*a, **kw)
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/protocol.py", 
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o45.invoke.
: java.lang.IllegalArgumentException: object is not an instance of declaring 
class
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
   at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
   at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
   at java.base/java.lang.Thread.run(Thread.java:829) {code}
Reproduced on macOS and Amazon Linux 2.

Next, downgrade to the 1.15 release:
{code:java}
pip uninstall apache-flink
pip install apache-flink{code}
The jar then loads successfully, even when loading the same connector built from 
master (reproduced with Kafka and Kinesis Firehose).
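The IllegalArgumentException in the stack trace above comes from java.lang.reflect.Method.invoke being handed a receiver object that is not an instance of the method's declaring class (here, addURL invoked on a class loader that is not the expected type). A minimal, self-contained Java sketch of that failure mode; all class names below are hypothetical stand-ins, not Flink's actual classes:

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class WrongReceiverDemo {

    // Hypothetical stand-in for java.net.URLClassLoader with its addURL method.
    static class JarLoader {
        final List<String> urls = new ArrayList<>();
        public void addURL(String url) { urls.add(url); }
    }

    // Hypothetical stand-in for a class loader that is not a JarLoader.
    static class OtherLoader { }

    // Reflectively invokes JarLoader.addURL on an arbitrary receiver, the way
    // py4j relays addURL.invoke(loader, ...); returns true if the JVM rejects
    // the receiver with IllegalArgumentException.
    static boolean isRejected(Object receiver) throws Exception {
        Method addURL = JarLoader.class.getMethod("addURL", String.class);
        try {
            addURL.invoke(receiver, "file:///path/to/your/jar.jar");
            return false;
        } catch (IllegalArgumentException e) {
            // typically: "object is not an instance of declaring class"
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(isRejected(new JarLoader()));   // false: matching receiver
        System.out.println(isRejected(new OtherLoader())); // true: mismatched receiver
    }
}
```

The spec for Method.invoke guarantees this rejection whenever the receiver is not an instance of the declaring class, which is consistent with the traceback above.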



[jira] [Commented] (FLINK-28148) Unable to load jar connector to a Python Table API app

2022-06-27 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17559081#comment-17559081
 ] 

Zichen Liu commented on FLINK-28148:


[~dianfu] [~hxbks2ks] Apologies for the late reply; I was off for a few days.

Thank you for making the fix! This is a duplicate of FLINK-28002.

> Unable to load jar connector to a Python Table API app
> --
>
> Key: FLINK-28148
> URL: https://issues.apache.org/jira/browse/FLINK-28148
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Connectors / Common, Table SQL / API
>Affects Versions: 1.16.0
>Reporter: Zichen Liu
>Priority: Major
>  Labels: connector, jar, python, table-api
>

[jira] [Commented] (FLINK-25735) Chinese Translation - Add documentation for KDS Async Sink

2022-06-27 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17559107#comment-17559107
 ] 

Zichen Liu commented on FLINK-25735:


Hi, sorry for the late response, I have been away! I do not have permission to 
assign the ticket, but I will find someone who can assign it to you. Thank you 
for offering!

> Chinese Translation - Add documentation for KDS Async Sink
> --
>
> Key: FLINK-25735
> URL: https://issues.apache.org/jira/browse/FLINK-25735
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.15.0
>Reporter: Zichen Liu
>Priority: Major
>  Labels: chinese-translation
>
> h2. Translate:
>  * Connectors/Kinesis page to deprecate old sink 
> (docs/content/docs/connectors/datastream/kinesis.md)
>  * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) - 
> only the section about Kinesis
> into Chinese.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-25735) Chinese Translation - Add documentation for KDS Async Sink

2022-06-27 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17559121#comment-17559121
 ] 

Zichen Liu commented on FLINK-25735:


Apologies again for the delay. Would [~martijnvisser] mind performing the 
assignment?






[jira] [Commented] (FLINK-25859) Add documentation for DynamoDB Async Sink

2022-08-22 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17583106#comment-17583106
 ] 

Zichen Liu commented on FLINK-25859:


Hi [~Gusev], thanks again for contributing the DynamoDB sink. Would you like to 
pick this one up, as it goes along with that work?

I believe I am assigned to this task only because of Jira cloning; if you are 
happy to pick it up, a committer (maybe [~dannycranmer]) will be happy to 
assign it to you.

> Add documentation for DynamoDB Async Sink
> -
>
> Key: FLINK-25859
> URL: https://issues.apache.org/jira/browse/FLINK-25859
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kinesis, Documentation
>Reporter: Yuri Gusev
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> h2. Motivation
> FLINK-24229 _introduces a new sink for DynamoDB_
> *Scope:*
>  * Create documentation for the new connector
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Commented] (FLINK-24370) [FLIP-171] Documentation for Generic AsyncSinkBase

2022-07-20 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568966#comment-17568966
 ] 

Zichen Liu commented on FLINK-24370:


Done in linked PR#517

> [FLIP-171] Documentation for Generic AsyncSinkBase
> --
>
> Key: FLINK-24370
> URL: https://issues.apache.org/jira/browse/FLINK-24370
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> h2. Motivation
> To write documentation for FLIP-171 Async Sink Base. This will help sink 
> implementers get acquainted with the necessary information to write their 
> concrete sinks.
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]







[jira] [Comment Edited] (FLINK-24370) [FLIP-171] Documentation for Generic AsyncSinkBase

2022-07-20 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568966#comment-17568966
 ] 

Zichen Liu edited comment on FLINK-24370 at 7/20/22 10:20 AM:
--

Done in linked PR#517

 

Please mark this ticket and FLINK-24278 as Resolved  [~chesnay] 


was (Author: crynetlogistics):
Done in linked PR#517

 

Please complete this and FLINK-24278 projects [~chesnay] 






[jira] [Commented] (FLINK-27537) Remove requirement for Async Sink's RequestEntryT to be serializable

2022-07-21 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569349#comment-17569349
 ] 

Zichen Liu commented on FLINK-27537:


Hello [~DavidLiu001], thank you for offering. I'm delighted you want to make 
this change. [~dannycranmer], would you mind assigning David to this Jira?

> Remove requirement for Async Sink's RequestEntryT to be serializable
> 
>
> Key: FLINK-27537
> URL: https://issues.apache.org/jira/browse/FLINK-27537
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.15.0
>Reporter: Zichen Liu
>Priority: Major
>
> Currently, in AsyncSinkBase and its dependent classes (e.g. the sink writer, 
> element converter, etc.), the RequestEntryT generic type is required to be 
> Serializable.
> However, nothing in the implementation actually requires this any more.
> Proposed approach:
>  * Remove the extends Serializable bound from the generic type RequestEntryT
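The proposed change can be sketched as follows. The class and type names below are simplified, hypothetical stand-ins for the actual Flink declarations:

```java
import java.io.Serializable;

public class BoundDemo {

    // Simplified stand-in for the current declaration: the bound forces every
    // request-entry type to implement Serializable.
    abstract static class WriterWithBound<InputT, RequestEntryT extends Serializable> { }

    // Proposed shape: the bound is dropped, since nothing in the sink
    // actually serializes the entries.
    abstract static class WriterWithoutBound<InputT, RequestEntryT> { }

    // A request type that is not Serializable (many SDK client request
    // objects are not); it can only parameterize the unbounded variant.
    static class NonSerializableRequest { }

    static class DemoWriter extends WriterWithoutBound<String, NonSerializableRequest> { }

    public static void main(String[] args) {
        // Compiles and runs; with WriterWithBound this parameterization
        // would be a compile-time error.
        System.out.println(new DemoWriter().getClass().getSimpleName());
    }
}
```

The benefit is that connector implementers can use their client library's request objects directly as RequestEntryT, without wrapping them in a serializable holder.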





[jira] [Commented] (FLINK-27536) Rename method parameter in AsyncSinkWriter

2022-07-21 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569350#comment-17569350
 ] 

Zichen Liu commented on FLINK-27536:


Hi [~DavidLiu001], awesome. [~dannycranmer], would you mind assigning, please?

> Rename method parameter in AsyncSinkWriter
> --
>
> Key: FLINK-27536
> URL: https://issues.apache.org/jira/browse/FLINK-27536
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.15.0
>Reporter: Zichen Liu
>Priority: Minor
>
> Change the abstract method's parameter naming in AsyncSinkWriter.
> From
>   Consumer<List<RequestEntryT>> requestResult
> to
>   Consumer<List<RequestEntryT>> requestToRetry
> or something similar.
>  
> This is because the consumer here is supposed to accept a list of requests 
> that need to be retried.
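A minimal sketch of the proposal; SketchWriter and the helper below are hypothetical simplifications, with only submitRequestEntries mirroring the abstract method described above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RenameDemo {

    // Simplified stand-in for AsyncSinkWriter; only the second parameter's
    // name changes in the proposal (requestResult -> requestToRetry).
    abstract static class SketchWriter<RequestEntryT> {
        protected abstract void submitRequestEntries(
                List<RequestEntryT> requestEntries,
                Consumer<List<RequestEntryT>> requestToRetry);
    }

    // Toy implementation: every entry "fails" and is handed back for retry,
    // which is exactly what the consumer is for - hence the new name.
    static class AlwaysRetryWriter extends SketchWriter<String> {
        @Override
        protected void submitRequestEntries(
                List<String> requestEntries,
                Consumer<List<String>> requestToRetry) {
            requestToRetry.accept(requestEntries);
        }
    }

    static List<String> entriesToRetry(List<String> entries) {
        List<String> retried = new ArrayList<>();
        new AlwaysRetryWriter().submitRequestEntries(entries, retried::addAll);
        return retried;
    }

    public static void main(String[] args) {
        System.out.println(entriesToRetry(List.of("a", "b")));
    }
}
```

Since the consumer receives the entries to retry rather than a generic "result", the rename makes the callback's contract self-documenting for connector implementers.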





[jira] [Commented] (FLINK-28169) GlueSchemaRegistryJsonKinesisITCase fails on JDK11 due to NoSuchMethodError

2022-06-30 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17561036#comment-17561036
 ] 

Zichen Liu commented on FLINK-28169:


I have been coordinating with [~chalixar] in the background; I have found a fix 
and can push a PR. Please reassign this to me ([~chalixar], please confirm you 
are happy with this).

> GlueSchemaRegistryJsonKinesisITCase fails on JDK11 due to NoSuchMethodError
> ---
>
> Key: FLINK-28169
> URL: https://issues.apache.org/jira/browse/FLINK-28169
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.16.0
>Reporter: Martijn Visser
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> {code:java}
> Jun 21 03:06:27 Caused by: 
> org.testcontainers.containers.ContainerLaunchException: Could not 
> create/start container
> Jun 21 03:06:27   at 
> org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:537)
> Jun 21 03:06:27   at 
> org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:340)
> Jun 21 03:06:27   at 
> org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81)
> Jun 21 03:06:27   ... 8 more
> Jun 21 03:06:27 Caused by: java.lang.RuntimeException: 
> java.lang.NoSuchMethodError: 
> 'org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient 
> org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient()'
> Jun 21 03:06:27   at 
> org.rnorth.ducttape.timeouts.Timeouts.callFuture(Timeouts.java:68)
> Jun 21 03:06:27   at 
> org.rnorth.ducttape.timeouts.Timeouts.getWithTimeout(Timeouts.java:43)
> Jun 21 03:06:27   at 
> org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:40)
> Jun 21 03:06:27   at 
> org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.retryUntilSuccessRunner(KinesaliteContainer.java:150)
> Jun 21 03:06:27   at 
> org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.waitUntilReady(KinesaliteContainer.java:146)
> Jun 21 03:06:27   at 
> org.testcontainers.containers.wait.strategy.AbstractWaitStrategy.waitUntilReady(AbstractWaitStrategy.java:51)
> Jun 21 03:06:27   at 
> org.testcontainers.containers.GenericContainer.waitUntilContainerStarted(GenericContainer.java:926)
> Jun 21 03:06:27   at 
> org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:480)
> Jun 21 03:06:27   ... 10 more
> Jun 21 03:06:27 Caused by: java.lang.NoSuchMethodError: 
> 'org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient 
> org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient()'
> Jun 21 03:06:27   at 
> org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.list(KinesaliteContainer.java:157)
> Jun 21 03:06:27   at 
> org.rnorth.ducttape.ratelimits.RateLimiter.getWhenReady(RateLimiter.java:51)
> Jun 21 03:06:27   at 
> org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.lambda$retryUntilSuccessRunner$0(KinesaliteContainer.java:153)
> Jun 21 03:06:27   at 
> org.rnorth.ducttape.unreliables.Unreliables.lambda$retryUntilSuccess$0(Unreliables.java:43)
> Jun 21 03:06:27   at 
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> Jun 21 03:06:27   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> Jun 21 03:06:27   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> Jun 21 03:06:27   ... 1 more
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36979&view=logs&j=6e8542d7-de38-5a33-4aca-458d6c87066d&t=5846934b-7a4f-545b-e5b0-eb4d8bda32e1&l=16659





[jira] [Commented] (FLINK-28169) GlueSchemaRegistryJsonKinesisITCase fails on JDK11 due to NoSuchMethodError

2022-06-30 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17561112#comment-17561112
 ] 

Zichen Liu commented on FLINK-28169:


Thanks for assigning it to me, [~martijnvisser]; I have pushed the PR and it has 
passed CI. Thanks [~chalixar] for approving and for creating the related 
follow-ups FLINK-28332 and FLINK-28333.

> Jun 21 03:06:27   at 
> org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.lambda$retryUntilSuccessRunner$0(KinesaliteContainer.java:153)
> Jun 21 03:06:27   at 
> org.rnorth.ducttape.unreliables.Unreliables.lambda$retryUntilSuccess$0(Unreliables.java:43)
> Jun 21 03:06:27   at 
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> Jun 21 03:06:27   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> Jun 21 03:06:27   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> Jun 21 03:06:27   ... 1 more
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36979&view=logs&j=6e8542d7-de38-5a33-4aca-458d6c87066d&t=5846934b-7a4f-545b-e5b0-eb4d8bda32e1&l=16659



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28169) GlueSchemaRegistryJsonKinesisITCase fails on JDK11 due to NoSuchMethodError

2022-07-01 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17561431#comment-17561431
 ] 

Zichen Liu commented on FLINK-28169:


Thanks [~martijnvisser] much appreciated.

 

> GlueSchemaRegistryJsonKinesisITCase fails on JDK11 due to NoSuchMethodError
> ---
>
> Key: FLINK-28169
> URL: https://issues.apache.org/jira/browse/FLINK-28169
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.16.0
>Reporter: Martijn Visser
>Assignee: Zichen Liu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.16.0
>
>
> {code:java}
> Jun 21 03:06:27 Caused by: 
> org.testcontainers.containers.ContainerLaunchException: Could not 
> create/start container
> Jun 21 03:06:27   at 
> org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:537)
> Jun 21 03:06:27   at 
> org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:340)
> Jun 21 03:06:27   at 
> org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81)
> Jun 21 03:06:27   ... 8 more
> Jun 21 03:06:27 Caused by: java.lang.RuntimeException: 
> java.lang.NoSuchMethodError: 
> 'org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient 
> org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient()'
> Jun 21 03:06:27   at 
> org.rnorth.ducttape.timeouts.Timeouts.callFuture(Timeouts.java:68)
> Jun 21 03:06:27   at 
> org.rnorth.ducttape.timeouts.Timeouts.getWithTimeout(Timeouts.java:43)
> Jun 21 03:06:27   at 
> org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:40)
> Jun 21 03:06:27   at 
> org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.retryUntilSuccessRunner(KinesaliteContainer.java:150)
> Jun 21 03:06:27   at 
> org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.waitUntilReady(KinesaliteContainer.java:146)
> Jun 21 03:06:27   at 
> org.testcontainers.containers.wait.strategy.AbstractWaitStrategy.waitUntilReady(AbstractWaitStrategy.java:51)
> Jun 21 03:06:27   at 
> org.testcontainers.containers.GenericContainer.waitUntilContainerStarted(GenericContainer.java:926)
> Jun 21 03:06:27   at 
> org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:480)
> Jun 21 03:06:27   ... 10 more
> Jun 21 03:06:27 Caused by: java.lang.NoSuchMethodError: 
> 'org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient 
> org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient()'
> Jun 21 03:06:27   at 
> org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.list(KinesaliteContainer.java:157)
> Jun 21 03:06:27   at 
> org.rnorth.ducttape.ratelimits.RateLimiter.getWhenReady(RateLimiter.java:51)
> Jun 21 03:06:27   at 
> org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer$ListStreamsWaitStrategy.lambda$retryUntilSuccessRunner$0(KinesaliteContainer.java:153)
> Jun 21 03:06:27   at 
> org.rnorth.ducttape.unreliables.Unreliables.lambda$retryUntilSuccess$0(Unreliables.java:43)
> Jun 21 03:06:27   at 
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> Jun 21 03:06:27   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> Jun 21 03:06:27   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> Jun 21 03:06:27   ... 1 more
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36979&view=logs&j=6e8542d7-de38-5a33-4aca-458d6c87066d&t=5846934b-7a4f-545b-e5b0-eb4d8bda32e1&l=16659



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-25729) Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on the AWS SDK for Java 2.x

2022-01-20 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25729:
--

 Summary: Replace the deprecated FlinkKinesisConsumer with a 
Kinesis Consumer based on the AWS SDK for Java 2.x
 Key: FLINK-25729
 URL: https://issues.apache.org/jira/browse/FLINK-25729
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
As a Flink user, I’d like to configure a custom fatal exception handler for 
the AsyncSinkWriter.

*Scope:*
 * Create a new fatal exception handler
 * Users of the AsyncSink should be able to create an implementation and pass 
it to the sink



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
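[Editor's note] The user story in FLINK-25729 above (a pluggable fatal exception handler for the AsyncSinkWriter) can be sketched as follows. This is a minimal, hypothetical model: the interface name, the writer class, and the wiring are invented for illustration and are not Flink's actual API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical callback invoked when the writer hits a non-retryable error. */
interface FatalExceptionHandler {
    void onFatalError(Throwable t);
}

/** Minimal stand-in for a sink writer that accepts a user-supplied handler. */
class DemoSinkWriter {
    private final FatalExceptionHandler handler;
    private final List<String> buffer = new ArrayList<>();

    DemoSinkWriter(FatalExceptionHandler handler) {
        this.handler = handler;
    }

    void write(String element) {
        try {
            if (element == null) {
                // stand-in for an error the sink cannot retry
                throw new IllegalStateException("non-retryable failure");
            }
            buffer.add(element); // normally: buffer and flush asynchronously
        } catch (RuntimeException e) {
            handler.onFatalError(e); // delegate to the custom handler
        }
    }
}

public class FatalHandlerDemo {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        DemoSinkWriter writer = new DemoSinkWriter(t -> seen.add(t.getMessage()));
        writer.write("ok");
        writer.write(null);
        System.out.println(seen); // the handler observed the fatal error
    }
}
```

The point of the story is only the last constructor argument: the user constructs the handler and passes it to the sink, instead of the sink hard-coding its failure behaviour.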


[jira] [Updated] (FLINK-25729) Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on the AWS SDK for Java 2.x

2022-01-20 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25729:
---
Description: 
h2. Motivation

*User stories:*
As a Flink user, I’d like to use a Kinesis Consumer based on the AWS SDK for 
Java 2.x rather than the Kinesis Client Library.

 
 * Maintain all the features of the current consumer at 
`org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` in the new 
source
 * The new source should live in the module `flink-connector-aws-kinesis-streams` 
(old: `flink-connector-kinesis`)

  was:
h2. Motivation

*User stories:*
As a Flink user, I’d like to configure a custom fatal exception handler for 
the AsyncSinkWriter.

*Scope:*
 * Create a new fatal exception handler
 * Users of the AsyncSink should be able to create an implementation and pass 
it to the sink


> Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on 
> the AWS SDK for Java 2.x
> -
>
> Key: FLINK-25729
> URL: https://issues.apache.org/jira/browse/FLINK-25729
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.15.0
>Reporter: Zichen Liu
>Priority: Major
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
> As a Flink user, I’d like to use a Kinesis Consumer based on the AWS SDK for 
> Java 2.x rather than the Kinesis Client Library.
>  
>  * Maintain all the features of the current consumer at 
> `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` in the 
> new source
>  * The new source should live in the module 
> `flink-connector-aws-kinesis-streams` (old: `flink-connector-kinesis`)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25729) Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on the AWS SDK for Java 2.x

2022-01-20 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25729:
---
Component/s: Connectors / Kinesis
 (was: Connectors / Common)

> Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on 
> the AWS SDK for Java 2.x
> -
>
> Key: FLINK-25729
> URL: https://issues.apache.org/jira/browse/FLINK-25729
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.0
>Reporter: Zichen Liu
>Priority: Major
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
> As a Flink user, I’d like to use a Kinesis Consumer based on the AWS SDK for 
> Java 2.x rather than the Kinesis Client Library.
>  
>  * Maintain all the features of the current consumer at 
> `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` in the 
> new source
>  * The new source should live in the module 
> `flink-connector-aws-kinesis-streams` (old: `flink-connector-kinesis`)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25731) Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated

2022-01-20 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25731:
--

 Summary: Mark FlinkKinesisProducer/FlinkKinesisConsumer as 
deprecated
 Key: FLINK-25731
 URL: https://issues.apache.org/jira/browse/FLINK-25731
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
 As a Flink user, I’d like to use DynamoDB as sink for my data pipeline.

*Scope:*
 * Implement an asynchronous sink for DynamoDB by inheriting the AsyncSinkBase 
class. The implementation can for now reside in its own module in 
flink-connectors.
 * Implement an asynchronous sink writer for DynamoDB by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to Firehose for increased throughput. The implemented Sink Writer will be used 
by the Sink class that will be created as part of this story.
 * Java / code-level docs.
 * End to end testing: add tests that hit a real AWS instance. (How to best 
donate resources to the Flink project to allow this to happen?)

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
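[Editor's note] The scope above (extend a base writer and retry failed entries via {{requeueFailedRequestEntry}}) can be illustrated with a simplified, self-contained model. All names here are hypothetical; the real contract lives in Flink's AsyncSinkWriter, which this sketch does not reproduce.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/**
 * Simplified model of the writer contract described above: entries are
 * buffered, submitted in batches, and rejected entries are requeued for a
 * later attempt. Illustrative only; not Flink's AsyncSinkWriter.
 */
class ModelAsyncWriter<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final int batchSize;
    private final Predicate<T> destination; // true = accepted, false = failed

    ModelAsyncWriter(int batchSize, Predicate<T> destination) {
        this.batchSize = batchSize;
        this.destination = destination;
    }

    void write(T entry) {
        buffer.addLast(entry);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Submit one batch; requeue the entries the destination rejected. */
    void flush() {
        List<T> batch = new ArrayList<>();
        for (int i = 0; i < batchSize && !buffer.isEmpty(); i++) {
            batch.add(buffer.pollFirst());
        }
        for (T entry : batch) {
            if (!destination.test(entry)) {
                requeueFailedRequestEntry(entry);
            }
        }
    }

    /** Failed entries go back to the front so they are retried first. */
    void requeueFailedRequestEntry(T entry) {
        buffer.addFirst(entry);
    }

    int buffered() {
        return buffer.size();
    }
}

public class ModelAsyncWriterDemo {
    public static void main(String[] args) {
        // Destination that rejects entries starting with "bad"
        ModelAsyncWriter<String> w =
                new ModelAsyncWriter<>(2, e -> !e.startsWith("bad"));
        w.write("a");
        w.write("bad-1"); // reaching the batch size triggers a flush
        System.out.println(w.buffered()); // the rejected entry is still buffered
    }
}
```

Batching at the writer level is what the story means by "batch multiple requests for increased throughput": one destination call per batch rather than per element.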


[jira] [Updated] (FLINK-25731) Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated

2022-01-20 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25731:
---
Description: 
This consumer, based on the Kinesis Client Library (KCL), has been deprecated. 
A Jira to create an equivalent consumer based on the AWS SDK for Java 2.x has 
been raised in FLINK-25729 and planned for Flink 1.16.

This producer, based on the Kinesis Producer Library (KPL), has been superseded. 
The new sink can be found in the module 
flink-connectors/flink-connector-aws-kinesis-data-streams and package 
org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink. It is based on 
the AWS SDK for Java 2.x. The work to replace this sink was carried out in 
FLINK-24227.

  was:
h2. Motivation

*User stories:*
 As a Flink user, I’d like to use DynamoDB as sink for my data pipeline.

*Scope:*
 * Implement an asynchronous sink for DynamoDB by inheriting the AsyncSinkBase 
class. The implementation can for now reside in its own module in 
flink-connectors.
 * Implement an asynchronous sink writer for DynamoDB by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to Firehose for increased throughput. The implemented Sink Writer will be used 
by the Sink class that will be created as part of this story.
 * Java / code-level docs.
 * End to end testing: add tests that hit a real AWS instance. (How to best 
donate resources to the Flink project to allow this to happen?)

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]


> Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated
> 
>
> Key: FLINK-25731
> URL: https://issues.apache.org/jira/browse/FLINK-25731
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> This consumer, based on the Kinesis Client Library (KCL), has been 
> deprecated. A Jira to create an equivalent consumer based on the AWS SDK for 
> Java 2.x has been raised in FLINK-25729 and planned for Flink 1.16.
> This producer, based on the Kinesis Producer Library (KPL), has been 
> superseded. The new sink can be found in the module 
> flink-connectors/flink-connector-aws-kinesis-data-streams and package 
> org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink. It is based 
> on the AWS SDK for Java 2.x. The work to replace this sink was carried out in 
> FLINK-24227.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25735) Chinese Translation - Add documentation for KDS Async Sink

2022-01-20 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25735:
--

 Summary: Chinese Translation - Add documentation for KDS Async Sink
 Key: FLINK-25735
 URL: https://issues.apache.org/jira/browse/FLINK-25735
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.15.0
Reporter: Zichen Liu
 Fix For: 1.15.0


h2. Translate:
 * Connectors/Kinesis page to deprecate old sink 
(docs/content/docs/connectors/datastream/kinesis.md)
 * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) - only 
the section about Kinesis

into Chinese.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (FLINK-25408) Chinese Translation - Add documentation for KDS Async Sink

2022-01-20 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu closed FLINK-25408.
--
Resolution: Duplicate

Duplicate of https://issues.apache.org/jira/browse/FLINK-25735

> Chinese Translation - Add documentation for KDS Async Sink
> --
>
> Key: FLINK-25408
> URL: https://issues.apache.org/jira/browse/FLINK-25408
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kinesis, Documentation
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
> Fix For: 1.15.0
>
>
> h2. Translate:
>  * Connectors/Kinesis page to deprecate old sink 
> (docs/content/docs/connectors/datastream/kinesis.md)
>  * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md)
> into Chinese.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25736) Chinese Translation - Update documentation for Kinesis Table Api

2022-01-20 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25736:
--

 Summary: Chinese Translation - Update documentation for Kinesis 
Table Api
 Key: FLINK-25736
 URL: https://issues.apache.org/jira/browse/FLINK-25736
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.15.0
Reporter: Zichen Liu
 Fix For: 1.15.0


h2. Translate:
 * Connectors/Kinesis page to deprecate old sink 
(docs/content.zh/docs/connectors/table/kinesis.md)

into Chinese.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25737) Chinese Translation - Add documentation for Firehose Async Sink

2022-01-20 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25737:
--

 Summary: Chinese Translation - Add documentation for Firehose 
Async Sink
 Key: FLINK-25737
 URL: https://issues.apache.org/jira/browse/FLINK-25737
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.15.0
Reporter: Zichen Liu
 Fix For: 1.15.0


h2. Translate:
 * Connectors/Kinesis page to deprecate old sink 
(docs/content/docs/connectors/datastream/kinesis.md)
 * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) - only 
the section about Kinesis

into Chinese.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25737) Chinese Translation - Add documentation for Firehose Async Sink

2022-01-20 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25737:
---
Description: 
h2. Translate:
 * Connectors/Firehose page  
(docs/content.zh/docs/connectors/datastream/firehose.md)

into Chinese.

  was:
h2. Translate:
 * Connectors/Kinesis page to deprecate old sink 
(docs/content/docs/connectors/datastream/kinesis.md)
 * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) - only 
the section about Kinesis

into Chinese.


> Chinese Translation - Add documentation for Firehose Async Sink
> ---
>
> Key: FLINK-25737
> URL: https://issues.apache.org/jira/browse/FLINK-25737
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.15.0
>Reporter: Zichen Liu
>Priority: Major
> Fix For: 1.15.0
>
>
> h2. Translate:
>  * Connectors/Firehose page  
> (docs/content.zh/docs/connectors/datastream/firehose.md)
> into Chinese.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25792) Async Sink Base is being flushed too frequently resulting in backpressure even when buffer is near empty

2022-01-24 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25792:
--

 Summary: Async Sink Base is being flushed too frequently 
resulting in backpressure even when buffer is near empty
 Key: FLINK-25792
 URL: https://issues.apache.org/jira/browse/FLINK-25792
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Motivation

*User stories:*
As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams 
 sink.

*Scope:*
 * Introduce {{AsyncDynamicTableSink}} to enable sinking tables into async 
implementations.
 * Implement a new {{KinesisDynamicTableSink}} that uses the 
{{KinesisDataStreamSink}} async implementation and implements 
{{AsyncDynamicTableSink}}.
 * The implementation introduces async sink configurations as optional options 
in the table definition, with default values derived from the 
{{KinesisDataStream}} default values.
 * Unit/integration testing: modify the Kinesis Table API tests for the new 
implementation; add unit tests for {{AsyncDynamicTableSink}}, 
{{KinesisDynamicTableSink}} and {{KinesisDynamicTableSinkFactory}}.
 * Java / code-level docs.

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25792) Async Sink Base is being flushed too frequently resulting in backpressure even when buffer is near empty

2022-01-24 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25792:
---
Description: 
h2. Bug:

 

  was:
h2. Motivation

*User stories:*
As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams 
 sink.

*Scope:*
 * Introduce {{AsyncDynamicTableSink}} to enable sinking tables into async 
implementations.
 * Implement a new {{KinesisDynamicTableSink}} that uses the 
{{KinesisDataStreamSink}} async implementation and implements 
{{AsyncDynamicTableSink}}.
 * The implementation introduces async sink configurations as optional options 
in the table definition, with default values derived from the 
{{KinesisDataStream}} default values.
 * Unit/integration testing: modify the Kinesis Table API tests for the new 
implementation; add unit tests for {{AsyncDynamicTableSink}}, 
{{KinesisDynamicTableSink}} and {{KinesisDynamicTableSinkFactory}}.
 * Java / code-level docs.

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]


> Async Sink Base is being flushed too frequently resulting in backpressure 
> even when buffer is near empty
> 
>
> Key: FLINK-25792
> URL: https://issues.apache.org/jira/browse/FLINK-25792
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Bug:
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25792) Async Sink Base is being flushed too frequently resulting in backpressure even when buffer is near empty

2022-01-24 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25792:
---
Description: 
h2. Bug:

Async Sink Base is being flushed too frequently, resulting in backpressure 
even when the buffer is near empty.

*Cause:*

During a write(), flushIfAble() is called, which checks whether the number of 
buffered elements is greater than a batch size and, if so, insists that the 
sink flush immediately, even if the number of inFlightRequests is greater than 
the maximum allowed number of inFlightRequests. This yields the current 
mailbox thread and hence blocks.

Notice that this can occur even when the buffer is near empty, so the blocking 
behaviour is unnecessary and undesirable: we would like the element to be 
written to the buffer without any blocking.

  was:
h2. Bug:

 


> Async Sink Base is being flushed too frequently resulting in backpressure 
> even when buffer is near empty
> 
>
> Key: FLINK-25792
> URL: https://issues.apache.org/jira/browse/FLINK-25792
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Bug:
> Async Sink Base is being flushed too frequently, resulting in backpressure 
> even when the buffer is near empty.
> *Cause:*
> During a write(), flushIfAble() is called, which checks whether the number of 
> buffered elements is greater than a batch size and, if so, insists that the 
> sink flush immediately, even if the number of inFlightRequests is greater 
> than the maximum allowed number of inFlightRequests. This yields the current 
> mailbox thread and hence blocks.
> Notice that this can occur even when the buffer is near empty, so the 
> blocking behaviour is unnecessary and undesirable: we would like the element 
> to be written to the buffer without any blocking.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
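[Editor's note] The fix implied by the cause analysis in FLINK-25792 can be modelled in a few lines: trigger a flush only when a batch is ready AND an in-flight slot is free; otherwise just accept the element into the buffer. The class and method names below are invented for illustration and are not the AsyncSinkBase code.

```java
/**
 * Simplified model of the desired behaviour: when the buffer reaches the
 * batch size but the in-flight limit is already hit, defer the flush
 * instead of blocking the mailbox thread. Names are illustrative.
 */
class NonBlockingFlushModel {
    private int buffered;
    private int inFlight;
    private final int batchSize;
    private final int maxInFlight;

    NonBlockingFlushModel(int batchSize, int maxInFlight) {
        this.batchSize = batchSize;
        this.maxInFlight = maxInFlight;
    }

    void write() {
        buffered++; // always accept the element without blocking
        flushIfAble();
    }

    /** Flush only when a batch is ready AND an in-flight slot is free. */
    private void flushIfAble() {
        if (buffered >= batchSize && inFlight < maxInFlight) {
            buffered -= batchSize;
            inFlight++;
        }
        // Old behaviour: a full batch forced a flush even when
        // inFlight == maxInFlight, yielding the mailbox thread and
        // backpressuring with a near-empty buffer.
    }

    /** Completion of an async request frees a slot and retries the flush. */
    void completeOneRequest() {
        inFlight = Math.max(0, inFlight - 1);
        flushIfAble();
    }

    int getBuffered() { return buffered; }
    int getInFlight() { return inFlight; }
}

public class NonBlockingFlushDemo {
    public static void main(String[] args) {
        NonBlockingFlushModel m = new NonBlockingFlushModel(2, 1);
        m.write();
        m.write(); // batch ready, slot free -> flushed
        m.write();
        m.write(); // batch ready, no slot -> deferred, caller not blocked
        m.completeOneRequest(); // slot freed -> deferred batch flushes
        System.out.println(m.getBuffered()); // buffer drained without blocking
    }
}
```

The key design choice is that the in-flight limit gates the flush, not the write: the writer never yields the mailbox thread just because a batch happens to be full.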


[jira] [Updated] (FLINK-25792) Async Sink Base is being flushed too frequently resulting in backpressure even when buffer is near empty

2022-01-24 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25792:
---
Issue Type: Bug  (was: New Feature)

> Async Sink Base is being flushed too frequently resulting in backpressure 
> even when buffer is near empty
> 
>
> Key: FLINK-25792
> URL: https://issues.apache.org/jira/browse/FLINK-25792
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Bug:
> Async Sink Base is being flushed too frequently, resulting in backpressure 
> even when the buffer is near empty.
> *Cause:*
> During a write(), flushIfAble() is called, which checks whether the number of 
> buffered elements is greater than a batch size and, if so, insists that the 
> sink flush immediately, even if the number of inFlightRequests is greater 
> than the maximum allowed number of inFlightRequests. This yields the current 
> mailbox thread and hence blocks.
> Notice that this can occur even when the buffer is near empty, so the 
> blocking behaviour is unnecessary and undesirable: we would like the element 
> to be written to the buffer without any blocking.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25793) Kinesis Data Streams sink is not limiting throughput to the destination and therefore exceeding rate limits

2022-01-24 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25793:
--

 Summary: Kinesis Data Streams sink is not limiting throughput to 
the destination and therefore exceeding rate limits
 Key: FLINK-25793
 URL: https://issues.apache.org/jira/browse/FLINK-25793
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Bug:

Async Sink Base is being flushed too frequently, resulting in backpressure 
even when the buffer is near empty.

*Cause:*

During a write(), flushIfAble() is called, which checks whether the number of 
buffered elements is greater than a batch size and, if so, insists that the 
sink flush immediately, even if the number of inFlightRequests is greater than 
the maximum allowed number of inFlightRequests. This yields the current 
mailbox thread and hence blocks.

Notice that this can occur even when the buffer is near empty, so the blocking 
behaviour is unnecessary and undesirable: we would like the element to be 
written to the buffer without any blocking.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25793) Kinesis Data Streams sink is not limiting throughput to the destination and therefore exceeding rate limits

2022-01-24 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25793:
---
Description: 
h2. Bug:

Kinesis Data Streams sink is not limiting throughput to the destination and 
therefore exceeding rate limits

*Cause:*

We are not throttling our requests downstream at all.

We should monitor for requests that have failed with ThroughputExceeded 
exceptions and reduce the throughput accordingly.

  was:
h2. Bug:

Async Sink Base is being flushed too frequently, resulting in backpressure 
even when the buffer is near empty.

*Cause:*

During a write(), flushIfAble() is called, which checks whether the number of 
buffered elements is greater than a batch size and, if so, insists that the 
sink flush immediately, even if the number of inFlightRequests is greater than 
the maximum allowed number of inFlightRequests. This yields the current 
mailbox thread and hence blocks.

Notice that this can occur even when the buffer is near empty, so the blocking 
behaviour is unnecessary and undesirable: we would like the element to be 
written to the buffer without any blocking.


> Kinesis Data Streams sink is not limiting throughput to the destination and 
> therefore exceeding rate limits
> ---
>
> Key: FLINK-25793
> URL: https://issues.apache.org/jira/browse/FLINK-25793
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Bug:
> Kinesis Data Streams sink is not limiting throughput to the destination and 
> therefore exceeding rate limits
> *Cause:*
> We are not throttling our requests downstream at all.
> We should monitor for requests that have failed with ThroughputExceeded 
> exceptions and reduce the throughput accordingly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
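[Editor's note] One standard way to implement the throttling described in FLINK-25793 is additive-increase / multiplicative-decrease (AIMD) on the permitted request rate: ramp up gently while requests succeed, back off sharply on a ThroughputExceeded-style error. The sketch below is an assumption-laden illustration (class name and constants invented), not the sink's actual implementation.

```java
/**
 * AIMD rate limiter sketch: additive increase on success, multiplicative
 * decrease when the destination reports throttling. Illustrative only.
 */
class AimdRateLimiter {
    private double rateLimit;       // currently permitted requests/sec
    private final double increment; // additive increase on success
    private final double backoff;   // multiplier applied when throttled
    private final double minRate;
    private final double maxRate;

    AimdRateLimiter(double initial, double increment, double backoff,
                    double minRate, double maxRate) {
        this.rateLimit = initial;
        this.increment = increment;
        this.backoff = backoff;
        this.minRate = minRate;
        this.maxRate = maxRate;
    }

    /** A request succeeded: probe for more throughput, gently. */
    void onSuccess() {
        rateLimit = Math.min(maxRate, rateLimit + increment);
    }

    /** The destination throttled us (e.g. ThroughputExceeded): back off hard. */
    void onThrottled() {
        rateLimit = Math.max(minRate, rateLimit * backoff);
    }

    double currentRate() {
        return rateLimit;
    }
}

public class AimdDemo {
    public static void main(String[] args) {
        AimdRateLimiter limiter = new AimdRateLimiter(100, 10, 0.5, 1, 200);
        limiter.onSuccess();   // 100 -> 110
        limiter.onThrottled(); // 110 -> 55
        System.out.println(limiter.currentRate()); // prints 55.0
    }
}
```

The sink would consult {{currentRate()}} before submitting a batch, so sustained throttling responses from the destination quickly drive the send rate down toward the actual provisioned capacity.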


[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2022-01-25 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17481801#comment-17481801
 ] 

Zichen Liu commented on FLINK-24229:


Hi [~Gusev] 

Thank you so much for the contribution. I will be available and ready to help 
with and review the pull request.

Can't wait. I really hope we can make it for the release.

> [FLIP-171] DynamoDB implementation of Async Sink
> 
>
> Key: FLINK-24229
> URL: https://issues.apache.org/jira/browse/FLINK-24229
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
>  As a Flink user, I’d like to use DynamoDB as sink for my data pipeline.
> *Scope:*
>  * Implement an asynchronous sink for DynamoDB by inheriting the 
> AsyncSinkBase class. The implementation can for now reside in its own module 
> in flink-connectors.
>  * Implement an asynchronous sink writer for DynamoDB by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple requests (PutRecordsRequestEntry 
> objects) to Firehose for increased throughput. The implemented Sink Writer 
> will be used by the Sink class that will be created as part of this story.
>  * Java / code-level docs.
>  * End to end testing: add tests that hit a real AWS instance. (How to best 
> donate resources to the Flink project to allow this to happen?)
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Commented] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2022-01-26 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482452#comment-17482452
 ] 

Zichen Liu commented on FLINK-24229:


(y)

> [FLIP-171] DynamoDB implementation of Async Sink
> 
>
> Key: FLINK-24229
> URL: https://issues.apache.org/jira/browse/FLINK-24229
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
>  As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.
> *Scope:*
>  * Implement an asynchronous sink for DynamoDB by inheriting the 
> AsyncSinkBase class. The implementation can for now reside in its own module 
> in flink-connectors.
>  * Implement an asynchronous sink writer for DynamoDB by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple requests to DynamoDB for increased 
> throughput. The implemented Sink Writer will be used by the Sink class that 
> will be created as part of this story.
>  * Java / code-level docs.
>  * End to end testing: add tests that hit a real AWS instance. (How to best 
> donate resources to the Flink project to allow this to happen?)
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Closed] (FLINK-25731) Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated

2022-01-26 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu closed FLINK-25731.
--
Resolution: Fixed

> Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated
> 
>
> Key: FLINK-25731
> URL: https://issues.apache.org/jira/browse/FLINK-25731
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> This consumer, based on the Kinesis Client Library (KCL), has been 
> deprecated; a Jira to create an equivalent consumer based on the AWS SDK for 
> Java 2.x has been raised in FLINK-25729 and planned for Flink 1.16.
> This producer, based on the Kinesis Producer Library (KPL), has been 
> superseded. The new sink can be found in the module 
> flink-connectors/flink-connector-aws-kinesis-data-streams and package 
> org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink. It is based 
> on the AWS SDK for Java 2.x. The work to replace this sink was carried out in 
> FLINK-24227.





[jira] [Updated] (FLINK-24905) [FLIP-171] KDS implementation of Async Sink Table API

2022-01-27 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24905:
---
Description: 
h2. Motivation

*User stories:*
As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams 
 sink.

*Scope:*
 * Introduce {{AsyncDynamicTableSink}}, which enables sinking tables into 
async sink implementations.
 * Implement a new {{KinesisDynamicTableSink}} that uses 
{{KinesisDataStreamSink}} Async Implementation and implements 
{{{}AsyncDynamicTableSink{}}}.
 * The implementation introduces Async Sink configurations as optional options 
in the table definition, with default values derived from the 
{{KinesisDataStream}} default values.
 * Unit/Integration testing. Modify KinesisTableAPI tests for the new 
implementation, add unit tests for {{AsyncDynamicTableSink}} and 
{{KinesisDynamicTableSink}} and {{{}KinesisDynamicTableSinkFactory{}}}.
 * Java / code-level docs.

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

 

*Update:*

^^ Status Update ^^
__List of all work outstanding for 1.15 release__

[Merged] https://github.com/apache/flink/pull/18165 - KDS DataStream Docs
[Merged] https://github.com/apache/flink/pull/18396 - [hotfix] for infinite 
loop if not flushing during commit
[Merged] https://github.com/apache/flink/pull/18421 - Mark Kinesis Producer as 
deprecated (Prod: FLINK-24227)
[Merged] https://github.com/apache/flink/pull/18348 - KDS Table API Sink & Docs
[Merged] https://github.com/apache/flink/pull/18488 - base sink retry entries 
in order not in reverse
[Merged] https://github.com/apache/flink/pull/18512 - changing failed requests 
handler to accept List in AsyncSinkWriter
[Merged] https://github.com/apache/flink/pull/18483 - Do not expose the element 
converter
[Merged] https://github.com/apache/flink/pull/18468 - Adding Kinesis data 
streams sql uber-jar

Ready for review:
[SUCCESS ] https://github.com/apache/flink/pull/18314 - KDF DataStream Sink & 
Docs
[BLOCKED on ^^ ] https://github.com/apache/flink/pull/18426 - rename 
flink-connector-aws-kinesis-data-* to flink-connector-aws-kinesis-* (module 
names) and KinesisData*Sink to Kinesis*Sink (class names)

Pending PR:
* Firehose Table API Sink & Docs
* KDF Table API SQL jar

TBD:
* FLINK-25846: Not shutting down
* FLINK-25848: Validation during start up
* FLINK-25792: flush() bug
* FLINK-25793: throughput exceeded
* Update the defaults of KDS sink and update the docs + do the same for KDF
* add an `AsyncSinkCommonConfig` class (to wrap the 6 params) to the 
`flink-connector-base` and propagate it to the two connectors
- feature freeze
* KDS performance testing
* KDF performance testing
* Clone the new docs to .../contents.zh/... and add the location to the 
corresponding Chinese translation jira - KDS -
* Rename [Amazon AWS Kinesis Streams] to [Amazon Kinesis Data Streams] in docs 
(legacy issue)
- Flink 1.15 release
* KDS end to end sanity test - hits aws apis rather than local docker images
* KDS Python wrappers
* FLINK-25733 - Create A migration guide for Kinesis Table API connector - can 
happen after 1.15
* If `endpoint` is provided, `region` should not be required like it currently 
is
* Test if Localstack container requires the 1ms timeout
* Adaptive level of logging (in discussion)

FYI:
* FLINK-25661 - Add Custom Fatal Exception handler in AsyncSinkWriter - 
https://github.com/apache/flink/pull/18449
* https://issues.apache.org/jira/browse/FLINK-24229 DDB Sink

Chinese translation:
https://issues.apache.org/jira/browse/FLINK-25735 - KDS DataStream Sink
https://issues.apache.org/jira/browse/FLINK-25736 - KDS Table API Sink
https://issues.apache.org/jira/browse/FLINK-25737 - KDF DataStream Sink

  was:
h2. Motivation

*User stories:*
As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams 
 sink.

*Scope:*
 * Introduce {{AsyncDynamicTableSink}} that enables Sinking Tables into Async 
Implementations.
 * Implement a new {{KinesisDynamicTableSink}} that uses 
{{KinesisDataStreamSink}} Async Implementation and implements 
{{{}AsyncDynamicTableSink{}}}.
 * The implementation introduces Async Sink configurations as optional options 
in the table definition, with default values derived from the 
{{KinesisDataStream}} default values.
 * Unit/Integration testing. modify KinesisTableAPI tests for the new 
implementation, add unit tests for {{AsyncDynamicTableSink}} and 
{{KinesisDynamicTableSink}} and {{{}KinesisDynamicTableSinkFactory{}}}.
 * Java / code-level docs.

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]


> [FLIP-171] KDS implementation of Async Sink Table API
> -
>
> Key: FLINK-24905
> URL: https://issues.apache.org/jira/browse/FLINK-

[jira] [Updated] (FLINK-24228) [FLIP-171] Firehose implementation of Async Sink

2022-01-27 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24228:
---
Description: 
h2. Motivation

*User stories:*
As a Flink user, I’d like to use Kinesis Firehose as a sink for my data pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Firehose by inheriting the 
AsyncSinkBase class. The implementation can for now reside in its own module in 
flink-connectors. The module and package name can be anything reasonable e.g. 
{{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for Firehose by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to Firehose for increased throughput. The implemented Sink Writer will be used 
by the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{{}KinesisTableApiITCase{}}}.
 * Java / code-level docs.
 * End to end testing: add tests that hit a real AWS instance. (How to best 
donate resources to the Flink project to allow this to happen?)

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]
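The buffering behaviour described in the scope above (batching multiple request entries into one call for throughput) can be sketched independently of Flink. The class name, limits, and byte-array record type below are illustrative assumptions, not the connector's real configuration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy batcher illustrating user-defined buffering hints: records accumulate
// and are flushed as one request once adding another record would exceed
// either the maximum record count or the maximum batch size in bytes.
class RecordBatcher {
    private final int maxCount;
    private final long maxBytes;
    private final Consumer<List<byte[]>> flusher; // sends one batched request
    private final List<byte[]> batch = new ArrayList<>();
    private long batchBytes = 0;

    RecordBatcher(int maxCount, long maxBytes, Consumer<List<byte[]>> flusher) {
        this.maxCount = maxCount;
        this.maxBytes = maxBytes;
        this.flusher = flusher;
    }

    void add(byte[] record) {
        // Flush first if this record would overflow either limit.
        if (!batch.isEmpty()
                && (batch.size() == maxCount || batchBytes + record.length > maxBytes)) {
            flush();
        }
        batch.add(record);
        batchBytes += record.length;
    }

    void flush() {
        if (!batch.isEmpty()) {
            flusher.accept(new ArrayList<>(batch));
            batch.clear();
            batchBytes = 0;
        }
    }
}
```

A real writer would additionally flush on a timer and at checkpoint barriers so buffered records are never held indefinitely.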

 

*Update:*

^^ Status Update ^^
__List of all work outstanding for 1.15 release__

[Merged] [https://github.com/apache/flink/pull/18165] - KDS DataStream Docs
[Merged] [https://github.com/apache/flink/pull/18396] - [hotfix] for infinite 
loop if not flushing during commit
[Merged] [https://github.com/apache/flink/pull/18421] - Mark Kinesis Producer 
as deprecated (Prod: FLINK-24227)
[Merged] [https://github.com/apache/flink/pull/18348] - KDS Table API Sink & 
Docs
[Merged] [https://github.com/apache/flink/pull/18488] - base sink retry entries 
in order not in reverse
[Merged] [https://github.com/apache/flink/pull/18512] - changing failed 
requests handler to accept List in AsyncSinkWriter
[Merged] [https://github.com/apache/flink/pull/18483] - Do not expose the 
element converter
[Merged] [https://github.com/apache/flink/pull/18468] - Adding Kinesis data 
streams sql uber-jar

Ready for review:
[SUCCESS ] [https://github.com/apache/flink/pull/18314] - KDF DataStream Sink & 
Docs
[BLOCKED on ^^ ] [https://github.com/apache/flink/pull/18426] - rename 
flink-connector-aws-kinesis-data-* to flink-connector-aws-kinesis-* (module 
names) and KinesisData*Sink to Kinesis*Sink (class names)

Pending PR:
 * Firehose Table API Sink & Docs
 * KDF Table API SQL jar

TBD:
 * FLINK-25846: Not shutting down
 * FLINK-25848: Validation during start up
 * FLINK-25792: flush() bug
 * FLINK-25793: throughput exceeded
 * Update the defaults of KDS sink and update the docs + do the same for KDF
 * add an `AsyncSinkCommonConfig` class (to wrap the 6 params) to the 
`flink-connector-base` and propagate it to the two connectors
 - feature freeze
 * KDS performance testing
 * KDF performance testing
 * Clone the new docs to .../contents.zh/... and add the location to the 
corresponding Chinese translation jira - KDS -
 * Rename [Amazon AWS Kinesis Streams] to [Amazon Kinesis Data Streams] in docs 
(legacy issue)
 - Flink 1.15 release
 * KDS end to end sanity test - hits aws apis rather than local docker images
 * KDS Python wrappers
 * FLINK-25733 - Create A migration guide for Kinesis Table API connector - can 
happen after 1.15
 * If `endpoint` is provided, `region` should not be required like it currently 
is
 * Test if Localstack container requires the 1ms timeout
 * Adaptive level of logging (in discussion)

FYI:
 * FLINK-25661 - Add Custom Fatal Exception handler in AsyncSinkWriter - 
[https://github.com/apache/flink/pull/18449]
 * https://issues.apache.org/jira/browse/FLINK-24229 DDB Sink

Chinese translation:
https://issues.apache.org/jira/browse/FLINK-25735 - KDS DataStream Sink
https://issues.apache.org/jira/browse/FLINK-25736 - KDS Table API Sink
https://issues.apache.org/jira/browse/FLINK-25737 - KDF DataStream Sink

  was:
h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Firehose as sink for my data pipeline.

*Scope:*
 * Implement an asynchronous sink for

[jira] [Updated] (FLINK-25793) Introduce rate limiting mechanism to Async Sink Base

2022-02-01 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25793:
---
Description: 
h2. Bug:

The Async Sink Base is not limiting throughput to the destination and is 
therefore exceeding rate limits.

*Cause:*

We are not throttling our requests downstream at all.

We should monitor for requests that have failed with ThroughputExceeded 
exceptions and reduce the throughput accordingly.
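One common way to "reduce the throughput accordingly" is additive-increase/multiplicative-decrease (AIMD), the scheme TCP congestion control uses: grow the allowed rate by a constant on success and halve it on a throttling error. A minimal sketch, assuming a simple integer rate limit and illustrative constants and names (not Flink's actual API):

```java
// AIMD rate limiter sketch: the permitted rate grows additively while
// requests succeed and is cut multiplicatively (halved) whenever the
// destination reports a throughput-exceeded error.
class AimdRateLimiter {
    private final int increment; // additive step on success
    private final int minRate;   // floor, so progress never fully stalls
    private final int maxRate;   // ceiling, e.g. the provisioned quota
    private int currentRate;

    AimdRateLimiter(int initialRate, int increment, int minRate, int maxRate) {
        this.currentRate = initialRate;
        this.increment = increment;
        this.minRate = minRate;
        this.maxRate = maxRate;
    }

    int currentRate() {
        return currentRate;
    }

    // Called when a request completes without a throughput error.
    void onSuccess() {
        currentRate = Math.min(maxRate, currentRate + increment);
    }

    // Called when the destination signals e.g. a ThroughputExceeded error.
    void onThrottled() {
        currentRate = Math.max(minRate, currentRate / 2);
    }
}
```

The multiplicative cut backs off quickly when the quota is hit, while the additive recovery probes gently for the destination's sustainable rate.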

  was:
h2. Bug:

Kinesis Data Streams sink is not limiting throughput to the destination and 
therefore exceeding rate limits

*Cause:*

We are not throttling our requests downstream at all.

We should monitor for requests that have failed with ThroughputExceeded 
exceptions and reduce the throughput accordingly.


> Introduce rate limiting mechanism to Async Sink Base
> 
>
> Key: FLINK-25793
> URL: https://issues.apache.org/jira/browse/FLINK-25793
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Bug:
> The Async Sink Base is not limiting throughput to the destination and is 
> therefore exceeding rate limits.
> *Cause:*
> We are not throttling our requests downstream at all.
> We should monitor for requests that have failed with ThroughputExceeded 
> exceptions and reduce the throughput accordingly.





[jira] [Updated] (FLINK-25793) Introduce rate limiting mechanism to Async Sink Base

2022-02-01 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-25793:
---
Summary: Introduce rate limiting mechanism to Async Sink Base  (was: 
Kinesis Data Streams sink is not limiting throughput to the destination and 
therefore exceeding rate limits)

> Introduce rate limiting mechanism to Async Sink Base
> 
>
> Key: FLINK-25793
> URL: https://issues.apache.org/jira/browse/FLINK-25793
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Bug:
> Kinesis Data Streams sink is not limiting throughput to the destination and 
> therefore exceeding rate limits
> *Cause:*
> We are not throttling our requests downstream at all.
> We should monitor for requests that have failed with ThroughputExceeded 
> exceptions and reduce the throughput accordingly.





[jira] [Closed] (FLINK-25692) Documentation for Kinesis Firehose Sink

2022-02-01 Thread Zichen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu closed FLINK-25692.
--
Resolution: Fixed

> Documentation for Kinesis Firehose Sink
> ---
>
> Key: FLINK-25692
> URL: https://issues.apache.org/jira/browse/FLINK-25692
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> _FLINK-24228 introduces a new sink for Kinesis Data Streams that supersedes 
> the existing one based on KPL._
> *Scope:*
>  * Deprecate the current section in the docs for the Kinesis KPL sink and 
> write documentation and usage guide for the new sink.
> h2. References
> More details to be found 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]




