dawidwys commented on a change in pull request #6611: [FLINK-3875] [connectors] 
Add an upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#discussion_r213597931

 File path: 
 @@ -0,0 +1,473 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.typeutils.TypeCheckUtils;
+import org.apache.flink.table.util.TableConnectorUtil;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+ * A version-agnostic Elasticsearch {@link UpsertStreamTableSink}.
+ */
+public abstract class ElasticsearchUpsertTableSinkBase implements 
UpsertStreamTableSink<Row> {
+       /** Schema of the table. */
+       private final TableSchema schema;
+       /** Version-agnostic hosts configuration. */
+       private final List<Host> hosts;
+       /** Default index for all requests. */
+       private final String index;
+       /** Default document type for all requests. */
+       private final String docType;
+       /** Delimiter for composite keys. */
+       private final String keyDelimiter;
+       /** String literal for null keys. */
+       private final String keyNullLiteral;
+       /** Serialization schema used for the document. */
+       private final SerializationSchema<Row> serializationSchema;
+       /** Content type describing the serialization schema. */
+       private final XContentType contentType;
+       /** Failure handler for failing {@link 
org.elasticsearch.action.ActionRequest}s. */
+       private final ActionRequestFailureHandler failureHandler;
+       /**
+        * Map of optional configuration parameters for the Elasticsearch sink. 
The config is
+        * internal and can change at any time.
+        */
+       private final Map<SinkOption, String> sinkOptions;
+       /**
+        * Version-agnostic creation of {@link UpdateRequest}.
+        */
+       private final UpdateRequestFactory updateRequestFactory;
+       /** Key field indices determined by the query. */
+       private int[] keyFieldIndices = new int[0];
+       public ElasticsearchUpsertTableSinkBase(
+                       TableSchema schema,
+                       List<Host> hosts,
+                       String index,
+                       String docType,
+                       String keyDelimiter,
+                       String keyNullLiteral,
+                       SerializationSchema<Row> serializationSchema,
+                       XContentType contentType,
+                       ActionRequestFailureHandler failureHandler,
+                       Map<SinkOption, String> sinkOptions,
+                       UpdateRequestFactory updateRequestFactory)  {
+               this.schema = Preconditions.checkNotNull(schema);
+               this.hosts = Preconditions.checkNotNull(hosts);
+               this.index = Preconditions.checkNotNull(index);
+               this.keyDelimiter = Preconditions.checkNotNull(keyDelimiter);
+               this.keyNullLiteral = 
+               this.docType = Preconditions.checkNotNull(docType);
+               this.serializationSchema = 
+               this.contentType = Preconditions.checkNotNull(contentType);
+               this.failureHandler = 
+               this.sinkOptions = Preconditions.checkNotNull(sinkOptions);
+               this.updateRequestFactory = 
+       }
+       @Override
+       public void setKeyFields(String[] keyNames) {
+               final String[] fieldNames = getFieldNames();
+               final int[] keyFieldIndices = new int[keyNames.length];
+               for (int i = 0; i < keyNames.length; i++) {
+                       keyFieldIndices[i] = -1;
+                       for (int j = 0; j < fieldNames.length; j++) {
+                               if (keyNames[i].equals(fieldNames[j])) {
+                                       keyFieldIndices[i] = j;
+                                       break;
+                               }
+                       }
+                       if (keyFieldIndices[i] == -1) {
+                               throw new RuntimeException("Invalid key fields: 
" + Arrays.toString(keyNames));
+                       }
+               }
+               validateKeyTypes(keyFieldIndices);
+               this.keyFieldIndices = keyFieldIndices;
+       }
+       @Override
+       public void setIsAppendOnly(Boolean isAppendOnly) {
+               // we don't care
+       }
+       @Override
+       public TypeInformation<Row> getRecordType() {
+               return schema.toRowType();
+       }
+       @Override
+       public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) 
+               final ElasticsearchUpsertSinkFunction upsertFunction =
+                       new ElasticsearchUpsertSinkFunction(
+                               index,
+                               docType,
+                               keyDelimiter,
+                               keyNullLiteral,
+                               serializationSchema,
+                               contentType,
+                               updateRequestFactory,
+                               keyFieldIndices);
+               final SinkFunction<Tuple2<Boolean, Row>> sinkFunction = 
+                       hosts,
+                       failureHandler,
+                       sinkOptions,
+                       upsertFunction);
+               dataStream.addSink(sinkFunction)
.name(TableConnectorUtil.generateRuntimeName(this.getClass(), getFieldNames()));
+       }
+       @Override
+       public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
+               return Types.TUPLE(Types.BOOLEAN, getRecordType());
+       }
+       @Override
+       public String[] getFieldNames() {
+               return schema.getColumnNames();
+       }
+       @Override
+       public TypeInformation<?>[] getFieldTypes() {
+               return schema.getTypes();
+       }
+       @Override
+       public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
+               if (!Arrays.equals(getFieldNames(), fieldNames) || 
!Arrays.equals(getFieldTypes(), fieldTypes)) {
+                       throw new ValidationException("Reconfiguration with 
different fields is not allowed. " +
+                               "Expected: " + Arrays.toString(getFieldNames()) 
+ " / " + Arrays.toString(getFieldTypes()) + ". " +
+                               "But was: " + Arrays.toString(fieldNames) + " / 
" + Arrays.toString(fieldTypes));
+               }
+               return copy(
+                       schema,
+                       hosts,
+                       index,
+                       docType,
+                       keyDelimiter,
+                       keyNullLiteral,
+                       serializationSchema,
+                       contentType,
+                       failureHandler,
+                       sinkOptions,
+                       updateRequestFactory);
+       }
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               ElasticsearchUpsertTableSinkBase that = 
(ElasticsearchUpsertTableSinkBase) o;
+               return Objects.equals(schema, that.schema) &&
+                       Objects.equals(hosts, that.hosts) &&
+                       Objects.equals(index, that.index) &&
+                       Objects.equals(docType, that.docType) &&
+                       Objects.equals(keyDelimiter, that.keyDelimiter) &&
+                       Objects.equals(keyNullLiteral, that.keyNullLiteral) &&
+                       Objects.equals(serializationSchema, 
that.serializationSchema) &&
+                       Objects.equals(contentType, that.contentType) &&
+                       Objects.equals(failureHandler, that.failureHandler) &&
+                       Objects.equals(sinkOptions, that.sinkOptions);
+       }
+       @Override
+       public int hashCode() {
+               return Objects.hash(
+                       schema,
+                       hosts,
+                       index,
+                       docType,
+                       keyDelimiter,
+                       keyNullLiteral,
+                       serializationSchema,
+                       contentType,
+                       failureHandler,
+                       sinkOptions);
+       }
+       // 
+       // For version-specific implementations
+       // 
+       protected abstract ElasticsearchUpsertTableSinkBase copy(
+               TableSchema schema,
+               List<Host> hosts,
+               String index,
+               String docType,
+               String keyDelimiter,
+               String keyNullLiteral,
+               SerializationSchema<Row> serializationSchema,
+               XContentType contentType,
+               ActionRequestFailureHandler failureHandler,
+               Map<SinkOption, String> sinkOptions,
+               UpdateRequestFactory updateRequestFactory);
+       protected abstract SinkFunction<Tuple2<Boolean, Row>> 
+               List<Host> hosts,
+               ActionRequestFailureHandler failureHandler,
+               Map<SinkOption, String> sinkOptions,
+               ElasticsearchUpsertSinkFunction upsertFunction);
+       // 
+       // Helper methods
+       // 
+       /**
+        * Validate the types that are used for conversion to string.
+        */
+       private void validateKeyTypes(int[] keyFieldIndices) {
+               final TypeInformation<?>[] types = getFieldTypes();
+               for (int keyFieldIndex : keyFieldIndices) {
+                       final TypeInformation<?> type = types[keyFieldIndex];
+                       if (!TypeCheckUtils.isSimpleStringRepresentation(type)) 
+                               throw new ValidationException(
+                                       "Only simple types that can be safely 
converted into a string representation " +
+                                               "can be used as keys. But was: 
" + type);
+                       }
+               }
+       }
+       // 
+       // Helper classes
+       // 
+       /**
+        * Keys for optional parameterization of the sink.
+        */
+       public enum SinkOption {
+               BULK_FLUSH_MAX_ACTIONS,
+               BULK_FLUSH_MAX_SIZE,
+               BULK_FLUSH_INTERVAL,
+               BULK_FLUSH_BACKOFF_TYPE,
+               REST_MAX_RETRY_TIMEOUT,
+               REST_PATH_PREFIX
+       }
+       /**
+        * Entity for describing a host of Elasticsearch.
+        */
+       public static class Host {
+               public final String hostname;
+               public final int port;
+               public final String schema;
+               public Host(String hostname, int port, String schema) {
+                       this.hostname = hostname;
+                       this.port = port;
+                       this.schema = schema;
+               }
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       Host host = (Host) o;
+                       return port == host.port &&
+                               Objects.equals(hostname, host.hostname) &&
+                               Objects.equals(schema, host.schema);
+               }
+               @Override
+               public int hashCode() {
+                       return Objects.hash(
+                               hostname,
+                               port,
+                               schema);
+               }
+       }
+       /**
+        * For version-agnostic creating of an {@link UpdateRequest}.
+        */
+       public interface UpdateRequestFactory extends Serializable {
+               /**
+                * Creates an update request to be added to an {@link 
+                */
+               UpdateRequest createUpdateRequest(
+                       String index,
+                       String docType,
+                       String key,
+                       XContentType contentType,
+                       byte[] document);
+       }
+       /**
+        * Sink function for converting upserts into Elasticsearch {@link 
+        */
+       public static class ElasticsearchUpsertSinkFunction implements 
ElasticsearchSinkFunction<Tuple2<Boolean, Row>> {
+               private final String index;
+               private final String docType;
+               private final String keyDelimiter;
+               private final String keyNullLiteral;
+               private final SerializationSchema<Row> serializationSchema;
+               private final XContentType contentType;
+               private final UpdateRequestFactory updateRequestFactory;
+               private final int[] keyFieldIndices;
+               public ElasticsearchUpsertSinkFunction(
+                               String index,
+                               String docType,
+                               String keyDelimiter,
+                               String keyNullLiteral,
+                               SerializationSchema<Row> serializationSchema,
+                               XContentType contentType,
+                               UpdateRequestFactory updateRequestFactory,
+                               int[] keyFieldIndices) {
+                       this.index = Preconditions.checkNotNull(index);
+                       this.docType = Preconditions.checkNotNull(docType);
+                       this.keyDelimiter = 
+                       this.serializationSchema = 
+                       this.contentType = 
+                       this.keyFieldIndices = 
+                       this.updateRequestFactory = 
+                       this.keyNullLiteral = 
+               }
+               @Override
+               public void process(Tuple2<Boolean, Row> element, 
RuntimeContext ctx, RequestIndexer indexer) {
+                       if (element.f0) {
+                               processUpsert(element.f1, indexer);
+                       } else {
+                               processDelete(element.f1, indexer);
+                       }
+               }
+               private void processUpsert(Row row, RequestIndexer indexer) {
+                       final String key = createKey(row);
+                       final byte[] document = 
+                       final UpdateRequest updateRequest = 
+                               index,
+                               docType,
+                               key,
+                               contentType,
+                               document);
+                       indexer.add(updateRequest);
+               }
+               private void processDelete(Row row, RequestIndexer indexer) {
+                       final String key = createKey(row);
+                       final DeleteRequest deleteRequest = new 
DeleteRequest(index, docType, key);
+                       indexer.add(deleteRequest);
+               }
+               private String createKey(Row row) {
 Review comment:
   Personally, I would prefer a Java 8 stream version of this function:
        private String createKey(Row row) {
                return Arrays.stream(keyFieldIndices).mapToObj(i -> {
                        final Object value = row.getField(i);
                        if (value == null) {
                                return keyNullLiteral;
                        } else {
                                return value.toString();
                        }
                }).collect(Collectors.joining(keyDelimiter));
        }
   It is more readable, at least for me.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

With regards,
Apache Git Services

Reply via email to