[jira] [Comment Edited] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2021-11-29 Thread Yuri Gusev (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450520#comment-17450520
 ] 

Yuri Gusev edited comment on FLINK-24229 at 11/29/21, 3:16 PM:
---

Hi [~CrynetLogistics] 

We are working on it. The implementation is almost there; we will submit a PR 
for review maybe this week or at the start of next week. In our previous 
implementation we deduplicated entries during the aggregation of a batch, but 
now we need to move this into the writer itself, because batching is done in 
the AsyncSinkWriter.
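Since batching now happens in the AsyncSinkWriter, the deduplication has to run inside the writer just before a batch is submitted: DynamoDB's BatchWriteItem rejects a batch that contains more than one operation for the same primary key. A minimal, self-contained sketch of the idea (the class and method names are illustrative, not taken from the actual PR):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchDeduplicator {

    /**
     * Keeps only the last write request per primary key, since DynamoDB's
     * BatchWriteItem rejects batches containing duplicate keys. The
     * first-seen position of each key is preserved.
     */
    public static List<Map.Entry<String, String>> deduplicate(
            List<Map.Entry<String, String>> requests) {
        // LinkedHashMap: a later put() for the same key replaces the value
        // but keeps the key's original insertion position.
        Map<String, String> byKey = new LinkedHashMap<>();
        for (Map.Entry<String, String> request : requests) {
            byKey.put(request.getKey(), request.getValue());
        }
        return new ArrayList<>(byKey.entrySet());
    }
}
```

Keeping the last write per key is one reasonable policy, since for a single item a later write would overwrite an earlier one anyway.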

I'm not sure I followed your answer on the fatal exception behaviour.

What we would like to do is to allow the user to define what should happen in 
the end (for example, after all retries towards DynamoDB for the current 
batch).

This is how we achieved it in the old implementation: 
[WriteRequestFailureHandler.java|https://github.com/YuriGusev/flink/blob/FLINK-16504_dynamodb_connector_rebased/flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/WriteRequestFailureHandler.java],
 
[DynamoDbSink.java|https://github.com/YuriGusev/flink/blob/8787c343b615602c989fa793e0f4687ef40e530c/flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/DynamoDbSink.java#L355]

At the moment it will send the failed records back to the queue and retry 
again, or on fatalException it will propagate the error and stop the 
application.




> [FLIP-171] DynamoDB implementation of Async Sink
> 
>
> Key: FLINK-24229
> URL: https://issues.apache.org/jira/browse/FLINK-24229
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Reporter: Zichen Liu
>Assignee: Zichen Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h2. Motivation
> *User stories:*
>  As a Flink user, I’d like to use DynamoDB as sink for my data pipeline.
> *Scope:*
>  * Implement an asynchronous sink for DynamoDB by inheriting the 
> AsyncSinkBase class. The implementation can for now reside in its own module 
> in flink-connectors.
>  * Implement an asynchronous sink writer for DynamoDB by extending the 
> AsyncSinkWriter. The implementation must deal with failed requests and retry 
> them using the {{requeueFailedRequestEntry}} method. If possible, the 
> implementation should batch multiple write requests ({{WriteRequest}} 
> objects) to DynamoDB for increased throughput. The implemented Sink Writer 
> will be used by the Sink class that will be created as part of this story.
>  * Java / code-level docs.
>  * End-to-end testing: add tests that hit a real AWS instance. (How can we 
> best donate resources to the Flink project to allow this to happen?)
> h2. References
> More details can be found at 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]
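The requeue-on-failure contract from the scope above can be illustrated without any Flink dependency: the writer hands a batch to an asynchronous client, and whatever the client reports as failed is fed back into the pending buffer for the next attempt. Everything below is a toy model of that pattern, not the real AsyncSinkWriter API:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

public class ToyAsyncWriter<T> {

    private final Deque<T> pending = new ArrayDeque<>();
    // The "client" submits a batch and returns the entries that failed.
    private final Function<List<T>, List<T>> client;

    public ToyAsyncWriter(Function<List<T>, List<T>> client) {
        this.client = client;
    }

    public void write(T entry) {
        pending.addLast(entry);
    }

    /**
     * Drains up to batchSize entries, submits them, and requeues failures at
     * the front of the buffer (mirroring requeueFailedRequestEntry: failed
     * entries are retried rather than lost).
     */
    public void flush(int batchSize) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < batchSize && !pending.isEmpty()) {
            batch.add(pending.pollFirst());
        }
        List<T> failed = client.apply(batch);
        for (int i = failed.size() - 1; i >= 0; i--) {
            pending.addFirst(failed.get(i));
        }
    }

    public int pendingCount() {
        return pending.size();
    }
}
```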



--
This message was sent by Atlassian Jira
(v8.20.1#820001)




[jira] [Comment Edited] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2021-11-30 Thread Yuri Gusev (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450520#comment-17450520
 ] 

Yuri Gusev edited comment on FLINK-24229 at 11/30/21, 4:22 PM:
---

Hi [~CrynetLogistics] 

We are working on it. The implementation is almost there; we will submit a PR 
for review maybe this week or at the start of next week. In our previous 
implementation we deduplicated entries during the aggregation of a batch, but 
now we need to move this into the writer itself, because batching is done in 
the AsyncSinkWriter.

I'm not sure I followed your answer on the fatal exception behaviour.

What we would like to do is to allow the user to define how to handle the 
failure (for example, after all retries towards DynamoDB for the current 
batch, send the "poisonous" record to a DLQ, or drop it).

This is how we achieved it in the old implementation: 
[WriteRequestFailureHandler.java|https://github.com/YuriGusev/flink/blob/FLINK-16504_dynamodb_connector_rebased/flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/WriteRequestFailureHandler.java],
 
[DynamoDbSink.java|https://github.com/YuriGusev/flink/blob/8787c343b615602c989fa793e0f4687ef40e530c/flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/DynamoDbSink.java#L355]

At the moment it seems [it will send the failed records 
back|https://github.com/apache/flink/blob/cb5034b6b8d1a601ae3bc34feb3314518e78aae3/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L304]
 to the queue and retry again, or on fatalException it will propagate the error 
and stop the application.
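The user-defined handling described above could take the shape of a small callback interface that decides, per exhausted record, whether to retry, drop, or fail the job. This is only an illustrative sketch with hypothetical names; the old connector's real contract is in the linked WriteRequestFailureHandler.java:

```java
/**
 * Illustrative sketch of a pluggable failure policy the sink could consult
 * once retries for a record are exhausted. All names here are hypothetical.
 */
public class FailureHandling {

    public enum Action { RETRY, DROP, FAIL }

    /** User-supplied policy, invoked once per failed record. */
    public interface WriteFailureHandler<T> {
        Action onFailure(T record, Exception cause, int attempts);
    }

    /**
     * Example policy: retry up to a limit, then drop the "poisonous" record
     * (e.g. after forwarding it to a DLQ) instead of failing the whole job.
     */
    public static <T> WriteFailureHandler<T> dropAfter(int maxAttempts) {
        return (record, cause, attempts) ->
                attempts < maxAttempts ? Action.RETRY : Action.DROP;
    }
}
```

A user who wants the current behaviour would simply return FAIL instead of DROP once retries run out.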





[jira] [Comment Edited] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2022-07-08 Thread Yuri Gusev (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17564322#comment-17564322
 ] 

Yuri Gusev edited comment on FLINK-24229 at 7/8/22 3:08 PM:


Ok, thanks. We had a few open questions in progress (like whether or not to 
hide the ElementConverter), but it may be better to start fresh on a new PR, 
[~dannycranmer].





[jira] [Comment Edited] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2022-06-02 Thread Yuri Gusev (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17545434#comment-17545434
 ] 

Yuri Gusev edited comment on FLINK-24229 at 6/2/22 11:51 AM:
-

Thanks [~dannycranmer].

We fixed most of the comments. There is also a separate PR to hide the 
ElementConverter for review, but we can merge it into this one.

One last missing part is client creation; I'll try to fix that soon, but most 
of it is available for re-review. It would be nice to get this out soon, 
before too many changes pile up again; we have already re-written it a couple 
of times (the earlier implementation was without a shared async base connector 
class). :)







[jira] [Comment Edited] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2022-03-11 Thread Yuri Gusev (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504922#comment-17504922
 ] 

Yuri Gusev edited comment on FLINK-24229 at 3/11/22, 1:34 PM:
--

Ok, thank you [~martijnvisser] and [~CrynetLogistics].

There were some changes on master and now there are conflicts, so I'll rebase 
our work on top of the latest changes.

[~CrynetLogistics], would you mind taking a look at the question we had in the 
PR about the elementConverter being hidden? We have a sample PR ([see Nir's 
comment|https://github.com/apache/flink/pull/18518#discussion_r799306670]) 
showing what it would take to hide the elementConverter, but that might impact 
the performance of the sink quite a lot.

We will hold off on the documentation ticket until we agree with the reviewers 
on the ElementConverter changes.


