[ https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611944#comment-16611944 ]

ASF GitHub Bot commented on FLINK-3875:
---------------------------------------

dawidwys commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#discussion_r216992838
 
 

 ##########
 File path: docs/dev/table/connect.md
 ##########
 @@ -583,6 +584,104 @@ Make sure to add the version-specific Kafka dependency. In addition, a correspon
 
 {% top %}
 
+### Elasticsearch Connector
+
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+<span class="label label-info">Format: JSON-only</span>
+
+The Elasticsearch connector allows for writing into an index of the Elasticsearch search engine.
+
+The connector operates in [upsert mode](#update-modes) and exchanges UPSERT/DELETE messages with the external system using a [key defined by the query](streaming.html#table-to-stream-conversion). It can be defined as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+.connect(
+  new Elasticsearch()
+    .version("6")                      // required: valid connector versions are "6"
+    .host("localhost", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
+    .index("MyUsers")                  // required: Elasticsearch index
+    .documentType("user")              // required: Elasticsearch document type
+
+    .keyDelimiter("$")        // optional: delimiter for composite keys ("_" by default)
+                              //   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+    .keyNullLiteral("n/a")    // optional: representation for null fields in keys ("null" by default)
+
+    // optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
+    .failureHandlerFail()          // optional: throws an exception if a request fails and causes a job failure
+    .failureHandlerIgnore()        //   or ignores failures and drops the request
+    .failureHandlerRetryRejected() //   or re-adds requests that have failed due to queue capacity saturation
+    .failureHandlerCustom(...)     //   or custom failure handling with an ActionRequestFailureHandler subclass
+
+    // optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+    .disableFlushOnCheckpoint()    // optional: disables flushing on checkpoint (see notes below!)
+    .bulkFlushMaxActions(42)       // optional: maximum number of actions to buffer for each bulk request
+    .bulkFlushMaxSize(42)          // optional: maximum size of buffered actions (in MB) per bulk request
+    .bulkFlushInterval(60000L)     // optional: bulk flush interval (in milliseconds)
+
+    .bulkFlushBackoffConstant()    // optional: use a constant backoff type
+    .bulkFlushBackoffExponential() //   or use an exponential backoff type
+    .bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries
+    .bulkFlushBackoffDelay(30000L) // optional: delay between each backoff attempt (in milliseconds)
+
+    // optional: connection properties to be used during REST communication to Elasticsearch
+    .connectionMaxRetryTimeout(3)  // optional: maximum timeout (in milliseconds) between retries
+    .connectionPathPrefix("/v1")   // optional: prefix string to be added to every REST communication
+)
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+connector:
+  type: elasticsearch
+  version: 6              # required: valid connector versions are "6"
+  hosts:                  # required: one or more Elasticsearch hosts to connect to
+    - hostname: "localhost"
+      port: 9200
+      schema: "http"
 
 Review comment:
   +1 for `protocol`
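
   For illustration, a minimal sketch of the host entry with the suggested rename (hypothetical; the final key name depends on what the PR settles on):

   ```yaml
   hosts:                     # as in the diff above
     - hostname: "localhost"
       port: 9200
       protocol: "http"      # hypothetical rename of the "schema" key
   ```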

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a TableSink for Elasticsearch
> ---------------------------------
>
>                 Key: FLINK-3875
>                 URL: https://issues.apache.org/jira/browse/FLINK-3875
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors, Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>
> Add a TableSink that writes data to Elasticsearch
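
For context, a minimal usage sketch of the proposed sink, based on the descriptor API shown in the review diff above. The `Elasticsearch` descriptor is what this PR adds; the `withFormat`/`withSchema`/`inUpsertMode` calls follow the existing Kafka connector pattern, and the table and field names are made up for illustration:

{% highlight java %}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Elasticsearch; // added by this PR
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;

public class ElasticsearchUpsertSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // Hypothetical input: a small in-memory table of click events.
    DataStream<Tuple2<String, Long>> clicks = env.fromElements(
        Tuple2.of("alice", 1L), Tuple2.of("bob", 1L), Tuple2.of("alice", 1L));
    tableEnv.registerDataStream("clicks", clicks, "user_name, dummy");

    // Register the upsert sink via the descriptor shown in the diff above.
    tableEnv
      .connect(
        new Elasticsearch()
          .version("6")
          .host("localhost", 9200, "http")
          .index("MyUsers")
          .documentType("user"))
      .withFormat(new Json().deriveSchema())   // the connector is JSON-only
      .withSchema(new Schema()
        .field("user_name", Types.STRING())
        .field("cnt", Types.LONG()))
      .inUpsertMode()
      .registerTableSink("MyUsers");

    // The grouping key of the upsert query becomes the Elasticsearch document ID.
    tableEnv.sqlUpdate(
      "INSERT INTO MyUsers SELECT user_name, COUNT(*) AS cnt FROM clicks GROUP BY user_name");

    env.execute();
  }
}
{% endhighlight %}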



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
