Re: Flink Storm

2015-12-04 Thread Maximilian Michels
Hi Naveen,

Were you using Maven before? The syncing of changes in the master
always takes a while for Maven. The documentation happened to be
updated before Maven synchronized. Building and installing manually
(what you did) solves the problem.

Strangely, when I run your code on my machine with the latest
1.0-SNAPSHOT I see a lot of output on my console.

Here's the output: https://gist.github.com/mxm/98cd927866b193ce0f89

Could you add a bolt which writes the Storm tuples to a file? Is that
file also empty?

builder.setBolt("file", new BoltFileSink("/tmp/storm", new OutputFormatter() {
    @Override
    public String format(Tuple tuple) {
        return tuple.toString();
    }
}), 1).shuffleGrouping("count");


Thanks,
Max


Re: Documentation for Fold

2015-12-04 Thread Maximilian Michels
Thanks Welly!

We have already corrected that in the snapshot documentation at
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#transformations

I fixed it also for the 0.10 documentation.

Best,
Max

On Fri, Dec 4, 2015 at 6:24 AM, Welly Tambunan  wrote:
>
> Hi All,
>
> Currently I'm going through the documentation for DataStream and found a minor
> error in the docs. I thought I should inform you.
>
> I think fold only works for keyed data streams.
>
>
>
>
> Cheers
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com
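
For reference, a minimal sketch of the point above: in the streaming API, fold is only
defined on a KeyedStream, so keyBy(...) has to come first. This is a hedged example
against the 0.10/1.0 streaming Java API; the class name and elements are made up for
illustration.

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedFoldExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> words = env.fromElements(
                new Tuple2<>("flink", 1), new Tuple2<>("storm", 1), new Tuple2<>("flink", 1));

        // fold is a method of KeyedStream, not of DataStream
        DataStream<String> folded = words
                .keyBy(0) // key by the word
                .fold("", new FoldFunction<Tuple2<String, Integer>, String>() {
                    @Override
                    public String fold(String accumulator, Tuple2<String, Integer> value) {
                        // concatenate the words seen for this key
                        return accumulator + value.f0 + " ";
                    }
                });

        folded.print();
        env.execute("Keyed fold example");
    }
}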


Re: Flink Storm

2015-12-04 Thread Madhire, Naveen
Hi Max,

Yeah, I did route the "count" bolt output to a file and I see the output.
I can see the Storm and Flink output matching.

However, I am not able to use the BoltFileSink class in the 1.0-SNAPSHOT
which I built. I think it's better to wait for a day for the Maven sync to
happen so that I can directly use 1.0-SNAPSHOT in the dependency.

I have few Storm topologies, which I will try to run on Flink over the
next few days. I will let you know how that goes. Thanks :)


Thanks,
Naveen

On 12/4/15, 5:36 AM, "Maximilian Michels"  wrote:

>Hi Naveen,
>
>Were you using Maven before? The syncing of changes in the master
>always takes a while for Maven. The documentation happened to be
>updated before Maven synchronized. Building and installing manually
>(what you did) solves the problem.
>
>Strangely, when I run your code on my machine with the latest
>1.0-SNAPSHOT I see a lot of output on my console.
>
>Here's the output: https://gist.github.com/mxm/98cd927866b193ce0f89
>
>Could you add a bolt which writes the Storm tuples to a file? Is that
>file also empty?
>
>builder.setBolt("file", new BoltFileSink("/tmp/storm", new
>OutputFormatter() {
>   @Override
>   public String format(Tuple tuple) {
>  return tuple.toString();
>   }
>}), 1).shuffleGrouping("count");
>
>
>Thanks,
>Max






Re: Material on Apache flink internals

2015-12-04 Thread Welly Tambunan
Hi Madhu,

You can also check this page for the details on internals

https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals
http://www.slideshare.net/KostasTzoumas/flink-internals

Cheers

On Fri, Dec 4, 2015 at 10:14 AM, madhu phatak  wrote:

> Hi,
> Thanks a lot for the resources.
> On Dec 1, 2015 9:11 PM, "Fabian Hueske"  wrote:
>
>> Hi Madhu,
>>
>> checkout the following resources:
>>
>> - Apache Flink Blog: http://flink.apache.org/blog/index.html
>> - Data Artisans Blog: http://data-artisans.com/blog/
>> - Flink Forward Conference website (Talk slides & recordings):
>> http://flink-forward.org/?post_type=session
>> - Flink Meetup talk recordings:
>> https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA
>> - Slim's Flink Knowledge base:
>> http://sparkbigdata.com/component/tags/tag/27-flink
>>
>> Best, Fabian
>>
>> 2015-12-01 16:23 GMT+01:00 madhu phatak :
>>
>>> Hi everyone,
>>>
>>> I am fascinated by the Flink core engine's way of streaming operators
>>> rather than the typical map/reduce approach followed by Hadoop or Spark. Is there
>>> any good documentation/blog/video available which talks about these internals? I
>>> am fine with either a batch or streaming point of view.
>>>
>>> It would be great if someone could share this info. Thank you for your
>>> excellent work.
>>>
>>> --
>>> Regards,
>>> Madhukara Phatak
>>> http://datamantra.io/
>>>
>>
>>


-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Flink-Elasticsearch connector support for elasticsearch 2.0

2015-12-04 Thread Maximilian Michels
Hi Madhu,

Not yet. The API has changed slightly. We'll add one very soon. In the
meantime I've created an issue to keep track of the status:

https://issues.apache.org/jira/browse/FLINK-3115

Thanks,
Max

On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota
 wrote:
> Does the current elasticsearch-flink connector support Elasticsearch 2.x?
>
> -Madhu


Re: Using Hadoop Input/Output formats

2015-12-04 Thread Nick Dimiduk
Thanks for the comments, everyone. For my part, I'm most interested in using
Hadoop's OutputFormats for writing out data at the end of a streaming job.

I also agree that while these "convenience methods" make for good example
code in slide decks, they're often not helpful for "real" applications. The
additional maintenance burden of a bloated API tends to be
counter-productive.

-n

On Wed, Nov 25, 2015 at 8:41 AM, Robert Metzger  wrote:

> I agree with Stephan.
>
> Reading static files is quite uncommon with the DataStream API. Before we
> add such a method, we should add a convenience method for Kafka ;)
> But in general, I'm not a big fan of adding too many of these methods
> because they pull in so many external classes, which leads to breaking API
> changes, dependency issues, etc.
>
> I think such issues can be addressed easily with a good documentation
> (maybe in the "Best practices" guide), good answers on Stack Overflow and
> so on.
>
> On Wed, Nov 25, 2015 at 12:12 PM, Stephan Ewen  wrote:
>
>> For streaming, I am a bit torn on whether reading a file should have such
>> prominent functions. Most streaming programs work on message
>> queues, or on monitored directories.
>>
>> Not saying no, but not sure DataSet/DataStream parity is the main goal -
>> they are for different use cases after all...
>>
>> On Wed, Nov 25, 2015 at 8:22 AM, Chiwan Park 
>> wrote:
>>
>>> Thanks for the correction, @Fabian. :)
>>>
>>> > On Nov 25, 2015, at 4:40 AM, Suneel Marthi  wrote:
>>> >
>>> > Guess it makes sense to add readHadoopXXX() methods to
>>> StreamExecutionEnvironment (for feature parity with what exists
>>> presently in ExecutionEnvironment).
>>> >
>>> > Also FLINK-2949 addresses the need to add relevant syntactic sugar
>>> wrappers in the DataSet API for the code snippet in Fabian's previous email.
>>> It's not cool having to instantiate a JobConf in client code and having to
>>> pass that around.
>>> >
>>> >
>>> >
>>> > On Tue, Nov 24, 2015 at 2:26 PM, Fabian Hueske 
>>> wrote:
>>> > Hi Nick,
>>> >
>>> > you can use Flink's HadoopInputFormat wrappers also for the DataStream
>>> API. However, DataStream does not offer as much "sugar" as DataSet because
>>> StreamEnvironment does not offer dedicated createHadoopInput or
>>> readHadoopFile methods.
>>> >
>>> > In DataStream Scala you can read from a Hadoop InputFormat
>>> (TextInputFormat in this case) as follows:
>>> >
>>> > val textData: DataStream[(LongWritable, Text)] = env.createInput(
>>> >   new HadoopInputFormat[LongWritable, Text](
>>> >     new TextInputFormat,
>>> >     classOf[LongWritable],
>>> >     classOf[Text],
>>> >     new JobConf()
>>> >   ))
>>> >
>>> > The Java version is very similar.
>>> >
>>> > Note: Flink has wrappers for both MR APIs: mapred and mapreduce.
>>> >
>>> > Cheers,
>>> > Fabian
>>> >
>>> > 2015-11-24 19:36 GMT+01:00 Chiwan Park :
>>> > I’m not a streaming expert. AFAIK, the layer can be used only with
>>> DataSet. There are some streaming-specific features such as distributed
>>> snapshots in Flink. These need some support from sources and sinks, so you
>>> may have to implement the I/O yourself.
>>> >
>>> > > On Nov 25, 2015, at 3:22 AM, Nick Dimiduk 
>>> wrote:
>>> > >
>>> > > I completely missed this, thanks Chiwan. Can these be used with
>>> DataStreams as well as DataSets?
>>> > >
>>> > > On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park 
>>> wrote:
>>> > > Hi Nick,
>>> > >
>>> > > You can use Hadoop Input/Output Format without modification! Please
>>> check the documentation[1] in Flink homepage.
>>> > >
>>> > > [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html
>>> > >
>>> > > > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk 
>>> wrote:
>>> > > >
>>> > > > Hello,
>>> > > >
>>> > > > Is it possible to use existing Hadoop Input and OutputFormats with
>>> Flink? There's a lot of existing code that conforms to these interfaces,
>>> seems a shame to have to re-implement it all. Perhaps some adapter shim..?
>>> > > >
>>> > > > Thanks,
>>> > > > Nick
>>> > >
>>> > > Regards,
>>> > > Chiwan Park
>>> > >
>>> > >
>>> >
>>> > Regards,
>>> > Chiwan Park
>>> >
>>>
>>> Regards,
>>> Chiwan Park
>>>
>>>
>>>
>>>
>>
>
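
Putting the thread together, below is a hedged sketch of the part Nick asks about:
using a Hadoop mapred OutputFormat as a streaming sink via Flink's HadoopOutputFormat
wrapper and DataStream#writeUsingOutputFormat. The output path and elements are made
up, and note that a plain OutputFormat sink does not participate in Flink's
checkpointing, so it gives no exactly-once guarantees.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

public class StreamingHadoopOutputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Wrap a mapred TextOutputFormat in Flink's HadoopOutputFormat adapter.
        JobConf jobConf = new JobConf();
        FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/streaming-hadoop-out"));
        HadoopOutputFormat<Text, IntWritable> hadoopOF =
                new HadoopOutputFormat<Text, IntWritable>(
                        new TextOutputFormat<Text, IntWritable>(), jobConf);

        DataStream<Tuple2<Text, IntWritable>> counts = env
                .fromElements("flink", "storm", "flink")
                .map(new MapFunction<String, Tuple2<Text, IntWritable>>() {
                    @Override
                    public Tuple2<Text, IntWritable> map(String word) {
                        return new Tuple2<>(new Text(word), new IntWritable(1));
                    }
                });

        // HadoopOutputFormat implements Flink's OutputFormat, so it plugs in as a sink.
        counts.writeUsingOutputFormat(hadoopOF);
        env.execute("Hadoop OutputFormat as a streaming sink (sketch)");
    }
}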


Re: Flink-Elasticsearch connector support for elasticsearch 2.0

2015-12-04 Thread Maximilian Michels
Hi Madhu,

Great. Do you want to contribute it back via a GitHub pull request? If
not, that's also fine. We will try to look into the 2.0 connector next
week.

Best,
Max

On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota  wrote:
> I have created a working connector for Elasticsearch 2.0 based on the
> elasticsearch-flink connector. I am using it right now, but I want an official
> connector from Flink.
>
> ElasticsearchSink.java
>
>
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.net.InetAddress;
> import java.net.UnknownHostException;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicReference;
>
> import org.elasticsearch.action.bulk.BulkItemResponse;
> import org.elasticsearch.action.bulk.BulkProcessor;
> import org.elasticsearch.action.bulk.BulkRequest;
> import org.elasticsearch.action.bulk.BulkResponse;
> import org.elasticsearch.action.index.IndexRequest;
> import org.elasticsearch.client.Client;
> import org.elasticsearch.client.transport.TransportClient;
> import org.elasticsearch.cluster.node.DiscoveryNode;
> import org.elasticsearch.common.settings.Settings;
> import org.elasticsearch.common.transport.InetSocketTransportAddress;
> import org.elasticsearch.common.unit.ByteSizeUnit;
> import org.elasticsearch.common.unit.ByteSizeValue;
> import org.elasticsearch.common.unit.TimeValue;
>
>
> public class ElasticsearchSink<T> extends RichSinkFunction<T> {
>
>     public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
>     public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
>     public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
>
>     private static final long serialVersionUID = 1L;
>     private static final int DEFAULT_PORT = 9300;
>     private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
>
>     /**
>      * The user-specified config map that we forward to Elasticsearch when we create the Client.
>      */
>     private final Map<String, String> userConfig;
>
>     /**
>      * The builder that is used to construct an {@link IndexRequest} from the incoming element.
>      */
>     private final IndexRequestBuilder<T> indexRequestBuilder;
>
>     /**
>      * The Client that was either retrieved from a Node or is a TransportClient.
>      */
>     private transient Client client;
>
>     /**
>      * Bulk processor that was created using the client.
>      */
>     private transient BulkProcessor bulkProcessor;
>
>     /**
>      * This is set from inside the BulkProcessor listener if there were failures in processing.
>      */
>     private final AtomicBoolean hasFailure = new AtomicBoolean(false);
>
>     /**
>      * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
>      */
>     private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
>
>     public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
>         this.userConfig = userConfig;
>         this.indexRequestBuilder = indexRequestBuilder;
>     }
>
>     @Override
>     public void open(Configuration configuration) {
>
>         ParameterTool params = ParameterTool.fromMap(userConfig);
>         Settings settings = Settings.settingsBuilder()
>                 .put(userConfig)
>                 .build();
>
>         TransportClient transportClient = TransportClient.builder().settings(settings).build();
>         for (String server : params.get("esHost").split(";")) {
>             String[] components = server.trim().split(":");
>             String host = components[0];
>             int port = DEFAULT_PORT;
>             if (components.length > 1) {
>                 port = Integer.parseInt(components[1]);
>             }
>
>             try {
>                 transportClient = transportClient.addTransportAddress(
>                         new InetSocketTransportAddress(InetAddress.getByName(host), port));
>             } catch (UnknownHostException e) {
>                 e.printStackTrace();
>             }
>         }
>
>         List<DiscoveryNode> nodes = transportClient.connectedNodes();
>         if (nodes.isEmpty()) {
>             throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
>         } else {
>             if (LOG.isDebugEnabled()) {
>                 LOG.info("Connected to nodes: " + nodes.toString());
>             }
>         }
>         client = transportClient;
>
>         BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
>                 client,
>                 new BulkProcessor.Listener() {
>
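
The quoted class above is cut off by the archive. For context, a hedged usage sketch of
such a sink follows; it assumes the IndexRequestBuilder callback has the same shape as in
the existing 1.x connector (createIndexRequest(element, RuntimeContext)), and the cluster
name, host, index, and type below are made up.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class ElasticsearchSinkUsageSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> messages = env.fromElements("hello", "world");

        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", "my-es-cluster");   // forwarded into the TransportClient settings
        config.put("esHost", "localhost:9300");        // host list parsed in open()
        config.put("bulk.flush.max.actions", "1");     // flush after every element (handy for testing)

        // ElasticsearchSink and IndexRequestBuilder are the classes from the quoted code,
        // assumed to be in this package / on the classpath.
        messages.addSink(new ElasticsearchSink<String>(config, new IndexRequestBuilder<String>() {
            @Override
            public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
                Map<String, Object> json = new HashMap<>();
                json.put("data", element);
                return Requests.indexRequest()
                        .index("my-index")
                        .type("my-type")
                        .source(json);
            }
        }));

        env.execute("Elasticsearch 2.0 sink usage (sketch)");
    }
}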

Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-12-04 Thread Robert Metzger
I think we need to find a solution for this problem soon.
Another user is most likely affected:
http://stackoverflow.com/q/34090808/568695

I've filed a JIRA for the problem:
https://issues.apache.org/jira/browse/FLINK-3121


On Mon, Nov 30, 2015 at 5:58 PM, Aljoscha Krettek 
wrote:

> Maybe. In the Kafka case we just need to ensure that parallel instances of
> the source that know that they don’t have any partitions assigned to them
> emit Long.MAX_VALUE as a watermark.
>
> > On 30 Nov 2015, at 17:50, Gyula Fóra  wrote:
> >
> > Hi,
> >
> > I think what we will need at some point for this are approximate
> watermarks which correlate event and ingest time.
> >
> > I think they have similar concepts in Millwheel/Dataflow.
> >
> > Cheers,
> > Gyula
> > On Mon, Nov 30, 2015 at 5:29 PM Aljoscha Krettek 
> wrote:
> > Hi,
> > as an addition. I don’t have a solution yet, for the general problem of
> what happens when a parallel instance of a source never receives elements.
> This watermark business is very tricky...
> >
> > Cheers,
> > Aljoscha
> > > On 30 Nov 2015, at 17:20, Aljoscha Krettek 
> wrote:
> > >
> > > Hi Konstantin,
> > > I finally nailed down the problem. :-)
> > >
> > > The basis of the problem is the fact that there is a mismatch in the
> parallelism of the Flink Kafka Consumer and the number of partitions in the
> Kafka Stream. I would assume that in your case the Kafka Stream has 1
> partition. This means, that only one of the parallel instances of the Flink
> Kafka Consumer ever receives elements, which in turn means that only one of
> the parallel instances of the timestamp extractor ever receives elements.
> This means that no watermarks get emitted for the other parallel instances
> which in turn means that the watermark does not advance downstream because
> the watermark at an operator is the minimum over all upstream watermarks.
> This explains why ExampleTimestampExtractor1 only works in the case with
> parallelism=1.
> > >
> > > The reason why ExampleTimestampExtractor2 works in all parallelism
> settings is not very obvious. The secret is in this method:
> > >
> > > @Override
> > > public long getCurrentWatermark() {
> > >   return lastTimestamp - maxDelay;
> > > }
> > >
> > > In the parallel instances that never receive any element lastTimestamp
> is set to Long.MIN_VALUE. So “lastTimestamp - maxDelay” is (Long.MAX_VALUE
> - maxDelay (+1)). Now, because the watermark at an operator is always the
> minimum over all watermarks from upstream operators the watermark at the
> window operator always tracks the watermark of the parallel instance that
> receives elements.
> > >
> > > I hope this helps, but please let me know if I should provide more
> explanation. This is a very tricky topic.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > >> On 29 Nov 2015, at 21:18, Konstantin Knauf <
> konstantin.kn...@tngtech.com> wrote:
> > >>
> > >> Hi Aljoscha,
> > >>
> > >> I have put together a gist [1] with two classes, a short processing
> > >> pipeline, which shows the behavior and a data generator to write
> records
> > >> into Kafka. I hope I remembered everything we discussed correctly.
> > >>
> > >> So basically in the example it works with "TimestampExtractor1" only
> for
> > >> parallelism 1, with "TimestampExtractor2" it works regardless of the
> > >> parallelism. Run from the IDE.
> > >>
> > >> Let me know if you need anything else.
> > >>
> > >> Cheers,
> > >>
> > >> Konstantin
> > >>
> > >> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
> > >>
> > >> On 25.11.2015 21:15, Konstantin Knauf wrote:
> > >>> Hi Aljoscha,
> > >>>
> > >>> sure, will do. I have neither found a solution. I won't have time to
> put
> > >>> a minimal example together before the weekend though.
> > >>>
> > >>> Cheers,
> > >>>
> > >>> Konstantin
> > >>>
> > >>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
> >  Hi Konstantin,
> >  I still didn’t come up with an explanation for the behavior. Could
> you maybe send me example code (and example data if it is necessary to
> reproduce the problem.)? This would really help me pinpoint the problem.
> > 
> >  Cheers,
> >  Aljoscha
> > > On 17 Nov 2015, at 21:42, Konstantin Knauf <
> konstantin.kn...@tngtech.com> wrote:
> > >
> > > Hi Aljoscha,
> > >
> > > Are you sure? I am running the job from my IDE at the moment.
> > >
> > > If I set
> > >
> > > StreamExecutionEnvironment.setParallelism(1);
> > >
> > > It works with the old TimestampExtractor (returning Long.MIN_VALUE
> from
> > > getCurrentWatermark() and emitting a watermark at every record)
> > >
> > > If I set
> > >
> > > StreamExecutionEnvironment.setParallelism(5);
> > >
> > > it does not work.
> > >
> > > So, if I understood you correctly, it is the opposite of what you
> were
> > > expecting?!
> > >
> > > Cheers,
> > 
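
A tiny, self-contained illustration of the arithmetic described above (plain Java, with
made-up numbers): the watermark at an operator is the minimum over all upstream
watermarks, so an idle parallel instance stuck at Long.MIN_VALUE stalls everything,
while "lastTimestamp - maxDelay" underflows to a value near Long.MAX_VALUE for idle
instances and lets the single active instance drive the watermark forward.

import java.util.Arrays;

public class WatermarkMinSketch {

    // Downstream watermark = minimum over all upstream (parallel) watermarks.
    static long downstreamWatermark(long[] upstreamWatermarks) {
        return Arrays.stream(upstreamWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        long maxDelay = 1000L;
        long activeLastTimestamp = 1_449_000_000_000L; // the one instance that actually sees elements

        // Extractor 1 style: idle instances report Long.MIN_VALUE, so the minimum never advances.
        long[] extractor1 = {activeLastTimestamp - maxDelay, Long.MIN_VALUE, Long.MIN_VALUE};
        System.out.println(downstreamWatermark(extractor1)); // Long.MIN_VALUE -> windows never fire

        // Extractor 2 style: idle instances compute lastTimestamp - maxDelay with
        // lastTimestamp == Long.MIN_VALUE, which underflows to Long.MAX_VALUE - maxDelay + 1,
        // so the minimum tracks the active instance.
        long[] extractor2 = {activeLastTimestamp - maxDelay, Long.MIN_VALUE - maxDelay, Long.MIN_VALUE - maxDelay};
        System.out.println(downstreamWatermark(extractor2)); // activeLastTimestamp - maxDelay
    }
}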

Re: Flink-Elasticsearch connector support for elasticsearch 2.0

2015-12-04 Thread Madhukar Thota
Sure. I can submit the pull request.

On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels  wrote:

> Hi Madhu,
>
> Great. Do you want to contribute it back via a GitHub pull request? If
> not that's also fine. We will try look into the 2.0 connector next
> week.
>
> Best,
> Max
>
> On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota 
> wrote:
> > i have created working connector for Elasticsearch 2.0 based on
> > elasticsearch-flink connector. I am using it right now but i want
> official
> > connector from flink.
> >
> > ElasticsearchSink.java
> >

Re: Flink-Elasticsearch connector support for elasticsearch 2.0

2015-12-04 Thread Flavio Pompermaier
Wouldn't it be better to have both connectors for ES? One for 1.x and another
for 2.x?
On 4 Dec 2015 20:55, "Madhukar Thota"  wrote:

> Sure. I can submit the pull request.
>
> On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels 
> wrote:
>
>> Hi Madhu,
>>
>> Great. Do you want to contribute it back via a GitHub pull request? If
>> not that's also fine. We will try look into the 2.0 connector next
>> week.
>>
>> Best,
>> Max
>>
>> On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota 
>> wrote:
>> > i have created working connector for Elasticsearch 2.0 based on
>> > elasticsearch-flink connector. I am using it right now but i want
>> official
>> > connector from flink.
>> >
>> > ElasticsearchSink.java
>> >

Re: Flink-Elasticsearch connector support for elasticsearch 2.0

2015-12-04 Thread Deepak Sharma
Hi Madhu
Would you be able to describe the use case here for Elasticsearch with Flink?

Thanks
Deepak

On Sat, Dec 5, 2015 at 1:25 AM, Madhukar Thota 
wrote:

> Sure. I can submit the pull request.
>
> On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels 
> wrote:
>
>> Hi Madhu,
>>
>> Great. Do you want to contribute it back via a GitHub pull request? If
>> not that's also fine. We will try look into the 2.0 connector next
>> week.
>>
>> Best,
>> Max
>>
>> On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota 
>> wrote:
>> > i have created working connector for Elasticsearch 2.0 based on
>> > elasticsearch-flink connector. I am using it right now but i want
>> official
>> > connector from flink.
>> >
>> > ElasticsearchSink.java
>> >

Any role for volunteering

2015-12-04 Thread Deepak Sharma
Hi All
Sorry for spamming your inbox.
I am really keen to work on a big data project full time (preferably remote,
from India); if not, I am open to volunteering as well.
Please do let me know if there is any such opportunity available.

-- 
Thanks
Deepak