[ https://issues.apache.org/jira/browse/FLINK-2558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15179686#comment-15179686 ]
ASF GitHub Bot commented on FLINK-2558:
---------------------------------------

Github user HungUnicorn commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1040#discussion_r55013666

--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java ---
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+
+/**
+ * Sink that emits its input elements to an Elasticsearch cluster.
+ *
+ * <p>
+ * When using the first constructor {@link #ElasticsearchSink(java.util.Map, IndexRequestBuilder)}
+ * the sink will create a local {@link Node} for communicating with the
+ * Elasticsearch cluster. When using the second constructor
+ * {@link #ElasticsearchSink(java.util.Map, java.util.List, IndexRequestBuilder)} a
+ * {@link TransportClient} will be used instead.
+ *
+ * <p>
+ * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster
+ * can be connected to. With the {@code Node Client} the sink will block and wait for a cluster
+ * to come online.
+ *
+ * <p>
+ * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
+ * the {@link Node} or {@link TransportClient}. The config keys can be found in the Elasticsearch
+ * documentation. An important setting is {@code cluster.name}, which should be set to the name
+ * of the cluster that the sink should emit to.
+ *
+ * <p>
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
+ * This will buffer elements before sending a request to the cluster. The behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ *   <li> {@code bulk.flush.max.actions}: Maximum number of elements to buffer
+ *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ *   <li> {@code bulk.flush.interval.ms}: Interval in milliseconds at which to flush data
+ *        regardless of the other two settings
+ * </ul>
+ *
+ * <p>
+ * You also have to provide an {@link IndexRequestBuilder}. This is used to create an
+ * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
+ * {@link org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder} for an example.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class ElasticsearchSink<T> extends RichSinkFunction<T> {
+
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
+
+	/**
+	 * The user specified config map that we forward to Elasticsearch when we create the Client.
+	 */
+	private final Map<String, String> userConfig;
+
+	/**
+	 * The list of nodes that the TransportClient should connect to. This is null if we are using
+	 * an embedded Node to get a Client.
+	 */
+	private final List<TransportAddress> transportNodes;
--- End diff --

    May I ask whether this should be `transient` rather than `final`? I ran into the issue that this field is not serializable while working on the `elasticsearch2` connector. However, after making it `transient`, `transportNodes` is always `null` in open(), though I am not totally sure `transient` is the reason.


> Add Streaming Connector for Elasticsearch
> -----------------------------------------
>
>                 Key: FLINK-2558
>                 URL: https://issues.apache.org/jira/browse/FLINK-2558
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 0.10.0
>
>
> We should add a sink that can write to Elasticsearch. A source does not seem
> necessary because Elasticsearch would mostly be used for accessing results,
> for example using a dashboard.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
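For context, the configuration keys and the `IndexRequestBuilder` described in the class javadoc above would be wired up roughly as follows. This is only a sketch against the connector API shown in the diff; the cluster name, index name, type name, and the surrounding `stream` variable are illustrative and not taken from this thread:

```java
Map<String, String> config = new HashMap<>();
// Must match the name of the target cluster.
config.put("cluster.name", "my-cluster-name");
// BulkProcessor tuning via the documented config keys.
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "5");
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1000");

stream.addSink(new ElasticsearchSink<String>(config, new IndexRequestBuilder<String>() {
    @Override
    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        // Requests.indexRequest() is from org.elasticsearch.client.Requests.
        return Requests.indexRequest()
                .index("my-index")   // illustrative name
                .type("my-type")     // illustrative name
                .source(json);
    }
}));
```

This variant uses the first (embedded Node) constructor; passing a list of addresses as the second argument would select the `TransportClient` path instead.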
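On the `transient` question: that `null` in open() is standard Java serialization behaviour, not specific to the connector. Transient fields are skipped when the function instance is serialized and shipped to the workers, so they come back as `null` and must be rebuilt in open() from state that did survive serialization. The usual pattern is to keep a final, serializable description of the nodes and construct the non-serializable client lazily. A minimal, self-contained sketch (the `SketchSink` class is hypothetical, not Flink code; a `StringBuilder` stands in for the client):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a sink function; not Flink code.
public class SketchSink implements Serializable {
    private static final long serialVersionUID = 1L;

    // Serializable description of the nodes: safe to keep final,
    // it travels with the serialized function instance.
    private final String[] hostPorts;

    // Runtime handle (stands in for the non-serializable client):
    // transient fields are skipped by Java serialization, so this
    // is null after deserialization until open() rebuilds it.
    private transient StringBuilder client;

    public SketchSink(String[] hostPorts) {
        this.hostPorts = hostPorts;
    }

    public void open() {
        // Rebuild the transient handle from the serializable description.
        client = new StringBuilder(String.join(",", hostPorts));
    }

    public boolean clientIsNull() {
        return client == null;
    }

    public String clientValue() {
        return client.toString();
    }

    // Serialize then deserialize, as happens when a function is shipped.
    static SketchSink roundTrip(SketchSink in) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(in);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return (SketchSink) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchSink original = new SketchSink(new String[]{"host1:9300"});
        original.open();

        SketchSink copy = roundTrip(original);
        System.out.println(copy.clientIsNull()); // transient was skipped
        copy.open();
        System.out.println(copy.clientValue());  // rebuilt from final field
    }
}
```

So keeping `transportNodes` non-transient only works if its element type is serializable; otherwise the serializable representation (e.g. host/port strings) has to be the field that is kept, with the address list or client built in open().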