> Shouldn't it be better to have both connectors for ES? One for 1.x and another
> for 2.x?


IMHO that's the way to go.

Thanks Madhukar!

Cheers,
Max

On Sat, Dec 5, 2015 at 6:49 AM, Deepak Sharma <deepakmc...@gmail.com> wrote:

> Hi Madhu
> Would you be able to share your use case for Elasticsearch with Flink?
>
> Thanks
> Deepak
>
> On Sat, Dec 5, 2015 at 1:25 AM, Madhukar Thota <madhukar.th...@gmail.com>
> wrote:
>
>> Sure. I can submit the pull request.
>>
>> On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Hi Madhu,
>>>
>>> Great. Do you want to contribute it back via a GitHub pull request? If
>>> not, that's also fine. We will try to look into the 2.0 connector next
>>> week.
>>>
>>> Best,
>>> Max
>>>
>>> On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota <madhukar.th...@gmail.com>
>>> wrote:
>>> > I have created a working connector for Elasticsearch 2.0 based on the
>>> > elasticsearch-flink connector. I am using it right now, but I would like
>>> > an official connector from Flink.
>>> >
>>> > ElasticsearchSink.java
>>> >
>>> >
>>> > import org.apache.flink.api.java.utils.ParameterTool;
>>> > import org.apache.flink.configuration.Configuration;
>>> > import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>> > import org.slf4j.Logger;
>>> > import org.slf4j.LoggerFactory;
>>> >
>>> > import java.net.InetAddress;
>>> > import java.net.UnknownHostException;
>>> > import java.util.List;
>>> > import java.util.Map;
>>> > import java.util.concurrent.atomic.AtomicBoolean;
>>> > import java.util.concurrent.atomic.AtomicReference;
>>> >
>>> > import org.elasticsearch.action.bulk.BulkItemResponse;
>>> > import org.elasticsearch.action.bulk.BulkProcessor;
>>> > import org.elasticsearch.action.bulk.BulkRequest;
>>> > import org.elasticsearch.action.bulk.BulkResponse;
>>> > import org.elasticsearch.action.index.IndexRequest;
>>> > import org.elasticsearch.client.Client;
>>> > import org.elasticsearch.client.transport.TransportClient;
>>> > import org.elasticsearch.cluster.node.DiscoveryNode;
>>> > import org.elasticsearch.common.settings.Settings;
>>> > import org.elasticsearch.common.transport.InetSocketTransportAddress;
>>> > import org.elasticsearch.common.unit.ByteSizeUnit;
>>> > import org.elasticsearch.common.unit.ByteSizeValue;
>>> > import org.elasticsearch.common.unit.TimeValue;
>>> >
>>> >
>>> > public class ElasticsearchSink<T> extends RichSinkFunction<T> {
>>> >
>>> >     public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS =
>>> > "bulk.flush.max.actions";
>>> >     public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB =
>>> > "bulk.flush.max.size.mb";
>>> >     public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS =
>>> > "bulk.flush.interval.ms";
>>> >
>>> >     private static final long serialVersionUID = 1L;
>>> >     private static final int DEFAULT_PORT = 9300;
>>> >     private static final Logger LOG =
>>> > LoggerFactory.getLogger(ElasticsearchSink.class);
>>> >
>>> >     /**
>>> >      * The user-specified config map that we forward to Elasticsearch
>>> >      * when we create the Client.
>>> >      */
>>> >     private final Map<String, String> userConfig;
>>> >
>>> >     /**
>>> >      * The builder that is used to construct an {@link IndexRequest}
>>> >      * from the incoming element.
>>> >      */
>>> >     private final IndexRequestBuilder<T> indexRequestBuilder;
>>> >
>>> >     /**
>>> >      * The Client used to talk to Elasticsearch. In this version it is
>>> >      * always a TransportClient.
>>> >      */
>>> >     private transient Client client;
>>> >
>>> >     /**
>>> >      * Bulk processor that was created using the client
>>> >      */
>>> >     private transient BulkProcessor bulkProcessor;
>>> >
>>> >     /**
>>> >      * This is set from inside the BulkProcessor listener if there
>>> >      * were failures in processing.
>>> >      */
>>> >     private final AtomicBoolean hasFailure = new AtomicBoolean(false);
>>> >
>>> >     /**
>>> >      * This is set from inside the BulkProcessor listener if a
>>> >      * Throwable was thrown during processing.
>>> >      */
>>> >     private final AtomicReference<Throwable> failureThrowable = new
>>> > AtomicReference<Throwable>();
>>> >
>>> >     public ElasticsearchSink(Map<String, String> userConfig,
>>> > IndexRequestBuilder<T> indexRequestBuilder) {
>>> >         this.userConfig = userConfig;
>>> >         this.indexRequestBuilder = indexRequestBuilder;
>>> >     }
>>> >
>>> >
>>> >     @Override
>>> >     public void open(Configuration configuration) {
>>> >
>>> >         ParameterTool params = ParameterTool.fromMap(userConfig);
>>> >         Settings settings = Settings.settingsBuilder()
>>> >                 .put(userConfig)
>>> >                 .build();
>>> >
>>> >         TransportClient transportClient =
>>> > TransportClient.builder().settings(settings).build();
>>> >         for (String server : params.get("esHost").split(";"))
>>> >         {
>>> >             String[] components = server.trim().split(":");
>>> >             String host = components[0];
>>> >             int port = DEFAULT_PORT;
>>> >             if (components.length > 1)
>>> >             {
>>> >                 port = Integer.parseInt(components[1]);
>>> >             }
>>> >
>>> >             try {
>>> >                 transportClient = transportClient.addTransportAddress(
>>> >                         new InetSocketTransportAddress(InetAddress.getByName(host), port));
>>> >             } catch (UnknownHostException e) {
>>> >                 LOG.error("Could not resolve Elasticsearch host " + host, e);
>>> >             }
>>> >         }
>>> >
>>> >         List<DiscoveryNode> nodes = transportClient.connectedNodes();
>>> >         if (nodes.isEmpty()) {
>>> >             throw new RuntimeException(
>>> >                     "Client is not connected to any Elasticsearch nodes!");
>>> >         } else {
>>> >             if (LOG.isInfoEnabled()) {
>>> >                 LOG.info("Connected to nodes: " + nodes.toString());
>>> >             }
>>> >         }
>>> >         client = transportClient;
>>> >
>>> >         BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
>>> >                 client,
>>> >                 new BulkProcessor.Listener() {
>>> >                     public void beforeBulk(long executionId,
>>> >                                            BulkRequest request) {
>>> >
>>> >                     }
>>> >
>>> >                     public void afterBulk(long executionId,
>>> >                                           BulkRequest request,
>>> >                                           BulkResponse response) {
>>> >                         if (response.hasFailures()) {
>>> >                             for (BulkItemResponse itemResp : response.getItems()) {
>>> >                                 if (itemResp.isFailed()) {
>>> >                                     LOG.error("Failed to index document in Elasticsearch: "
>>> >                                             + itemResp.getFailureMessage());
>>> >                                     failureThrowable.compareAndSet(null,
>>> >                                             new RuntimeException(itemResp.getFailureMessage()));
>>> >                                 }
>>> >                             }
>>> >                             hasFailure.set(true);
>>> >                         }
>>> >                     }
>>> >
>>> >                     public void afterBulk(long executionId,
>>> >                                           BulkRequest request,
>>> >                                           Throwable failure) {
>>> >                         LOG.error(failure.getMessage());
>>> >                         failureThrowable.compareAndSet(null, failure);
>>> >                         hasFailure.set(true);
>>> >                     }
>>> >                 });
>>> >
>>> >         // This makes flush() blocking
>>> >         bulkProcessorBuilder.setConcurrentRequests(0);
>>> >
>>> >
>>> >
>>> >         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
>>> >             bulkProcessorBuilder.setBulkActions(
>>> >                     params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
>>> >         }
>>> >
>>> >         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
>>> >             bulkProcessorBuilder.setBulkSize(new ByteSizeValue(
>>> >                     params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
>>> >         }
>>> >
>>> >         if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
>>> >             bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(
>>> >                     params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
>>> >         }
>>> >
>>> >         bulkProcessor = bulkProcessorBuilder.build();
>>> >     }
>>> >
>>> >
>>> >     @Override
>>> >     public void invoke(T element) {
>>> >         IndexRequest indexRequest =
>>> > indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
>>> >
>>> >         if (LOG.isDebugEnabled()) {
>>> >             LOG.debug("Emitting IndexRequest: {}", indexRequest);
>>> >         }
>>> >
>>> >         bulkProcessor.add(indexRequest);
>>> >     }
>>> >
>>> >     @Override
>>> >     public void close() {
>>> >         if (bulkProcessor != null) {
>>> >             bulkProcessor.close();
>>> >             bulkProcessor = null;
>>> >         }
>>> >
>>> >         if (client != null) {
>>> >             client.close();
>>> >         }
>>> >
>>> >         if (hasFailure.get()) {
>>> >             Throwable cause = failureThrowable.get();
>>> >             if (cause != null) {
>>> >                 throw new RuntimeException(
>>> >                         "An error occurred in ElasticsearchSink.", cause);
>>> >             } else {
>>> >                 throw new RuntimeException(
>>> >                         "An error occurred in ElasticsearchSink.");
>>> >
>>> >             }
>>> >         }
>>> >     }
>>> >
>>> > }
>>> >
>>> >
>>> > In my Main Class:
>>> >
>>> >
>>> > Map<String, String> config = Maps.newHashMap();
>>> >
>>> > //Elasticsearch Parameters
>>> >
>>> > config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
>>> > parameter.get("elasticsearch.bulk.flush.max.actions","1"));
>>> > config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS,
>>> > parameter.get("elasticsearch.bulk.flush.interval.ms","2"));
>>> > config.put("cluster.name", parameter.get("elasticsearch.cluster.name"));
>>> > config.put("esHost", parameter.get("elasticsearch.server",
>>> > "localhost:9300"));
>>> >
>>> >
>>> > DataStreamSink<String> elastic = messageStream.rebalance().addSink(new
>>> > ElasticsearchSink<>(config, (IndexRequestBuilder<String>) (element,
>>> > runtimeContext) -> {
>>> >     String[] line = element.toLowerCase()
>>> >             .split(" +(?=(?:([^\"]*\"){2})*[^\"]*$)");
>>> >     String measureAndTags = line[0];
>>> >     String[] kvSplit = line[1].split("=");
>>> >     String fieldName = kvSplit[0];
>>> >     String fieldValue = kvSplit[1];
>>> >     Map<String, String> tags = new HashMap<>();
>>> >     String measure = parseMeasureAndTags(measureAndTags, tags);
>>> >     long time = (long) (Double.valueOf(line[2]) / 1000000);
>>> >
>>> >     Map<String, Object> test = new HashMap<>();
>>> >     DateFormat dateFormat = new
>>> > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>>> >     dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
>>> >
>>> >     test.put(fieldName, setValue(fieldValue));
>>> >     test.put("tags", tags);
>>> >     test.put("measurement", measure);
>>> >     test.put("@timestamp", dateFormat.format(new Date(time)));
>>> >
>>> >     return Requests.indexRequest()
>>> >             .index("metrics")
>>> >             .type("test")
>>> >             .source(new Gson().toJson(test).toLowerCase());
>>> >
>>> >
>>> > }));
>>> >
>>> >
>>> > -Madhu
>>> >
>>> >
>>> > On Fri, Dec 4, 2015 at 9:18 AM, Maximilian Michels <m...@apache.org>
>>> > wrote:
>>> >>
>>> >> Hi Madhu,
>>> >>
>>> >> Not yet. The API has changed slightly. We'll add one very soon. In the
>>> >> meantime I've created an issue to keep track of the status:
>>> >>
>>> >> https://issues.apache.org/jira/browse/FLINK-3115
>>> >>
>>> >> Thanks,
>>> >> Max
>>> >>
>>> >> On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota
>>> >> <madhukar.th...@gmail.com> wrote:
>>> >> > Does the current elasticsearch-flink connector support
>>> >> > Elasticsearch 2.x?
>>> >> >
>>> >> > -Madhu
>>> >
>>> >
>>>
>>
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>

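For anyone adapting the sink quoted above: open() reads the "esHost" setting as a
semicolon-separated list of host[:port] entries and falls back to port 9300 when
no port is given. Below is a minimal sketch of that parsing as a standalone helper
that needs only the JDK. EsHostParser and parse() are hypothetical names, not part
of the posted connector or of Flink, and the stricter validation (rejecting empty
or malformed entries) is an added assumption rather than something the quoted code
does. IPv6 literals are not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; not part of the connector posted in this thread. */
final class EsHostParser {

    // Same default transport port as DEFAULT_PORT in the quoted sink.
    static final int DEFAULT_PORT = 9300;

    private EsHostParser() {
    }

    /**
     * Parses a string such as "es1:9301; es2" into unresolved socket addresses.
     * Entries without a port use DEFAULT_PORT; blank entries are skipped.
     */
    static List<InetSocketAddress> parse(String esHost) {
        if (esHost == null || esHost.trim().isEmpty()) {
            throw new IllegalArgumentException("esHost must not be empty");
        }
        List<InetSocketAddress> result = new ArrayList<>();
        for (String server : esHost.split(";")) {
            String entry = server.trim();
            if (entry.isEmpty()) {
                continue; // tolerate stray separators such as "a;;b"
            }
            String[] components = entry.split(":");
            if (components.length == 0 || components.length > 2
                    || components[0].isEmpty()) {
                throw new IllegalArgumentException("Malformed esHost entry: " + entry);
            }
            int port = DEFAULT_PORT;
            if (components.length == 2) {
                // Throws NumberFormatException if the port is not a number.
                port = Integer.parseInt(components[1]);
            }
            // createUnresolved avoids a DNS lookup while parsing; resolution
            // can happen later, when the transport address is created.
            result.add(InetSocketAddress.createUnresolved(components[0], port));
        }
        if (result.isEmpty()) {
            throw new IllegalArgumentException("esHost contains no hosts: " + esHost);
        }
        return result;
    }
}
```

In the sink, each parsed address could then be turned into a transport address
with InetAddress.getByName(address.getHostString()) and address.getPort(), which
keeps the string handling separate from the DNS lookup and its
UnknownHostException.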