[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882842#comment-15882842 ]

ASF GitHub Bot commented on FLINK-5487:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3358


> Proper at-least-once support for ElasticsearchSink
> --
>
> Key: FLINK-5487
> URL: https://issues.apache.org/jira/browse/FLINK-5487
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.
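The flush-on-checkpoint behaviour described above can be sketched as a minimal, self-contained Java toy model. This is an illustration only, not the connector code: `numPendingRequests` and `bulkFlush()` are hypothetical stand-ins for the sink's pending-request counter and `BulkProcessor.flush()`.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Toy model of the sink's flush-on-checkpoint logic (hypothetical stand-ins). */
class CheckpointFlushSketch {
    // counts requests buffered or in flight, decremented when ES acks them
    final AtomicLong numPendingRequests = new AtomicLong();

    /** Stand-in for bulkProcessor.flush(); tests/subclasses simulate the ES bulk flush. */
    void bulkFlush() {}

    /** Called when a checkpoint is triggered: block until every pending request is acked. */
    void snapshotState() {
        while (numPendingRequests.get() != 0) {
            bulkFlush(); // synchronously flush buffered requests
        }
        // only now may the checkpoint be acknowledged (at-least-once)
    }
}
```

A real implementation must also rethrow errors recorded by the asynchronous failure callback, so that a non-temporary failure fails the snapshot instead of spinning here forever.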



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882763#comment-15882763 ]

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3358
  
Merging this to `master`.




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882302#comment-15882302 ]

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3358
  
+1 to merge.

Thank you for answering all my comments in such detail!




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882301#comment-15882301 ]

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102905100
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
	}

	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		// no initialization needed
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		checkErrorAndRethrow();
+
+		if (flushOnCheckpoint) {
+			do {
+				bulkProcessor.flush();
--- End diff --

Thanks a lot for looking into this in detail.
I think calling flush() this way is okay then.




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880120#comment-15880120 ]

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102662136
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
	}

	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		// no initialization needed
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		checkErrorAndRethrow();
+
+		if (flushOnCheckpoint) {
+			do {
+				bulkProcessor.flush();
--- End diff --

Following my arguments above, I think the busy loop you mentioned shouldn't 
happen, because the bulk processor's internal `bulkRequest.numberOfActions()` 
should always be in sync with our `numPendingRecords` (i.e., it should never be 
the case that `bulkRequest.numberOfActions() == 0` while our own 
`numPendingRecords != 0`).

In that case, if `bulkRequest.numberOfActions() == 0`, my original loop 
implementation simply falls back to a single pass with 2 condition checks.

To a certain extent, I think it might be better to stick to the original loop 
implementation, so that we're not locked in to how the `BulkProcessor`'s flush 
is implemented. As you can see from a commit I just pushed (2956f99), which 
modifies the mock bulk processor in tests to correctly mimic the flushing 
behaviour I described above, the loop implementation still passes the tests.




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880075#comment-15880075 ]

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102656903
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
	}

	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		// no initialization needed
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		checkErrorAndRethrow();
+
+		if (flushOnCheckpoint) {
+			do {
+				bulkProcessor.flush();
--- End diff --

On second look, I think my previous statement is incorrect.

To elaborate, this is how the `BulkProcessor`'s `flush` is implemented:
```
if (this.bulkRequest.numberOfActions() > 0) {
    this.execute();
}
```

`execute()` doesn't return until `afterBulk` is called on the listener. Since 
we can re-add requests to the bulk processor within `afterBulk`, 
`bulkRequest.numberOfActions() > 0` will be true again and the flush re-enters 
the loop.

Therefore, `bulkProcessor.flush()` can actually be called just once; it works 
with our failure-handler re-adding strategy so that the flush also waits for 
re-added requests before returning. We can then check `numPendingRequests` once 
after the flush to make sure it worked as expected.
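The flush/afterBulk interaction described in this comment can be mimicked with a small toy model. Hedged heavily: this simulates the behaviour discussed (flush keeps draining as long as the buffer is non-empty, and the `afterBulk` callback may re-add failed requests to that same buffer), with hypothetical stand-in types; it is not the Elasticsearch `BulkProcessor` itself.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the flush semantics discussed above (not the real BulkProcessor). */
class BulkFlushModel {
    final Deque<String> bulkRequest = new ArrayDeque<>(); // buffered actions
    int failuresLeft; // simulates temporary failures that trigger re-adds

    BulkFlushModel(int failuresLeft) { this.failuresLeft = failuresLeft; }

    void add(String action) { bulkRequest.add(action); }

    /** Mirrors: if (bulkRequest.numberOfActions() > 0) { execute(); } —
     *  execute() returns only after afterBulk has run, and re-added requests
     *  make the condition true again, so a single flush() also covers retries. */
    void flush() {
        while (!bulkRequest.isEmpty()) {
            String action = bulkRequest.poll();
            afterBulk(action);
        }
    }

    void afterBulk(String action) {
        if (failuresLeft > 0) {
            failuresLeft--;
            bulkRequest.add(action); // failure handler re-adds the request
        }
    }
}
```

With this model, a single `flush()` terminates only once every action, including the re-added ones, has gone through without a temporary failure, which is the property the single-pass argument relies on.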




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880059#comment-15880059 ]

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102654252
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
	}

	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		// no initialization needed
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		checkErrorAndRethrow();
+
+		if (flushOnCheckpoint) {
+			do {
+				bulkProcessor.flush();
--- End diff --

Waiting on `numPendingRequests` makes sense. I'll try it and see if it works out.




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880057#comment-15880057 ]

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102654126
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
	}

	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		// no initialization needed
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		checkErrorAndRethrow();
+
+		if (flushOnCheckpoint) {
+			do {
+				bulkProcessor.flush();
--- End diff --

Ah, I see the problem here.
The bulk processor's internal `bulkRequest.numberOfActions() == 0` becomes 
`true` as soon as it starts executing the flush, not after `afterBulk` is 
invoked.

So, since our `numPendingRequests` implementation relies on the `afterBulk` 
callback, we might busy-loop on `bulkProcessor.flush()` while waiting for 
`numPendingRequests` to become 0.

This is actually quite a nice catch! So no worries about bringing it up now.




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879990#comment-15879990 ]

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102648005
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
+ * and fails for all other failures.
+ */
+public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
+
+	private static final long serialVersionUID = -7423562912824511906L;
+
+	@Override
+	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+		if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+			indexer.add(action);
--- End diff --

Metricifying the ES connectors seems like a good idea, especially given their 
growing popularity. I'll think about it and file a JIRA with some initial 
proposals.
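The retry decision implemented by the handler in the diff above amounts to a search of the exception cause chain. Here is a self-contained sketch, where `RejectedException` and `containsThrowable` are hypothetical stand-ins for `EsRejectedExecutionException` and Flink's `ExceptionUtils.containsThrowable`, so the logic can be shown without the ES and Flink dependencies:

```java
/** Sketch of the retry-on-rejection decision (hypothetical stand-in types). */
class RetryDecisionSketch {
    /** Stand-in for EsRejectedExecutionException (ES node queues full). */
    static class RejectedException extends RuntimeException {}

    /** Walks the cause chain looking for an instance of the given type. */
    static boolean containsThrowable(Throwable t, Class<? extends Throwable> type) {
        while (t != null) {
            if (type.isInstance(t)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    /** Re-add (retry) only when the failure is a temporary queue rejection;
     *  any other failure should propagate and fail the sink. */
    static boolean shouldRetry(Throwable failure) {
        return containsThrowable(failure, RejectedException.class);
    }
}
```

Checking the whole cause chain rather than only the top-level exception matters because bulk failures are often wrapped in transport-layer exceptions before they reach the failure handler.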




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879988#comment-15879988 ]

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102647903
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
+ * and fails for all other failures.
+ */
+public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
+
+	private static final long serialVersionUID = -7423562912824511906L;
+
+	@Override
+	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+		if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+			indexer.add(action);
--- End diff --

Regarding the frequency of `EsRejectedExecutionException`: from my experience 
with ES, they pop up a lot on under-resourced / misconfigured ES clusters.

They can flood the logs if not treated accordingly, but not logging them can be 
bad too, because then you'll know nothing about them unless the sink eventually 
fails.

We could also remove the failure logging from the `ElasticsearchSinkBase` and 
let the user be responsible for it, but I'm a bit undecided here. Open to 
suggestions!




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879974#comment-15879974 ]

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102647565
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
+ * and fails for all other failures.
+ */
+public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
+
+	private static final long serialVersionUID = -7423562912824511906L;
+
+	@Override
+	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+		if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+			indexer.add(action);
--- End diff --

The `BulkProcessor` listener actually logs them with LOG.error before they are 
processed by the failure handler (lines 171 and 180). So these failures are 
always logged, regardless of whether the failure handler chooses to log them. 
Do you think that's ok?




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879028#comment-15879028 ]

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102546521
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import 
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
+ * and fails for all other failures.
+ */
+public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
+
+   private static final long serialVersionUID = -7423562912824511906L;
+
+	@Override
+	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+		if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+			indexer.add(action);
--- End diff --

Do you think this is worth a LOG.debug statement?
Or will it happen too often / is too uninformative?

I wonder if we could use the metrics system for exposing stuff like error 
rate, retry rate etc. (Maybe we should file a JIRA for the ElasticSearch 
connectors to "metricify" them)
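To make the logging/metrics suggestion concrete, here is a minimal, self-contained sketch of a retry handler that counts the requests it re-adds, which is exactly the hook where a LOG.debug statement or a metrics counter (retry rate) could be attached. The interfaces below are simplified stand-ins for Flink's `ActionRequestFailureHandler` and `RequestIndexer`, and `IllegalStateException` stands in for `EsRejectedExecutionException`; this is illustrative only, not the connector's actual API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-ins for the Flink interfaces quoted in the diff above;
// they are NOT the real connector API, just enough shape for the sketch.
interface RequestIndexer {
    void add(Object... actions);
}

interface ActionRequestFailureHandler {
    void onFailure(Object action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
}

/**
 * A retry handler that counts how often it re-adds a rejected request.
 * The counter increment marks the spot where a debug log line or a
 * metrics-system counter could be wired in.
 */
class CountingRetryFailureHandler implements ActionRequestFailureHandler {

    private final AtomicLong retries = new AtomicLong();

    @Override
    public void onFailure(Object action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
        // IllegalStateException stands in for EsRejectedExecutionException here
        if (containsThrowable(failure, IllegalStateException.class)) {
            retries.incrementAndGet(); // report this count as a metric / debug log
            indexer.add(action);       // re-add the rejected request for indexing
        } else {
            throw failure;             // all other failures should fail the sink
        }
    }

    long retryCount() {
        return retries.get();
    }

    // walks the cause chain, similar in spirit to ExceptionUtils.containsThrowable
    private static boolean containsThrowable(Throwable t, Class<?> type) {
        while (t != null) {
            if (type.isInstance(t)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }
}
```

A handler like this keeps the retry decision and the observability concern in one place, so "metricifying" the connector later would only mean swapping the counter for a real metric.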


> Proper at-least-once support for ElasticsearchSink
> --
>
> Key: FLINK-5487
> URL: https://issues.apache.org/jira/browse/FLINK-5487
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15879029#comment-15879029
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102544239
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -92,40 +152,59 @@
	/** Call bridge for different version-specific Elasticsearch calls. */
private final ElasticsearchApiCallBridge callBridge;
 
+	/**
+	 * Number of pending action requests not yet acknowledged by Elasticsearch.
+	 * This value is maintained only if {@link ElasticsearchSinkBase#flushOnCheckpoint} is {@code true}.
+	 *
+	 * This is incremented whenever the user adds (or re-adds through the {@link ActionRequestFailureHandler})
+	 * requests to the {@link RequestIndexer}. It is decremented for each completed request of a bulk request,
+	 * in {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, BulkResponse)} and
+	 * {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, Throwable)}.
+	 */
+	private AtomicLong numPendingRequests = new AtomicLong(0);
+
/** Elasticsearch client created using the call bridge. */
private transient Client client;
 
	/** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */
	private transient BulkProcessor bulkProcessor;
 
 	/**
-	 * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks.
+	 * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks and
+	 * the user considered it should fail the sink via the
+	 * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)} method.
+	 *
+	 * Errors will be checked and rethrown before processing each input element, and when the sink is closed.
 	 */
 	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
 
 	public ElasticsearchSinkBase(
 		ElasticsearchApiCallBridge callBridge,
 		Map<String, String> userConfig,
-		ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+		ActionRequestFailureHandler failureHandler) {
 
 		this.callBridge = checkNotNull(callBridge);
 		this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
+		this.failureHandler = checkNotNull(failureHandler);
 
-		// we eagerly check if the user-provided sink function is serializable;
-		// otherwise, if it isn't serializable, users will merely get a non-informative error message
+		// we eagerly check if the user-provided sink function and failure handler is serializable;
+		// otherwise, if they aren't serializable, users will merely get a non-informative error message
 		// "ElasticsearchSinkBase is not serializable"
-		try {
-			InstantiationUtil.serializeObject(elasticsearchSinkFunction);
-		} catch (Exception e) {
-			throw new IllegalArgumentException(
-				"The implementation of the provided ElasticsearchSinkFunction is not serializable. " +
-				"The object probably contains or references non serializable fields.");
-		}
-
-		checkNotNull(userConfig);
+		checkArgument(InstantiationUtil.isSerializable(elasticsearchSinkFunction),
+			"The implementation of the provided ElasticsearchSinkFunction is not serializable. " +
+			"The object probably contains or references non-serializable fields.");
+
+		checkArgument(InstantiationUtil.isSerializable(failureHandler),
+			"The implementation of the provided ActionRequestFailureHandler is not serializable. " +
+			"The object probably contains or references non-serializable fields.");
--- End diff --

That's so much nicer now :)



[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15879027#comment-15879027
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102545769
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
 	}
 
 	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		// no initialization needed
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		checkErrorAndRethrow();
+
+		if (flushOnCheckpoint) {
+			do {
+				bulkProcessor.flush();
--- End diff --

This flush() might be a noop if bulkRequest.numberOfActions() == 0 in the 
bulkProcessor implementation.
If so, this loop turns into a busy loop wasting CPU cycles.
I wonder if we should wait on the numPendingRequests and notify on it once 
we update it?

(Sorry that I bring this up in the second review)
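For illustration, a self-contained sketch (plain Java, no Flink or Elasticsearch classes) of the wait/notify idea raised above: the snapshotting thread blocks on a lock object and is woken whenever the pending-request counter is decremented, instead of spinning on `flush()`. The names `PendingRequestTracker` and `awaitAllPending` are invented for this sketch and are not part of the connector.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Sketch of the wait/notify alternative to a busy flush loop: the snapshot
 * thread waits on a lock and re-checks the pending count, while the bulk
 * listener decrements the count and notifies waiters.
 */
class PendingRequestTracker {

    private final AtomicLong numPendingRequests = new AtomicLong();
    private final Object flushLock = new Object();

    /** Called when a request is added (or re-added) to the indexer. */
    void onRequestAdded() {
        numPendingRequests.incrementAndGet();
    }

    /** Called from the BulkProcessor listener's afterBulk callbacks. */
    void onRequestsCompleted(long completed) {
        synchronized (flushLock) {
            numPendingRequests.addAndGet(-completed);
            flushLock.notifyAll();
        }
    }

    /**
     * Called from snapshotState(); triggerFlush would invoke
     * bulkProcessor.flush(). Waiting with a timeout re-checks periodically
     * instead of burning CPU in a tight loop.
     */
    void awaitAllPending(Runnable triggerFlush) throws InterruptedException {
        synchronized (flushLock) {
            while (numPendingRequests.get() > 0) {
                triggerFlush.run();
                flushLock.wait(100);
            }
        }
    }

    long pending() {
        return numPendingRequests.get();
    }
}
```

The timed `wait(100)` is a deliberate compromise: a no-op `flush()` no longer spins, yet a missed notification can never block the checkpoint forever.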




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878744#comment-15878744
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3358
  
@rmetzger All of your comments have been addressed, and tests for the new 
features have been added (in `ElasticsearchSinkBaseTest`). Can you take another 
look? Thanks a lot!

Some notes on changes I made that weren't previously discussed:
1. Renamed `NoOpActionRequestFailureHandler` to just 
`NoOpActionFailureHandler` - less of a mouthful ;-)
2. I exposed the response's REST status code through the failure handler's 
`onFailure(...)` callback. The reason for this is explained in the doc / 
Javadoc changes of the last follow-up commit (c594523).




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15877698#comment-15877698
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102400283
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
 			@Override
 			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 				if (response.hasFailures()) {
-					for (BulkItemResponse itemResp : response.getItems()) {
-						Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+					BulkItemResponse itemResponse;
+					Throwable failure;
+
+					for (int i = 0; i < response.getItems().length; i++) {
+						itemResponse = response.getItems()[i];
+						failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
 						if (failure != null) {
-							LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-							failureThrowable.compareAndSet(null, failure);
+							LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+							if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
+								failureThrowable.compareAndSet(null, failure);
+							}
 						}
 					}
 				}
+
+				numPendingRequests.getAndAdd(-request.numberOfActions());
 			}
 
 			@Override
 			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-				LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-				failureThrowable.compareAndSet(null, failure);
+				LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+
+				// whole bulk request failures are usually just temporary timeouts on
+				// the Elasticsearch side; simply retry all action requests in the bulk
+				for (ActionRequest action : request.requests()) {
+					requestIndexer.add(action);
+				}
--- End diff --

It seems like we will need to use the failure handler too.
Any exception that the Elasticsearch `Client` throws while issuing the bulk 
request can appear here too. So, exceptions like unreachable node can pop out 
here as well, and I don't think we should implicitly treat them as temporary.



[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15876150#comment-15876150
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3358
  
Yes, I overlooked adding tests for this.

Thanks a lot for the reviews @rmetzger! I'll address your comments and add
tests for the additional features.
Will ping you once it's ready for another review.




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15876148#comment-15876148
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102233292
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
 			@Override
 			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 				if (response.hasFailures()) {
-					for (BulkItemResponse itemResp : response.getItems()) {
-						Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+					BulkItemResponse itemResponse;
+					Throwable failure;
+
+					for (int i = 0; i < response.getItems().length; i++) {
+						itemResponse = response.getItems()[i];
+						failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
 						if (failure != null) {
-							LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-							failureThrowable.compareAndSet(null, failure);
+							LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+							if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
--- End diff --

Alright, then I'll simply change the `boolean` return usage to throwing a
`Throwable`, and add some Javadoc noting that any state in the failure
handler is volatile.
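A tiny sketch of the difference between the two contracts discussed here, using simplified stand-in types rather than the actual Flink interfaces: with a throwing `onFailure`, "fail the sink" is expressed by rethrowing, so the caller no longer needs a boolean result plus `failureThrowable.compareAndSet(null, failure)`.

```java
// Stand-in type: the real interface under discussion is Flink's
// ActionRequestFailureHandler, whose onFailure takes the ActionRequest,
// the failure cause, and a RequestIndexer.
interface ThrowingFailureHandler {
    void onFailure(Object action, Throwable failure) throws Throwable;
}

/**
 * Returning normally means "handled" (drop or retry); rethrowing means
 * "fail the sink". IllegalArgumentException stands in for a malformed
 * document error here.
 */
class DropMalformedHandler implements ThrowingFailureHandler {
    @Override
    public void onFailure(Object action, Throwable failure) throws Throwable {
        if (failure instanceof IllegalArgumentException) {
            // e.g. a malformed document: drop it silently, keep the sink running
            return;
        }
        throw failure; // everything else fails the sink
    }
}
```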




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875979#comment-15875979
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3358
  
I finished checking the code.
The only thing I'm missing from the change is a test case ensuring that the 
implementation works.

I think we can build a test similar to what we did with Kafka. (With a mock 
producer)




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875954#comment-15875954
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102204839
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
 			@Override
 			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 				if (response.hasFailures()) {
-					for (BulkItemResponse itemResp : response.getItems()) {
-						Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+					BulkItemResponse itemResponse;
+					Throwable failure;
+
+					for (int i = 0; i < response.getItems().length; i++) {
+						itemResponse = response.getItems()[i];
+						failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
 						if (failure != null) {
-							LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-							failureThrowable.compareAndSet(null, failure);
+							LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+							if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
+								failureThrowable.compareAndSet(null, failure);
+							}
 						}
 					}
 				}
+
+				numPendingRequests.getAndAdd(-request.numberOfActions());
 			}
 
 			@Override
 			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-				LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-				failureThrowable.compareAndSet(null, failure);
+				LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+
+				// whole bulk request failures are usually just temporary timeouts on
+				// the Elasticsearch side; simply retry all action requests in the bulk
+				for (ActionRequest action : request.requests()) {
+					requestIndexer.add(action);
+				}
--- End diff --

Thank you. I'm undecided if we want to add this here or not.
Just based on my experience with the Kafka connector, at some point there 
is a user who wants to have a very specific custom behavior :) But we can also 
keep it as is and fix it if a user needs it (worst case: they have to override 
our implementation)


> flushing the internal bulk processor. For 

[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875951#comment-15875951
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102204579
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
 			@Override
 			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 				if (response.hasFailures()) {
-					for (BulkItemResponse itemResp : response.getItems()) {
-						Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+					BulkItemResponse itemResponse;
+					Throwable failure;
+
+					for (int i = 0; i < response.getItems().length; i++) {
+						itemResponse = response.getItems()[i];
+						failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
 						if (failure != null) {
-							LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-							failureThrowable.compareAndSet(null, failure);
+							LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+							if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
+								failureThrowable.compareAndSet(null, failure);
+							}
 						}
 					}
 				}
+
+				numPendingRequests.getAndAdd(-request.numberOfActions());
 			}
 
 			@Override
 			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-				LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-				failureThrowable.compareAndSet(null, failure);
+				LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+
+				// whole bulk request failures are usually just temporary timeouts on
+				// the Elasticsearch side; simply retry all action requests in the bulk
+				for (ActionRequest action : request.requests()) {
+					requestIndexer.add(action);
+				}
+
+				numPendingRequests.getAndAdd(-request.numberOfActions());
--- End diff --

Puh, that's good :) Thx for the explanation.
I didn't look closely enough at your changes.



[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875948#comment-15875948
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102204385
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
 			@Override
 			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 				if (response.hasFailures()) {
-					for (BulkItemResponse itemResp : response.getItems()) {
-						Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+					BulkItemResponse itemResponse;
+					Throwable failure;
+
+					for (int i = 0; i < response.getItems().length; i++) {
+						itemResponse = response.getItems()[i];
+						failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
 						if (failure != null) {
-							LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-							failureThrowable.compareAndSet(null, failure);
+							LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+							if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
--- End diff --

Mh. I don't know if the use case I've mentioned makes a lot of sense.
Probably most users just want custom logic to decide how to do the retries / discards.

I think we shouldn't do complicated things like checkpointing the state of
the failure handler. It's good enough if the user keeps it locally (and loses it
on failure).
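As an illustration of such local, non-checkpointed state, here is a minimal sketch of a handler that retries each action at most N times. `BoundedRetryHandler` and its method names are invented for this example; the attempt map is plain instance state that would indeed be lost when the job recovers from a failure.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * A failure handler with local state only: it allows a bounded number of
 * retries per action and then gives up. The counts are not checkpointed,
 * so after a restart every action starts with a fresh retry budget.
 */
class BoundedRetryHandler {

    private final int maxRetries;
    private final Map<Object, Integer> attempts = new HashMap<>(); // local state only

    BoundedRetryHandler(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Returns true if the action should be re-added for indexing. */
    boolean shouldRetry(Object action) {
        int seen = attempts.merge(action, 1, Integer::sum);
        return seen <= maxRetries;
    }
}
```

Keeping this state purely local matches the conclusion above: losing the counters on recovery only means a few extra retries, which is harmless under at-least-once semantics.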





[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875944#comment-15875944
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102203605
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -272,9 +308,115 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
 The difference is that now we do not need to provide a list of addresses
 of Elasticsearch nodes.
 
+### Handling Failing Elasticsearch Requests
+
+Elasticsearch action requests may fail due to a variety of reasons, 
including
+temporarily saturated node queue capacity or malformed documents to be 
indexed.
+The Flink Elasticsearch Sink allows the user to specify how request
+failures are handled, by simply implementing an 
`ActionRequestFailureHandler` and
+providing it to the constructor.
+
+Below is an example:
+
+
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new ElasticsearchSink<>(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction<String>() {...},
+    new ActionRequestFailureHandler() {
+        @Override
+        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+            // this example uses Apache Commons to search for nested exceptions
+
+            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action);
+                return false;
+            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+                // malformed document; simply drop request without failing the sink
+                return false;
+            } else {
+                // for all other failures, fail the sink
+                return true;
+            }
+        }
+    }));
+{% endhighlight %}
+
+
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new ElasticsearchSink(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction[String] {...},
+    new ActionRequestFailureHandler {
+        override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
+            // this example uses Apache Commons to search for nested exceptions
+
+            if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action)
+                false
+            } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
+                // malformed document; simply drop request without failing the sink
+                false
+            } else {
+                // for all other failures, fail the sink
+                true
+            }
+        }
+    }))
+{% endhighlight %}
+
+
+
+The above example lets the sink re-add requests that failed due to queue
+capacity saturation and drop requests with malformed documents, without
+failing the sink. For all other failures, the sink fails. If no
+`ActionRequestFailureHandler` is provided to the constructor, the sink
+fails for any kind of error.
+
+Note that `onFailure` is called only for failures that still occur after the
+internal `BulkProcessor` finishes all of its backoff retry attempts.
+By default, the `BulkProcessor` retries up to 8 times with exponential
+backoff. For more information on the behaviour of the internal
+`BulkProcessor` and how to configure it, please see the following section.
+
+
+IMPORTANT: Re-adding requests to the internal BulkProcessor
+on failure leads to longer checkpoints, as the sink also needs
+to wait for the re-added requests to be flushed when checkpointing.
+This also means that if re-added requests never succeed, the
+checkpoint will never finish.
+
--- End diff --

I agree that we should provide a reasonable default behavior, instead of 
just retrying.
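A sketch of what such a default could look like, retrying only queue-rejection failures and failing the sink on everything else. All types here (`ActionRequest`, `RequestIndexer`, `ActionRequestFailureHandler`, and the `EsRejectedExecutionException` marker class) are minimal stand-ins for illustration, not the real Flink or Elasticsearch classes:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins for the connector types quoted in the diff above
// (assumptions for illustration, not the real Flink/Elasticsearch classes).
interface ActionRequest {}

interface RequestIndexer {
    void add(ActionRequest action);
}

interface ActionRequestFailureHandler {
    // returns true if the sink should fail, false if the failure was handled
    boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer);
}

class EsRejectedExecutionException extends RuntimeException {}

// A possible default: retry only queue-rejection failures, fail the sink otherwise.
class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
    @Override
    public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
        // walk the cause chain looking for a rejection (full node queue)
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof EsRejectedExecutionException) {
                indexer.add(action); // re-add the document for indexing
                return false;        // handled; do not fail the sink
            }
        }
        return true; // unknown failure: fail the sink
    }
}

class FailureHandlerDemo {
    public static void main(String[] args) {
        List<ActionRequest> readded = new ArrayList<>();
        RequestIndexer indexer = readded::add;
        ActionRequestFailureHandler handler = new RetryRejectedExecutionFailureHandler();

        ActionRequest req = new ActionRequest() {};
        boolean fail1 = handler.onFailure(req, new RuntimeException(new EsRejectedExecutionException()), indexer);
        boolean fail2 = handler.onFailure(req, new IllegalStateException("mapping error"), indexer);

        System.out.println(fail1 + " " + fail2 + " " + readded.size()); // false true 1
    }
}
```

This mirrors the comment above: rejections are retried by default, while anything else (e.g. a malformed document) still surfaces as a sink failure rather than being silently retried forever.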



[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875825#comment-15875825
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102186137
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
--- End diff --

I wouldn't suggest adding a `ActionRequestFailureHandler` that 
out-of-the-box retries for all exceptions, though. That could let users easily 
overlook some exceptions that simply cannot be retried without custom logic 
(for example, malformed documents with wrong field types).



[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875822#comment-15875822
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102185790
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
 	@Override
 	public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 		if (response.hasFailures()) {
-			for (BulkItemResponse itemResp : response.getItems()) {
-				Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+			BulkItemResponse itemResponse;
+			Throwable failure;
+
+			for (int i = 0; i < response.getItems().length; i++) {
+				itemResponse = response.getItems()[i];
+				failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
 				if (failure != null) {
-					LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-					failureThrowable.compareAndSet(null, failure);
+					LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+					if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
--- End diff --

For the use case you mentioned, that would mean the user implements a 
stateful `ActionRequestFailureHandler`, with its state being the number of 
failures so far, correct?

I didn't think about this too much, but I guess there shouldn't be a 
problem for this.
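A stateful handler of that kind could be sketched like this; the nested `Request`/`Indexer` types are hypothetical stand-ins for the connector's interfaces, and the failure budget is illustrative:

```java
// A stateful handler that tolerates a bounded number of failures before
// failing the sink. The nested interfaces are minimal stand-ins for the
// connector's types (assumptions, not real Flink classes).
class BoundedFailureCountHandler {
    interface Request {}
    interface Indexer { void add(Request r); }

    private final int maxFailures;
    private int failuresSoFar; // the handler's state: number of failures observed

    BoundedFailureCountHandler(int maxFailures) {
        this.maxFailures = maxFailures;
    }

    // returns true if the sink should fail
    boolean onFailure(Request action, Throwable failure, Indexer indexer) {
        failuresSoFar++;
        if (failuresSoFar > maxFailures) {
            return true; // failure budget exhausted: fail the sink
        }
        indexer.add(action); // otherwise retry the request
        return false;
    }

    public static void main(String[] args) {
        BoundedFailureCountHandler handler = new BoundedFailureCountHandler(2);
        Indexer noop = r -> {};
        Request req = new Request() {};
        System.out.println(handler.onFailure(req, new RuntimeException(), noop)); // false
        System.out.println(handler.onFailure(req, new RuntimeException(), noop)); // false
        System.out.println(handler.onFailure(req, new RuntimeException(), noop)); // true
    }
}
```

Note that the failure count lives in the handler instance, so it would reset whenever the sink is restarted or restored from a checkpoint; that trade-off is part of what the thread is weighing here.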




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875819#comment-15875819
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102185551
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
--- End diff --

That's actually a good idea; I hadn't thought of it that way. I would like to 
change it to throw a `Throwable` instead.




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875810#comment-15875810
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102184622
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
 	@Override
 	public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 		if (response.hasFailures()) {
-			for (BulkItemResponse itemResp : response.getItems()) {
-				Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+			BulkItemResponse itemResponse;
+			Throwable failure;
+
+			for (int i = 0; i < response.getItems().length; i++) {
+				itemResponse = response.getItems()[i];
+				failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
 				if (failure != null) {
-					LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-					failureThrowable.compareAndSet(null, failure);
+					LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+					if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
+						failureThrowable.compareAndSet(null, failure);
+					}
 				}
 			}
 		}
+
+		numPendingRequests.getAndAdd(-request.numberOfActions());
 	}

 	@Override
 	public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-		LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-		failureThrowable.compareAndSet(null, failure);
+		LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+
+		// whole bulk request failures are usually just temporary timeouts on
+		// the Elasticsearch side; simply retry all action requests in the bulk
+		for (ActionRequest action : request.requests()) {
+			requestIndexer.add(action);
+		}
+
+		numPendingRequests.getAndAdd(-request.numberOfActions());
--- End diff --

The `BulkProcessorIndexer` will increment `numPendingRequests` whenever the 
user calls `add(ActionRequest)`. So, in your description, when the user re-adds 
the 500 requests, `numPendingRequests` first becomes `500+500=1000`. Then, we 
consider the failed 500 requests to have completed when this line is reached, 
so `numPendingRequests` becomes `1000-500=500`.
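The arithmetic above can be sketched with a plain `AtomicLong`; re-added requests bump the counter before the completed bulk's actions are subtracted, so the pending count never drops to zero while retries are in flight. The class and method names here are illustrative, not the connector's actual code:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the pending-request accounting described above
// (names are assumptions, not the connector's actual code).
class PendingRequestCounter {
    private final AtomicLong numPendingRequests = new AtomicLong();

    // called by the indexer whenever requests are (re-)added
    void add(int n) { numPendingRequests.addAndGet(n); }

    // called in afterBulk() once a bulk of the given size has completed
    void bulkCompleted(int bulkSize) { numPendingRequests.addAndGet(-bulkSize); }

    long pending() { return numPendingRequests.get(); }

    public static void main(String[] args) {
        PendingRequestCounter c = new PendingRequestCounter();
        c.add(500);           // 500 requests buffered, bulk is sent
        c.add(500);           // the failure handler re-adds all 500 -> 1000 pending
        c.bulkCompleted(500); // the failed bulk is accounted as done -> 500 pending
        System.out.println(c.pending()); // prints 500
    }
}
```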



[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875807#comment-15875807
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102184175
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
--- End diff --

I'll need to double check this. The ES documents don't say much about this.



[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875805#comment-15875805
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102183824
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -67,10 +73,56 @@
 	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
 	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
 	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
+	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
+	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
+	public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
+
+	public enum FlushBackoffType {
+		CONSTANT,
+		EXPONENTIAL
+	}
+
+	public class BulkFlushBackoffPolicy implements Serializable {
+
+		private static final long serialVersionUID = -6022851996101826049L;
+
+		// the default values follow the Elasticsearch default settings for BulkProcessor
+		private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
+		private int maxRetryCount = 8;
+		private long delayMillis = 50;
+
+		public FlushBackoffType getBackoffType() {
+			return backoffType;
+		}
+
+		public int getMaxRetryCount() {
+			return maxRetryCount;
+		}
+
+		public long getDelayMillis() {
+			return delayMillis;
+		}
+
+		public void setBackoffType(FlushBackoffType backoffType) {
+			this.backoffType = checkNotNull(backoffType);
+		}
+
+		public void setMaxRetryCount(int maxRetryCount) {
+			checkArgument(maxRetryCount > 0);
+			this.maxRetryCount = maxRetryCount;
+		}
+
+		public void setDelayMillis(long delayMillis) {
+			checkArgument(delayMillis > 0);
--- End diff --

True, 0 should be acceptable. Nice catches.
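A sketch of the policy with the relaxed validation discussed here (`delayMillis >= 0`, so a zero delay is allowed), plus one common way to derive the per-retry delay by doubling it for the exponential type. Class and method names are illustrative, not the connector's actual code:

```java
// Illustrative stand-in for the backoff policy discussed above; the
// validation allows a zero delay, per the review comments.
class BulkFlushBackoffPolicySketch {
    enum FlushBackoffType { CONSTANT, EXPONENTIAL }

    private final FlushBackoffType type;
    private final long delayMillis;

    BulkFlushBackoffPolicySketch(FlushBackoffType type, long delayMillis) {
        if (delayMillis < 0) { // 0 is acceptable, negative values are not
            throw new IllegalArgumentException("delayMillis must be >= 0");
        }
        this.type = type;
        this.delayMillis = delayMillis;
    }

    // delay before retry attempt `retry` (0-based); doubles per attempt for EXPONENTIAL
    long delayForRetry(int retry) {
        return type == FlushBackoffType.CONSTANT ? delayMillis : delayMillis << retry;
    }

    public static void main(String[] args) {
        BulkFlushBackoffPolicySketch exp =
            new BulkFlushBackoffPolicySketch(FlushBackoffType.EXPONENTIAL, 50);
        System.out.println(exp.delayForRetry(0) + " " + exp.delayForRetry(3)); // 50 400

        BulkFlushBackoffPolicySketch zero =
            new BulkFlushBackoffPolicySketch(FlushBackoffType.CONSTANT, 0); // 0 is allowed
        System.out.println(zero.delayForRetry(5)); // 0
    }
}
```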




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875802#comment-15875802
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102183643
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
--- End diff --

I think a pre-implemented `ActionRequestFailureHandler` that re-adds requests 
for full-queue exceptions would be nice, and useful out of the box for a large 
portion of users. Great idea!



[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875804#comment-15875804
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102183754
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
--- End diff --

Yup, 0 should be acceptable.




[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875797#comment-15875797
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102183423
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -272,9 +308,115 @@ input.addSink(new ElasticsearchSink(config, new 
ElasticsearchSinkFunction[String
 The difference is that now we do not need to provide a list of addresses
 of Elasticsearch nodes.
 
+### Handling Failing Elasticsearch Requests
+
+Elasticsearch action requests may fail due to a variety of reasons, 
including
+temporarily saturated node queue capacity or malformed documents to be 
indexed.
+The Flink Elasticsearch Sink allows the user to specify how request
+failures are handled, by simply implementing an 
`ActionRequestFailureHandler` and
+providing it to the constructor.
+
+Below is an example:
+
+
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new ElasticsearchSink<>(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction<String>() {...},
+    new ActionRequestFailureHandler() {
+        @Override
+        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action);
+                return false;
+            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+                // malformed document; simply drop request without failing the sink
+                return false;
+            } else {
+                // for all other failures, fail the sink
+                return true;
+            }
+        }
+    }));
+{% endhighlight %}
+
+
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new ElasticsearchSink(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction[String] {...},
+    new ActionRequestFailureHandler {
+        override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action)
+                false
+            } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
+                // malformed document; simply drop request without failing the sink
+                false
+            } else {
+                // for all other failures, fail the sink
+                true
+            }
+        }
+    }))
+{% endhighlight %}
+
+
+
+The above example lets the sink re-add requests that failed due to
+queue capacity saturation and drop requests with malformed documents,
+without failing the sink. For all other failures, the sink will fail.
+If an `ActionRequestFailureHandler` is not provided to the constructor,
+the sink will fail for any kind of error.
+
+Note that `onFailure` is called only for failures that still occur after
+the `BulkProcessor` internally finishes all backoff retry attempts.
+By default, the `BulkProcessor` retries up to a maximum of 8 attempts
+with an exponential backoff. For more information on the behaviour of
+the internal `BulkProcessor` and how to configure it, please see the
+following section.
+
+
+IMPORTANT: Re-adding requests back to the internal BulkProcessor
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+This also means that if re-added requests never succeed, the checkpoint
+will never finish.
+
--- End diff --

Good point. I'll mention that, by default, the sink uses the
`NoOpActionRequestFailureHandler`.
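For reference, the default retry schedule quoted in the docs above (up to 8 exponentially spaced attempts, starting from the 50 ms default in `BulkFlushBackoffPolicy`) can be sketched as follows. This is a minimal illustration that assumes simple doubling per attempt; Elasticsearch's actual `BackoffPolicy` may differ in detail.

```java
// Sketch of an exponential backoff schedule: delay doubles each attempt.
// Assumed simplification; not Elasticsearch's BackoffPolicy implementation.
public class BackoffSchedule {
    public static long[] exponentialDelays(long initialMillis, int attempts) {
        long[] delays = new long[attempts];
        for (int i = 0; i < attempts; i++) {
            delays[i] = initialMillis << i; // 50, 100, 200, ... for initialMillis = 50
        }
        return delays;
    }
}
```

With the defaults above, the final (8th) attempt would wait 6400 ms under this simplification.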


> Proper at-least-once support for ElasticsearchSink
> --
>
> Key: FLINK-5487
> URL: https://issues.apache.org/jira/browse/FLINK-5487
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: 

[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875784#comment-15875784
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102179118
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -122,10 +198,19 @@ public ElasticsearchSinkBase(
"The object probably contains or references non serializable fields.");
}
 
+   try {
+   InstantiationUtil.serializeObject(failureHandler);
+   } catch (Exception e) {
+   throw new IllegalArgumentException(
+   "The implementation of the provided ActionRequestFailureHandler is not serializable. " +
+   "The object probably contains or references non serializable fields.");
+   }
--- End diff --

This looks a bit like duplicate code. I think adding a utility into the 
`InstantiationUtil` that is called `isSerializable()` would be cleaner and save 
some LOC.
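Such a utility could look like the following. This is a hypothetical sketch of the suggested `isSerializable()` helper, not the actual Flink `InstantiationUtil` code:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

// Hypothetical helper: attempts Java serialization and reports success/failure.
public class SerializableCheck {
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            // throws NotSerializableException if the object graph is not serializable
            out.writeObject(o);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
```

The two constructor checks could then collapse into `checkArgument(isSerializable(elasticsearchSinkFunction), ...)`-style calls.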


> Proper at-least-once support for ElasticsearchSink
> --
>
> Key: FLINK-5487
> URL: https://issues.apache.org/jira/browse/FLINK-5487
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.





[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875781#comment-15875781
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102177099
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -272,9 +308,115 @@ input.addSink(new ElasticsearchSink(config, new 
ElasticsearchSinkFunction[String
 The difference is that now we do not need to provide a list of addresses
 of Elasticsearch nodes.
 
+### Handling Failing Elasticsearch Requests
+
+Elasticsearch action requests may fail for a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be
+indexed. The Flink Elasticsearch Sink allows the user to specify how
+request failures are handled, simply by implementing an
+`ActionRequestFailureHandler` and providing it to the constructor.
+
+Below is an example:
+
+
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new ElasticsearchSink<>(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction<String>() {...},
+    new ActionRequestFailureHandler() {
+        @Override
+        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action);
+                return false;
+            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+                // malformed document; simply drop request without failing the sink
+                return false;
+            } else {
+                // for all other failures, fail the sink
+                return true;
+            }
+        }
+    }));
+{% endhighlight %}
+
+
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new ElasticsearchSink(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction[String] {...},
+    new ActionRequestFailureHandler {
+        override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action)
+                false
+            } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
+                // malformed document; simply drop request without failing the sink
+                false
+            } else {
+                // for all other failures, fail the sink
+                true
+            }
+        }
+    }))
+{% endhighlight %}
+
+
+
+The above example lets the sink re-add requests that failed due to
+queue capacity saturation and drop requests with malformed documents,
+without failing the sink. For all other failures, the sink will fail.
+If an `ActionRequestFailureHandler` is not provided to the constructor,
+the sink will fail for any kind of error.
+
+Note that `onFailure` is called only for failures that still occur after
+the `BulkProcessor` internally finishes all backoff retry attempts.
+By default, the `BulkProcessor` retries up to a maximum of 8 attempts
+with an exponential backoff. For more information on the behaviour of
+the internal `BulkProcessor` and how to configure it, please see the
+following section.
+
+
+IMPORTANT: Re-adding requests back to the internal BulkProcessor
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+This also means that if re-added requests never succeed, the checkpoint
+will never finish.
+
+
+ 
+### Configuring the Internal Bulk Processor
+
+The internal `BulkProcessor` can be further configured for how buffered
+action requests are flushed, by setting the following values in the
+provided `Map`:
+
+ * **bulk.flush.max.actions**: Maximum number of actions to buffer before flushing.
+ * **bulk.flush.max.size.mb**: Maximum size of data (in megabytes) to buffer before flushing.
+ * **bulk.flush.interval.ms**: Interval at 

[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875786#comment-15875786
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102177012
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -272,9 +308,115 @@ input.addSink(new ElasticsearchSink(config, new 
ElasticsearchSinkFunction[String
 The difference is that now we do not need to provide a list of addresses
 of Elasticsearch nodes.
 
+### Handling Failing Elasticsearch Requests
+
+Elasticsearch action requests may fail for a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be
+indexed. The Flink Elasticsearch Sink allows the user to specify how
+request failures are handled, simply by implementing an
+`ActionRequestFailureHandler` and providing it to the constructor.
+
+Below is an example:
+
+
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new ElasticsearchSink<>(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction<String>() {...},
+    new ActionRequestFailureHandler() {
+        @Override
+        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action);
+                return false;
+            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+                // malformed document; simply drop request without failing the sink
+                return false;
+            } else {
+                // for all other failures, fail the sink
+                return true;
+            }
+        }
+    }));
+{% endhighlight %}
+
+
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new ElasticsearchSink(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction[String] {...},
+    new ActionRequestFailureHandler {
+        override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action)
+                false
+            } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
+                // malformed document; simply drop request without failing the sink
+                false
+            } else {
+                // for all other failures, fail the sink
+                true
+            }
+        }
+    }))
+{% endhighlight %}
+
+
+
+The above example lets the sink re-add requests that failed due to
+queue capacity saturation and drop requests with malformed documents,
+without failing the sink. For all other failures, the sink will fail.
+If an `ActionRequestFailureHandler` is not provided to the constructor,
+the sink will fail for any kind of error.
+
+Note that `onFailure` is called only for failures that still occur after
+the `BulkProcessor` internally finishes all backoff retry attempts.
+By default, the `BulkProcessor` retries up to a maximum of 8 attempts
+with an exponential backoff. For more information on the behaviour of
+the internal `BulkProcessor` and how to configure it, please see the
+following section.
+
+
+IMPORTANT: Re-adding requests back to the internal BulkProcessor
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+This also means that if re-added requests never succeed, the checkpoint
+will never finish.
+
--- End diff --

I think the docs should mention the `NoOpActionRequestFailureHandler`.

Also I wonder if we should offer a default 
`RetryActionRequestFailureHandler`. I suspect that many users will need that. 
What do you think?
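A default retry handler of the kind suggested above could be sketched as follows. This is a hypothetical `RetryActionRequestFailureHandler` with a retry cap; `Object` and `List<Object>` stand in for Elasticsearch's `ActionRequest` and Flink's `RequestIndexer`, whose real interfaces differ.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical capped-retry handler sketch (not actual Flink code).
public class CappedRetryHandler {
    private final int maxRetries;
    private final Map<Object, Integer> attempts = new ConcurrentHashMap<>();

    public CappedRetryHandler(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Mirrors onFailure's contract: returning true means "fail the sink". */
    public boolean onFailure(Object request, Throwable failure, List<Object> indexer) {
        int seen = attempts.merge(request, 1, Integer::sum); // count failures per request
        if (seen <= maxRetries) {
            indexer.add(request); // re-add for another attempt
            return false;         // do not fail the sink yet
        }
        return true;              // retries exhausted: fail the sink
    }
}
```

An unbounded version (always re-add) would be simpler, but a cap avoids checkpoints that never finish when a request can never succeed.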



[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875785#comment-15875785
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102178621
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -67,10 +73,56 @@
public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+   public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = 
"bulk.flush.backoff.enable";
+   public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = 
"bulk.flush.backoff.type";
+   public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = 
"bulk.flush.backoff.retries";
+   public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = 
"bulk.flush.backoff.delay";
+
+   public enum FlushBackoffType {
+   CONSTANT,
+   EXPONENTIAL
+   }
+
+   public class BulkFlushBackoffPolicy implements Serializable {
+
+   private static final long serialVersionUID = 
-6022851996101826049L;
+
+   // the default values follow the Elasticsearch default settings 
for BulkProcessor
+   private FlushBackoffType backoffType = 
FlushBackoffType.EXPONENTIAL;
+   private int maxRetryCount = 8;
+   private long delayMillis = 50;
+
+   public FlushBackoffType getBackoffType() {
+   return backoffType;
+   }
+
+   public int getMaxRetryCount() {
+   return maxRetryCount;
+   }
+
+   public long getDelayMillis() {
+   return delayMillis;
+   }
+
+   public void setBackoffType(FlushBackoffType backoffType) {
+   this.backoffType = checkNotNull(backoffType);
+   }
+
+   public void setMaxRetryCount(int maxRetryCount) {
+   checkArgument(maxRetryCount > 0);
+   this.maxRetryCount = maxRetryCount;
+   }
+
+   public void setDelayMillis(long delayMillis) {
+   checkArgument(delayMillis > 0);
--- End diff --

We should accept 0 here as well, if users want to retry immediately (for 
whatever reason :) )
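The relaxed validation discussed in this and the previous comment (allowing 0 for both the delay and the retry count) could look like this. The semantics of 0 ("retry immediately" / "no retries") are assumed here; the merged Flink code may differ.

```java
// Hypothetical relaxed-validation sketch of BulkFlushBackoffPolicy setters.
public class BackoffPolicySketch {
    private int maxRetryCount = 8; // Elasticsearch BulkProcessor defaults
    private long delayMillis = 50;

    public void setMaxRetryCount(int maxRetryCount) {
        // 0 is allowed: disables retries entirely
        if (maxRetryCount < 0) throw new IllegalArgumentException("maxRetryCount must be >= 0");
        this.maxRetryCount = maxRetryCount;
    }

    public void setDelayMillis(long delayMillis) {
        // 0 is allowed: retry immediately
        if (delayMillis < 0) throw new IllegalArgumentException("delayMillis must be >= 0");
        this.delayMillis = delayMillis;
    }

    public int getMaxRetryCount() { return maxRetryCount; }
    public long getDelayMillis() { return delayMillis; }
}
```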







[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875787#comment-15875787
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102181691
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest 
request) { }
@Override
public void afterBulk(long executionId, 
BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
-   for (BulkItemResponse itemResp 
: response.getItems()) {
-   Throwable failure = 
callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+   BulkItemResponse itemResponse;
+   Throwable failure;
+
+   for (int i = 0; i < 
response.getItems().length; i++) {
+   itemResponse = 
response.getItems()[i];
+   failure = 
callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
if (failure != null) {
-   
LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), 
failure);
-   
failureThrowable.compareAndSet(null, failure);
+   
LOG.error("Failed Elasticsearch item request: {}", 
itemResponse.getFailureMessage(), failure);
+
+   if 
(failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
+   
failureThrowable.compareAndSet(null, failure);
+   }
}
}
}
+
+   
numPendingRequests.getAndAdd(-request.numberOfActions());
}
 
@Override
public void afterBulk(long executionId, 
BulkRequest request, Throwable failure) {
-   LOG.error("Failed Elasticsearch bulk 
request: {}", failure.getMessage(), failure);
-   failureThrowable.compareAndSet(null, 
failure);
+   LOG.error("Failed Elasticsearch bulk 
request: {}", failure.getMessage(), failure.getCause());
+
+   // whole bulk request failures are 
usually just temporary timeouts on
+   // the Elasticsearch side; simply retry 
all action requests in the bulk
+   for (ActionRequest action : 
request.requests()) {
+   requestIndexer.add(action);
+   }
+
+   
numPendingRequests.getAndAdd(-request.numberOfActions());
--- End diff --

Lets say a bulk with 500 actions fails, so we re-add the bulk again, but 
subtract 500 actions from the pending requests.

Now the bulk succeeds and we subtract 500 actions again, which would make
the number of pending requests negative and void the at-least-once guarantees?

Am I overlooking something here?
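The accounting concern can be illustrated numerically. This is an assumed simplification of the sink's `numPendingRequests` bookkeeping, not the actual Flink code; the second variant shows how incrementing the counter when the `RequestIndexer` re-adds an action keeps it balanced.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustration of double-subtraction on a failed-then-retried bulk.
public class PendingCounter {

    // Re-added actions are NOT counted again: the counter goes negative.
    public static long withoutReAddAccounting(int bulkSize) {
        AtomicLong pending = new AtomicLong(bulkSize); // e.g. 500 buffered actions
        pending.getAndAdd(-bulkSize); // bulk fails, actions re-added, count subtracted
        pending.getAndAdd(-bulkSize); // retried bulk succeeds, subtracted again
        return pending.get();
    }

    // Re-adding through the indexer also increments the counter: it balances out.
    public static long withReAddAccounting(int bulkSize) {
        AtomicLong pending = new AtomicLong(bulkSize);
        pending.getAndAdd(bulkSize);  // re-add increments pending
        pending.getAndAdd(-bulkSize); // failed bulk subtracted
        pending.getAndAdd(-bulkSize); // retried bulk succeeds, subtracted
        return pending.get();
    }
}
```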



[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875780#comment-15875780
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102180098
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest 
request) { }
@Override
public void afterBulk(long executionId, 
BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
-   for (BulkItemResponse itemResp 
: response.getItems()) {
-   Throwable failure = 
callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+   BulkItemResponse itemResponse;
+   Throwable failure;
+
+   for (int i = 0; i < 
response.getItems().length; i++) {
+   itemResponse = 
response.getItems()[i];
+   failure = 
callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
if (failure != null) {
-   
LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), 
failure);
-   
failureThrowable.compareAndSet(null, failure);
+   
LOG.error("Failed Elasticsearch item request: {}", 
itemResponse.getFailureMessage(), failure);
+
+   if 
(failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
--- End diff --

I wonder if it would be better if the `onFailure` method would not return a 
boolean but throw a Throwable?
This way users have more flexibility in implementing their failure handler.

For example, if a failure handler is doing three retries and fails
afterwards, the original exception will be thrown. If the `onFailure()` method
can throw its own exception, you can throw a custom exception that tells the
user about the three retries.

We can definitely discuss this, because this change is annoying to do
(docs & javadocs need to be updated).
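The throwing variant discussed above could be sketched like this. The interface and the `outcome` driver are hypothetical stand-ins for the real `ActionRequestFailureHandler` and the sink's failure bookkeeping: throwing fails the sink with the handler's own exception, while returning normally means the failure was handled.

```java
// Hypothetical sketch of an onFailure that throws instead of returning boolean.
public class ThrowingHandlerSketch {

    public interface FailureHandler {
        // Throwing fails the sink; returning normally means "handled".
        void onFailure(Object request, Throwable failure) throws Throwable;
    }

    // Stand-in for the sink deciding its fate based on the handler's behavior.
    public static String outcome(FailureHandler h, Object request, Throwable cause) {
        try {
            h.onFailure(request, cause);
            return "handled";
        } catch (Throwable t) {
            return "sink-failed: " + t.getMessage();
        }
    }
}
```

The gain over the boolean contract is that the propagated exception carries handler-specific context (e.g. "3 retries exhausted") instead of only the original cause.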







[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875782#comment-15875782
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102177573
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * An implementation of {@link ActionRequestFailureHandler} is provided by 
the user to define how failed
+ * {@link ActionRequest ActionRequests} should be handled, ex. dropping 
them, reprocessing malformed documents, or
--- End diff --

I'm not sure if the "ex." is correct here: 
http://english.stackexchange.com/questions/16197/whats-the-difference-between-e-g-and-ex







[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875783#comment-15875783
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102178460
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -67,10 +73,56 @@
public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+   public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = 
"bulk.flush.backoff.enable";
+   public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = 
"bulk.flush.backoff.type";
+   public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = 
"bulk.flush.backoff.retries";
+   public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = 
"bulk.flush.backoff.delay";
+
+   public enum FlushBackoffType {
+   CONSTANT,
+   EXPONENTIAL
+   }
+
+   public class BulkFlushBackoffPolicy implements Serializable {
+
+   private static final long serialVersionUID = 
-6022851996101826049L;
+
+   // the default values follow the Elasticsearch default settings 
for BulkProcessor
+   private FlushBackoffType backoffType = 
FlushBackoffType.EXPONENTIAL;
+   private int maxRetryCount = 8;
+   private long delayMillis = 50;
+
+   public FlushBackoffType getBackoffType() {
+   return backoffType;
+   }
+
+   public int getMaxRetryCount() {
+   return maxRetryCount;
+   }
+
+   public long getDelayMillis() {
+   return delayMillis;
+   }
+
+   public void setBackoffType(FlushBackoffType backoffType) {
+   this.backoffType = checkNotNull(backoffType);
+   }
+
+   public void setMaxRetryCount(int maxRetryCount) {
+   checkArgument(maxRetryCount > 0);
--- End diff --

Isn't 0 also an acceptable value here? If users want to disable retries 
entirely?







[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875788#comment-15875788
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3358#discussion_r102181294
  
--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
 		@Override
 		public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 			if (response.hasFailures()) {
-				for (BulkItemResponse itemResp : response.getItems()) {
-					Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+				BulkItemResponse itemResponse;
+				Throwable failure;
+
+				for (int i = 0; i < response.getItems().length; i++) {
+					itemResponse = response.getItems()[i];
+					failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
 					if (failure != null) {
-						LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-						failureThrowable.compareAndSet(null, failure);
+						LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+						if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
+							failureThrowable.compareAndSet(null, failure);
+						}
 					}
 				}
 			}
+
+			numPendingRequests.getAndAdd(-request.numberOfActions());
 		}

 		@Override
 		public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-			LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-			failureThrowable.compareAndSet(null, failure);
+			LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+
+			// whole bulk request failures are usually just temporary timeouts on
+			// the Elasticsearch side; simply retry all action requests in the bulk
+			for (ActionRequest action : request.requests()) {
+				requestIndexer.add(action);
+			}
--- End diff --

I wonder if we should provide custom, pluggable retry logic here as well. 
If you are sure that only connection issues cause this, we can leave it as is.
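
The pluggable retry logic suggested here could look roughly like the following self-contained sketch. Note this is an illustrative assumption, not the connector's actual API: the `FailureHandler` interface, the `RETRY_TIMEOUTS` policy, and the use of plain `String` requests are all stand-ins for the real `ActionRequestFailureHandler`/`ActionRequest` types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

public class FailureHandlerSketch {

    /** Decides what to do with a single failed action request (hypothetical interface). */
    interface FailureHandler<T> {
        /** Returns true if the failure is fatal and should fail the sink/snapshot. */
        boolean onFailure(T request, Throwable failure, List<T> reAddBuffer);
    }

    /** Example policy: re-queue timeouts for the next bulk, treat everything else as fatal. */
    static final FailureHandler<String> RETRY_TIMEOUTS = (request, failure, buffer) -> {
        if (failure instanceof TimeoutException) {
            buffer.add(request); // re-add to the pending bulk, like requestIndexer.add(...)
            return false;        // not fatal; the retry absorbs the failure
        }
        return true;             // fatal: surfaces via failureThrowable on the next checkpoint
    };

    public static void main(String[] args) {
        List<String> reAdd = new ArrayList<>();
        boolean fatal1 = RETRY_TIMEOUTS.onFailure("doc-1", new TimeoutException(), reAdd);
        boolean fatal2 = RETRY_TIMEOUTS.onFailure("doc-2", new IllegalArgumentException("bad mapping"), reAdd);
        System.out.println(fatal1 + " " + fatal2 + " " + reAdd);
    }
}
```

With this shape, the "leave it as is" behavior in the bulk-level `afterBulk` is just one built-in policy (retry everything), and users who know their failure modes can plug in a stricter one.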


> Proper at-least-once support for ElasticsearchSink
> --
>
> Key: FLINK-5487
> URL: https://issues.apache.org/jira/browse/FLINK-5487
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.

[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874253#comment-15874253
 ] 

ASF GitHub Bot commented on FLINK-5487:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/3358

[FLINK-5487] [elasticsearch] At-least-once ElasticsearchSink

This PR adds proper support for an at-least-once `ElasticsearchSink`. This 
is based on the pluggable error handling strategy functionality added in #3426, 
so only the last commit is relevant.

Like the Kafka producer, the way it works is that pending requests not yet 
acknowledged by Elasticsearch need to be flushed before proceeding with the 
next record from upstream.
A slight difference is that for the `ElasticsearchSink`, since we're allowing 
failed requests to be re-added to the internal `BulkProcessor` (as part of 
#3426), we'll also need to wait for the re-added requests. The docs warn that 
re-adding requests may lead to longer checkpoints, since we need to wait for 
those too.

Flushing is enabled by default, but we provide a `disableFlushOnCheckpoint` 
method to switch it off. The docs and Javadoc of the method warns the user how 
this would affect at-least-once delivery.
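
Stripped of the Flink and Elasticsearch dependencies, the flush-on-checkpoint mechanism described above can be sketched as a pending-request counter that the snapshot call drains before the checkpoint is acked. The class and method names below are illustrative stand-ins for the sink's real internals (`numPendingRequests` mirrors the field added in the diff; the real flush goes through `BulkProcessor.flush()`):

```java
import java.util.concurrent.atomic.AtomicLong;

public class FlushOnCheckpointSketch {

    // counts requests handed to the bulk processor but not yet acked by Elasticsearch
    private final AtomicLong numPendingRequests = new AtomicLong();
    private volatile boolean flushOnCheckpoint = true;

    void onRequestAdded() { numPendingRequests.incrementAndGet(); }

    // called from afterBulk(...) once a bulk response arrives
    void onBulkAcked(int numActions) { numPendingRequests.addAndGet(-numActions); }

    /** Opting out trades at-least-once guarantees for shorter checkpoints. */
    public void disableFlushOnCheckpoint() { flushOnCheckpoint = false; }

    /** Called on the checkpoint barrier; blocks until all pending (and re-added) requests drain. */
    void snapshotState() throws InterruptedException {
        if (!flushOnCheckpoint) {
            return; // user accepted possible data loss on failure
        }
        while (numPendingRequests.get() != 0) {
            // the real sink triggers BulkProcessor.flush() here; we just wait for acks
            Thread.sleep(10);
        }
    }

    public static void main(String[] args) throws Exception {
        FlushOnCheckpointSketch sink = new FlushOnCheckpointSketch();
        for (int i = 0; i < 5; i++) {
            sink.onRequestAdded();
        }
        // simulate Elasticsearch acknowledging the bulk asynchronously
        new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) {}
            sink.onBulkAcked(5);
        }).start();
        sink.snapshotState(); // blocks until the 5 requests are acked
        System.out.println("pending after snapshot: " + sink.numPendingRequests.get());
    }
}
```

Because re-added failed requests also bump the counter, a snapshot only completes once retries have succeeded as well, which is exactly why the docs warn about longer checkpoint durations.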

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-5487

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3358.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3358


commit 6a826b8eb7a98e3d15bc44d827df54c94fdd
Author: Max Kuklinski 
Date:   2016-11-23T16:54:11Z

[FLINK-5122] [elasticsearch] Retry temporary Elasticsearch request errors.

Covered exceptions are: Timeouts, No Master, UnavailableShardsException, 
bulk queue on node full

commit 9cb60c263fb0df9a8ccd82b33070e22085b5ab23
Author: Tzu-Li (Gordon) Tai 
Date:   2017-01-30T05:55:26Z

[FLINK-5353] [elasticsearch] User-provided failure handler for 
ElasticsearchSink

This commit fixes both FLINK-5353 and FLINK-5122. It allows users to 
implement a
failure handler to control how failed action requests are dealt with.

The commit also includes general improvements to FLINK-5122:
1. Use the built-in backoff functionality in the Elasticsearch 
BulkProcessor (not
available for Elasticsearch 1.x)
2. Migrate the `checkErrorAndRetryBulk` functionality to the new failure 
handler

commit 1c448e3177c65ebc627bdd4ecfff76bbf209ddde
Author: Tzu-Li (Gordon) Tai 
Date:   2017-02-20T08:50:19Z

[FLINK-5487] [elasticsearch] At-least-once Elasticsearch Sink




> Proper at-least-once support for ElasticsearchSink
> --
>
> Key: FLINK-5487
> URL: https://issues.apache.org/jira/browse/FLINK-5487
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5487) Proper at-least-once support for ElasticsearchSink

2017-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15822356#comment-15822356
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5487:


Restructuring for the ES connectors is included as part of this issue. We 
should block this issue until FLINK-4988 is resolved so at-least-once support 
can be simultaneously included in all ES versions.

> Proper at-least-once support for ElasticsearchSink
> --
>
> Key: FLINK-5487
> URL: https://issues.apache.org/jira/browse/FLINK-5487
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)