Sure. I can submit the pull request.

On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels <m...@apache.org> wrote:
> Hi Madhu,
>
> Great. Do you want to contribute it back via a GitHub pull request? If
> not, that's also fine. We will try to look into the 2.0 connector next
> week.
>
> Best,
> Max
>
> On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota <madhukar.th...@gmail.com> wrote:
> > I have created a working connector for Elasticsearch 2.0, based on the
> > elasticsearch-flink connector. I am using it right now, but I want an
> > official connector from Flink.
> >
> > ElasticsearchSink.java:
> >
> > import org.apache.flink.api.java.utils.ParameterTool;
> > import org.apache.flink.configuration.Configuration;
> > import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > import java.net.InetAddress;
> > import java.net.UnknownHostException;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.concurrent.atomic.AtomicBoolean;
> > import java.util.concurrent.atomic.AtomicReference;
> >
> > import org.elasticsearch.action.bulk.BulkItemResponse;
> > import org.elasticsearch.action.bulk.BulkProcessor;
> > import org.elasticsearch.action.bulk.BulkRequest;
> > import org.elasticsearch.action.bulk.BulkResponse;
> > import org.elasticsearch.action.index.IndexRequest;
> > import org.elasticsearch.client.Client;
> > import org.elasticsearch.client.transport.TransportClient;
> > import org.elasticsearch.cluster.node.DiscoveryNode;
> > import org.elasticsearch.common.settings.Settings;
> > import org.elasticsearch.common.transport.InetSocketTransportAddress;
> > import org.elasticsearch.common.unit.ByteSizeUnit;
> > import org.elasticsearch.common.unit.ByteSizeValue;
> > import org.elasticsearch.common.unit.TimeValue;
> >
> > public class ElasticsearchSink<T> extends RichSinkFunction<T> {
> >
> >     public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
> >     public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
> >     public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
> >
> >     private static final long serialVersionUID = 1L;
> >     private static final int DEFAULT_PORT = 9300;
> >     private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
> >
> >     /**
> >      * The user-specified config map that we forward to Elasticsearch when we create the Client.
> >      */
> >     private final Map<String, String> userConfig;
> >
> >     /**
> >      * The builder that is used to construct an {@link IndexRequest} from the incoming element.
> >      */
> >     private final IndexRequestBuilder<T> indexRequestBuilder;
> >
> >     /**
> >      * The Client that was either retrieved from a Node or is a TransportClient.
> >      */
> >     private transient Client client;
> >
> >     /**
> >      * Bulk processor that was created using the client.
> >      */
> >     private transient BulkProcessor bulkProcessor;
> >
> >     /**
> >      * This is set from inside the BulkProcessor listener if there were failures in processing.
> >      */
> >     private final AtomicBoolean hasFailure = new AtomicBoolean(false);
> >
> >     /**
> >      * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
> >      */
> >     private final AtomicReference<Throwable> failureThrowable = new AtomicReference<Throwable>();
> >
> >     public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
> >         this.userConfig = userConfig;
> >         this.indexRequestBuilder = indexRequestBuilder;
> >     }
> >
> >     @Override
> >     public void open(Configuration configuration) {
> >         ParameterTool params = ParameterTool.fromMap(userConfig);
> >         Settings settings = Settings.settingsBuilder()
> >                 .put(userConfig)
> >                 .build();
> >
> >         TransportClient transportClient = TransportClient.builder().settings(settings).build();
> >         for (String server : params.get("esHost").split(";")) {
> >             String[] components = server.trim().split(":");
> >             String host = components[0];
> >             int port = DEFAULT_PORT;
> >             if (components.length > 1) {
> >                 port = Integer.parseInt(components[1]);
> >             }
> >
> >             try {
> >                 transportClient = transportClient.addTransportAddress(
> >                         new InetSocketTransportAddress(InetAddress.getByName(host), port));
> >             } catch (UnknownHostException e) {
> >                 LOG.error("Unable to resolve Elasticsearch host: " + host, e);
> >             }
> >         }
> >
> >         List<DiscoveryNode> nodes = transportClient.connectedNodes();
> >         if (nodes.isEmpty()) {
> >             throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
> >         } else if (LOG.isDebugEnabled()) {
> >             LOG.debug("Connected to nodes: " + nodes.toString());
> >         }
> >         client = transportClient;
> >
> >         BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
> >                 client,
> >                 new BulkProcessor.Listener() {
> >                     public void beforeBulk(long executionId, BulkRequest request) {
> >                     }
> >
> >                     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
> >                         if (response.hasFailures()) {
> >                             for (BulkItemResponse itemResp : response.getItems()) {
> >                                 if (itemResp.isFailed()) {
> >                                     LOG.error("Failed to index document in Elasticsearch: "
> >                                             + itemResp.getFailureMessage());
> >                                     failureThrowable.compareAndSet(null,
> >                                             new RuntimeException(itemResp.getFailureMessage()));
> >                                 }
> >                             }
> >                             hasFailure.set(true);
> >                         }
> >                     }
> >
> >                     public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
> >                         LOG.error(failure.getMessage());
> >                         failureThrowable.compareAndSet(null, failure);
> >                         hasFailure.set(true);
> >                     }
> >                 });
> >
> >         // This makes flush() blocking.
> >         bulkProcessorBuilder.setConcurrentRequests(0);
> >
> >         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
> >             bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
> >         }
> >
> >         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
> >             bulkProcessorBuilder.setBulkSize(new ByteSizeValue(
> >                     params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
> >         }
> >
> >         if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
> >             bulkProcessorBuilder.setFlushInterval(
> >                     TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
> >         }
> >
> >         bulkProcessor = bulkProcessorBuilder.build();
> >     }
> >
> >     @Override
> >     public void invoke(T element) {
> >         IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
> >
> >         if (LOG.isDebugEnabled()) {
> >             LOG.debug("Emitting IndexRequest: {}", indexRequest);
> >         }
> >
> >         bulkProcessor.add(indexRequest);
> >     }
> >
> >     @Override
> >     public void close() {
> >         if (bulkProcessor != null) {
> >             bulkProcessor.close();
> >             bulkProcessor = null;
> >         }
> >
> >         if (client != null) {
> >             client.close();
> >         }
> >
> >         if (hasFailure.get()) {
> >             Throwable cause = failureThrowable.get();
> >             if (cause != null) {
> >                 throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
> >             } else {
> >                 throw new RuntimeException("An error occurred in ElasticsearchSink.");
> >             }
> >         }
> >     }
> > }
> >
> > In my main class:
> >
> > Map<String, String> config = Maps.newHashMap();
> >
> > // Elasticsearch parameters
> > config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
> >         parameter.get("elasticsearch.bulk.flush.max.actions", "1"));
> > config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS,
> >         parameter.get("elasticsearch.bulk.flush.interval.ms", "2"));
> > config.put("cluster.name", parameter.get("elasticsearch.cluster.name"));
> > config.put("esHost", parameter.get("elasticsearch.server", "localhost:9300"));
> >
> > DataStreamSink<String> elastic = messageStream.rebalance().addSink(
> >         new ElasticsearchSink<>(config, (IndexRequestBuilder<String>) (element, runtimeContext) -> {
> >             String[] line = element.toLowerCase().split(" +(?=(?:([^\"]*\"){2})*[^\"]*$)");
> >             String measureAndTags = line[0];
> >             String[] kvSplit = line[1].split("=");
> >             String fieldName = kvSplit[0];
> >             String fieldValue = kvSplit[1];
> >             Map<String, String> tags = new HashMap<>();
> >             String measure = parseMeasureAndTags(measureAndTags, tags);
> >             long time = (long) (Double.valueOf(line[2]) / 1000000);
> >
> >             Map<String, Object> test = new HashMap<>();
> >             DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
> >             dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
> >
> >             test.put(fieldName, setValue(fieldValue));
> >             test.put("tags", tags);
> >             test.put("measurement", measure);
> >             test.put("@timestamp", dateFormat.format(new Date(time)));
> >
> >             return Requests.indexRequest()
> >                     .index("metrics")
> >                     .type("test")
> >                     .source(new Gson().toJson(test).toLowerCase());
> >         }));
> >
> > -Madhu
> >
> > On Fri, Dec 4, 2015 at 9:18 AM, Maximilian Michels <m...@apache.org> wrote:
> >> Hi Madhu,
> >>
> >> Not yet. The API has changed slightly. We'll add one very soon. In the
> >> meantime, I've created an issue to keep track of the status:
> >>
> >> https://issues.apache.org/jira/browse/FLINK-3115
> >>
> >> Thanks,
> >> Max
> >>
> >> On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota <madhukar.th...@gmail.com> wrote:
> >> > Does the current elasticsearch-flink connector support Elasticsearch
> >> > 2.x versions?
> >> >
> >> > -Madhu
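
[Editor's note: the sink above compiles against Flink's IndexRequestBuilder interface, which is not shown in the thread. If it is not pulled in from the existing flink-connector-elasticsearch module, a minimal sketch of the shape the sink expects would be the following; the names are assumed to mirror the 1.x connector's interface of the same name, not the eventual official 2.0 API.]

import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.elasticsearch.action.index.IndexRequest;

// Minimal sketch of the builder interface the sink compiles against.
// It must be Serializable so Flink can ship it to the task managers.
public interface IndexRequestBuilder<T> extends Function, Serializable {

    // Called once per incoming element; the returned IndexRequest is
    // handed to the BulkProcessor in ElasticsearchSink.invoke().
    IndexRequest createIndexRequest(T element, RuntimeContext ctx);
}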
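
[Editor's note: Madhu's main class depends on helpers (parseMeasureAndTags, setValue) that are not included in the thread. For readers who just want to try the sink, here is a stripped-down, self-contained usage sketch; the host, cluster name, index, and type are placeholders, and it assumes the ElasticsearchSink and IndexRequestBuilder classes above are on the classpath.]

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.elasticsearch.client.Requests;

public class ElasticsearchSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connection settings: "cluster.name" is forwarded to the TransportClient,
        // while "esHost" is the sink's own ';'-separated host[:port] list.
        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", "my-cluster");   // placeholder cluster name
        config.put("esHost", "localhost:9300");     // placeholder host
        config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");

        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // Index every incoming line as a one-field JSON document.
        lines.addSink(new ElasticsearchSink<>(config, (IndexRequestBuilder<String>) (element, ctx) -> {
            Map<String, Object> json = new HashMap<>();
            json.put("line", element);
            return Requests.indexRequest()
                    .index("my-index")   // placeholder index
                    .type("my-type")     // placeholder type
                    .source(json);
        }));

        env.execute("Elasticsearch 2.0 sink example");
    }
}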