Re: Elasticsearch connector support?

2023-11-27 Thread Lasse Nedergaard
Hi Hang

Thanks for the link. I will wait for the 3.1 connector release and hope it will be included.

Med venlig hilsen / Best regards
Lasse Nedergaard

On 27 Nov 2023 at 12:00, Hang Ruan wrote:

> Hi, Lasse.
>
> There is already a discussion about the connector releases for 1.18[1].
>
> Best,
> Hang
>
> [1] https://lists.apache.org/thread/r31f988m57rtjy4s75030pzwrlqybpq2
>
> On Fri, 24 Nov 2023 at 22:57, Lasse Nedergaard wrote:
>
>> Hi
>>
>> From the documentation I can see there isn’t any ES support in Flink 1.18
>> right now and FLINK-26088 (ES 8 support) is still open.
>>
>> Does anyone have an idea when ES connector support will be available in 1.18?
>>
>> Please let me know.
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard




Re: Elasticsearch connector support?

2023-11-27 Thread Hang Ruan
Hi, Lasse.

There is already a discussion about the connector releases for 1.18[1].

Best,
Hang

[1] https://lists.apache.org/thread/r31f988m57rtjy4s75030pzwrlqybpq2

On Fri, 24 Nov 2023 at 22:57, Lasse Nedergaard wrote:

> Hi
>
> From the documentation I can see there isn’t any ES support in Flink 1.18
> right now and FLINK-26088 (ES 8 support) is still open.
>
> Does anyone have an idea when ES connector support will be available in 1.18?
>
> Please let me know.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>


Elasticsearch connector support?

2023-11-24 Thread Lasse Nedergaard
Hi

From the documentation I can see there isn’t any ES support in Flink 1.18 right
now and FLINK-26088 (ES 8 support) is still open.

Does anyone have an idea when ES connector support will be available in 1.18?

Please let me know. 

Med venlig hilsen / Best regards
Lasse Nedergaard



Re: Flink-Elasticsearch connector support for elasticsearch 2.0

2015-12-04 Thread Maximilian Michels
Hi Madhu,

Not yet. The API has changed slightly. We'll add one very soon. In the
meantime I've created an issue to keep track of the status:

https://issues.apache.org/jira/browse/FLINK-3115

Thanks,
Max

On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota
 wrote:
> Does the current elasticsearch-flink connector support Elasticsearch 2.x?
>
> -Madhu


Re: Flink-Elasticsearch connector support for elasticsearch 2.0

2015-12-04 Thread Maximilian Michels
Hi Madhu,

Great. Do you want to contribute it back via a GitHub pull request? If
not, that's also fine. We will try to look into the 2.0 connector next
week.

Best,
Max

On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota  wrote:
> I have created a working connector for Elasticsearch 2.0 based on the
> elasticsearch-flink connector. I am using it right now, but I would like an
> official connector from Flink.
>
> ElasticsearchSink.java
>
>
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.net.InetAddress;
> import java.net.UnknownHostException;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicReference;
>
> import org.elasticsearch.action.bulk.BulkItemResponse;
> import org.elasticsearch.action.bulk.BulkProcessor;
> import org.elasticsearch.action.bulk.BulkRequest;
> import org.elasticsearch.action.bulk.BulkResponse;
> import org.elasticsearch.action.index.IndexRequest;
> import org.elasticsearch.client.Client;
> import org.elasticsearch.client.transport.TransportClient;
> import org.elasticsearch.cluster.node.DiscoveryNode;
> import org.elasticsearch.common.settings.Settings;
> import org.elasticsearch.common.transport.InetSocketTransportAddress;
> import org.elasticsearch.common.unit.ByteSizeUnit;
> import org.elasticsearch.common.unit.ByteSizeValue;
> import org.elasticsearch.common.unit.TimeValue;
>
>
> public class ElasticsearchSink<T> extends RichSinkFunction<T> {
>
>     public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
>     public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
>     public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
>
>     private static final long serialVersionUID = 1L;
>     private static final int DEFAULT_PORT = 9300;
>     private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
>
>     /**
>      * The user specified config map that we forward to Elasticsearch when we create the Client.
>      */
>     private final Map<String, String> userConfig;
>
>     /**
>      * The builder that is used to construct an {@link IndexRequest} from the incoming element.
>      */
>     private final IndexRequestBuilder<T> indexRequestBuilder;
>
>     /**
>      * The Client that was either retrieved from a Node or is a TransportClient.
>      */
>     private transient Client client;
>
>     /**
>      * Bulk processor that was created using the client.
>      */
>     private transient BulkProcessor bulkProcessor;
>
>     /**
>      * This is set from inside the BulkProcessor listener if there were failures in processing.
>      */
>     private final AtomicBoolean hasFailure = new AtomicBoolean(false);
>
>     /**
>      * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
>      */
>     private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
>
>     public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
>         this.userConfig = userConfig;
>         this.indexRequestBuilder = indexRequestBuilder;
>     }
>
>     @Override
>     public void open(Configuration configuration) {
>
>         ParameterTool params = ParameterTool.fromMap(userConfig);
>         Settings settings = Settings.settingsBuilder()
>                 .put(userConfig)
>                 .build();
>
>         TransportClient transportClient = TransportClient.builder().settings(settings).build();
>
>         // "esHost" holds a ';'-separated list of host[:port] entries.
>         for (String server : params.get("esHost").split(";")) {
>             String[] components = server.trim().split(":");
>             String host = components[0];
>             int port = DEFAULT_PORT;
>             if (components.length > 1) {
>                 port = Integer.parseInt(components[1]);
>             }
>
>             try {
>                 transportClient = transportClient.addTransportAddress(
>                         new InetSocketTransportAddress(InetAddress.getByName(host), port));
>             } catch (UnknownHostException e) {
>                 // Log and skip the unresolvable host instead of printing to stderr.
>                 LOG.warn("Could not resolve Elasticsearch host " + host, e);
>             }
>         }
>
>         List<DiscoveryNode> nodes = transportClient.connectedNodes();
>         if (nodes.isEmpty()) {
>             throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
>         } else {
>             // Guard matches the level that is actually logged.
>             if (LOG.isInfoEnabled()) {
>                 LOG.info("Connected to nodes: " + nodes.toString());
>             }
>         }
>         client = transportClient;
>
>         BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
>                 client,
>                 new BulkProcessor.Listener() {
>
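
For anyone following the quoted open() method, the "esHost" handling can be tried out on its own. The following is only a minimal sketch under assumptions: the class name EsHostParser is hypothetical and not part of the posted connector, it returns unresolved java.net.InetSocketAddress values instead of adding transport addresses to a TransportClient, and it skips empty entries, which the quoted loop does not do.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the posted connector.
final class EsHostParser {

    // Same default as DEFAULT_PORT in the quoted sink.
    static final int DEFAULT_PORT = 9300;

    private EsHostParser() {}

    /** Parses a ';'-separated list of host[:port] entries. */
    static List<InetSocketAddress> parse(String esHost) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String server : esHost.split(";")) {
            String trimmed = server.trim();
            if (trimmed.isEmpty()) {
                continue; // assumption: silently skip empty entries
            }
            String[] components = trimmed.split(":");
            String host = components[0];
            int port = components.length > 1
                    ? Integer.parseInt(components[1].trim())
                    : DEFAULT_PORT;
            // createUnresolved performs no DNS lookup, so this runs offline.
            addresses.add(InetSocketAddress.createUnresolved(host, port));
        }
        return addresses;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("es-node-1:9301; es-node-2")) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```

Entries without an explicit port fall back to 9300, as in the quoted code. A malformed port still raises NumberFormatException, which a real sink would probably want to report with the offending entry.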

Re: Flink-Elasticsearch connector support for elasticsearch 2.0

2015-12-04 Thread Madhukar Thota
Sure. I can submit the pull request.

On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels  wrote:

> Hi Madhu,
>
> Great. Do you want to contribute it back via a GitHub pull request? If
> not, that's also fine. We will try to look into the 2.0 connector next
> week.
>
> Best,
> Max
>

Re: Flink-Elasticsearch connector support for elasticsearch 2.0

2015-12-04 Thread Flavio Pompermaier
Wouldn't it be better to have both connectors for ES, one for 1.x and another
for 2.x?
On 4 Dec 2015 20:55, "Madhukar Thota"  wrote:

> Sure. I can submit the pull request.
>
> On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels 
> wrote:
>
>> Hi Madhu,
>>
>> Great. Do you want to contribute it back via a GitHub pull request? If
>> not, that's also fine. We will try to look into the 2.0 connector next
>> week.
>>
>> Best,
>> Max
>>

Re: Flink-Elasticsearch connector support for elasticsearch 2.0

2015-12-04 Thread Deepak Sharma
Hi Madhu
Could you describe your use case for Elasticsearch with Flink?

Thanks
Deepak

On Sat, Dec 5, 2015 at 1:25 AM, Madhukar Thota 
wrote:

> Sure. I can submit the pull request.
>
> On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels 
> wrote:
>
>> Hi Madhu,
>>
>> Great. Do you want to contribute it back via a GitHub pull request? If
>> not, that's also fine. We will try to look into the 2.0 connector next
>> week.
>>
>> Best,
>> Max
>>