Re: OutputFormat vs SinkFunction

2016-02-09 Thread Nick Dimiduk
I think managing a lifecycle around the existing MR OutputFormats makes a
lot of sense for the streaming environment. Having them unified in the
Flink Streaming API will make users' lives much better, and keeps the
streaming world open to the large existing ecosystem.

On Tue, Feb 9, 2016 at 6:13 AM, Stephan Ewen  wrote:

> Most of the problems we had with OutputFormats come from many being
> implemented in a batchy way:
>   - They buffer data and write large chunks at certain points
>   - They need the "close()" call before any consistent result is guaranteed
>
> That is mostly the case for FileOutputFormats, but not exclusively (some
> DB OutputFormats also buffer and batch-commit in some intervals).
>
> In general, it should simply be possible to unify the two by adding a
> "sync()" or "ensurePersistent()" call to the OutputFormats. That method
> could be called upon checkpoints, or periodically, to ensure persistence
> and result visibility.
>
> The initial idea behind having SinkFunctions was to not interfere with the
> batch code and not break things (by adding new abstract methods).
> What we could do, however, is to allow OutputFormats to implement an
> interface like "Streamable", which would add the above-mentioned methods
> and make the OutputFormat safe for streaming.
> We could then bit by bit add that interface to the existing output formats.
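
(A rough sketch of what such a "Streamable" contract could look like; the
interface name and method signature are hypothetical, not an existing Flink
API:)

import java.io.IOException;

public interface Streamable {
    // Flush any buffered records and make everything written so far
    // persistent and visible to readers. The streaming runtime would call
    // this on each checkpoint, or periodically.
    void ensurePersistent() throws IOException;
}

An OutputFormat that also implements this interface could then be used safely
in a streaming job, because the runtime no longer has to rely on close() for
consistent results.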
>
>
> Any thoughts on that?
>
> Greetings,
> Stephan
>
>
>
> On Tue, Feb 9, 2016 at 10:23 AM, Maximilian Michels 
> wrote:
>
>> I think you have a point, Nick. OutputFormats on their own have the same
>> fault-tolerance semantics as SinkFunctions. What kind of failure semantics
>> they guarantee depends on the actual implementation. For instance, the
>> RMQSource has exactly-once semantics but the RMQSink currently does not. If
>> you care about exactly-once semantics, you have to look into the
>> documentation and use the sources and sinks accordingly. It is not as if
>> OutputFormats were dangerous while all SinkFunctions were failure-proof.
>>
>> Consolidating the two interfaces would make sense. It might be a bit late
>> for the 1.0 release because I see that we would need to find a consensus
>> first and there are many things in the backlog :)
>>
>> On Tue, Feb 9, 2016 at 3:20 AM, Nick Dimiduk  wrote:
>>
>>> I think this depends on the implementation of the OutputFormat. For
>>> instance, an HBase, Cassandra or ES OF will handle most operations as
>>> idempotent when the scheme is designed appropriately.
>>>
>>> You are (rightly) focusing on FileOF's, which also depend on the
>>> semantics of their implementation. MR always required an atomic rename on
>>> the DFS, and only moved output files into place once the task committed its
>>> output.
>>>
>>> Also I think it unreasonable to bring exactly-once considerations into
>>> the discussion because nothing provides this right now without a
>>> multi-stage commit protocol. Such a protocol would be provided at the
>>> framework level, and to the best of my knowledge its semantic expectations
>>> on the output handler are undefined.
>>>
>>> My original question comes from wanting to use the LocalCollectionOF to
>>> test a streaming flow that sinks to Kafka, without rewriting the flow in
>>> test code. So in this case you're right that it does apply to tests. I
>>> don't think correctness of tests is a trivial concern though.
>>>
>>> As for RollingFileSink, I've not seen this conversation so I cannot
>>> comment. Per my earlier examples, I think it's not correct to assume all OF
>>> implementations are file-based.
>>>
>>>
>>> On Monday, February 8, 2016, Aljoscha Krettek 
>>> wrote:
>>>
 Hi,
 one problem that I see with OutputFormats is that they are not made for
 a streaming world. By this, I mean that they don’t handle failure well and
 don’t consider fault-tolerant streaming, i.e. exactly-once semantics. For
 example, what would be expected to happen if a job with a FileOutputFormat
 fails and needs to recover? Now, there might be some garbage left in the
 files that would get emitted again after restoring to a checkpoint, thus
 leading to duplicate results.

 Having OutputFormats in a streaming program can work well in toy
 examples and tests but can be dangerous in real-world jobs. I once talked
 with Robert about this and we came up with the idea (I think it was mostly
 him) of generalizing the RollingFileSink (which is fault-tolerance aware)
 so that it can easily be used with something akin to OutputFormats.

 What do you think?

 -Aljoscha
 > On 08 Feb 2016, at 19:40, Nick Dimiduk  wrote:
 >
 > On Mon, Feb 8, 2016 at 9:51 AM, Maximilian Michels 
 wrote:
 > Changing the class hierarchy would break backwards-compatibility of
 the API. However, we could add another method to DataStream to easily use
 

Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Fabian Hueske
Hi,
glad you could resolve the POJO issue, but the new error doesn't look
right.
The CO_GROUP_RAW strategy should only be used for programs that are
implemented against the Python DataSet API.
I guess that's not the case since all code snippets were Java so far.

Can you post the full stacktrace of the exception?

2016-02-09 20:13 GMT+01:00 Dominique Rondé :

> Hi all,
>
> I finally figured out that there is a getter for a boolean field which may
> be the source of the trouble. It seems that getBooleanField (as we use it)
> is not the best choice. Now the plan is executed with another error code. :(
>
> Caused by: java.lang.Exception: Unsupported driver strategy for join
> driver: CO_GROUP_RAW
>
> Is there any link to documentation or some example code which you may
> recommend besides the official documentation?
>
> But folks, thanks for your great support! A really nice community here!
>
> Greets
> Dominique
>
>
> On 09.02.2016 at 19:41, Till Rohrmann wrote:
>
> I tested the TypeExtractor with your SourceA and SourceB types (adding
> proper setters and getters) and it correctly returned a PojoType. Thus, I
> would suspect that you haven’t specified the proper setters and getters in
> your implementation.
>
> Cheers,
> Till
>
> On Tue, Feb 9, 2016 at 2:46 PM, Dominique Rondé <
> dominique.ro...@codecentric.de> wrote:
>
>> Here we go!
>>
>>   ExecutionEnvironment env =
>> ExecutionEnvironment.createRemoteEnvironment("xxx.xxx.xxx.xxx",
>> 53408,"flink-job.jar");
>>
>>
>>   DataSource<String> datasourceA =
>> env.readTextFile("hdfs://dev//sourceA/");
>>   DataSource<String> datasourceB =
>> env.readTextFile("hdfs://dev//sourceB/");
>>
>>   DataSet<SourceA> sourceA = datasourceA.map(new SourceAMapper());
>>   DataSet<SourceB> sourceB = datasourceB.map(new SourceBMapper());
>>
>>   sourceA.join(sourceB).where("sessionId").equalTo("sessionId").print();
>>
>> Thanks a lot!
>> Dominique
>>
>>
>> On 09.02.2016 at 14:36, Till Rohrmann wrote:
>>
>> Could you post the complete example code (Flink job including the type
>> definitions)? For example, if the data sets are of type DataSet<Object>,
>> then it will be treated as a GenericType. Judging from your pseudo code,
>> it looks fine at first glance.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 9, 2016 at 2:25 PM, Dominique Rondé <
>> dominique.ro...@codecentric.de> wrote:
>>
>>> Sorry, I was out for lunch. Maybe the problem is that sessionID is a
>>> String?
>>>
>>> public abstract class Parent{
>>>   private Date eventDate;
>>>   private EventType eventType;
>>>   private String sessionId;
>>>
>>> public Parent() { }
>>> //GETTER & SETTER
>>> }
>>>
>>> public class SourceA extends Parent{
>>>   private Boolean outboundMessage;
>>>   private String soapMessage;
>>>
>>> public SourceA () {
>>> super();
>>>  }
>>> //GETTER & SETTER
>>> }
>>>
>>> public class SourceB extends Parent{
>>>   private Integer id;
>>>   private String username;
>>>
>>> public SourceB () {
>>> super();
>>>  }
>>> //GETTER & SETTER
>>>
>>> }
>>>
>>> On 09.02.2016 at 12:06, Till Rohrmann wrote:
>>>
>>> Could you share the code for your types SourceA and SourceB. It seems
>>> as if Flink does not recognize them to be POJOs because it assigned them
>>> the GenericType type. Either there is something wrong with the type
>>> extractor or your implementation does not fulfil the requirements for
>>> POJOs, as indicated by Chiwan.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé <
>>> dominique.ro...@codecentric.de> wrote:
>>>
 The fields in SourceA and SourceB are private but have public getters
 and setters. The classes provide an empty and public constructor.
 On 09.02.2016 11:47, "Chiwan Park" <
 chiwanp...@apache.org> wrote:

> Oh, the fields in SourceA have public getters. Do the fields in
> SourceA have public setters? SourceA needs public setters for private
> fields.
>
> Regards,
> Chiwan Park
>
> > On Feb 9, 2016, at 7:45 PM, Chiwan Park < 
> chiwanp...@apache.org> wrote:
> >
> > Hi Dominique,
> >
> > It seems that `SourceA` is not treated as a POJO. Are all fields in
> SourceA public? There are some requirements for POJO classes [1].
> >
> > [1]:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
> >
> > Regards,
> > Chiwan Park
> >
> >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <
> dominique.ro...@codecentric.de> wrote:
> >>
> >> Hi folks,
> >>
> >> I try to join two datasets containing some PoJos. Each PoJo inherits
> a field "sessionId" from the parent class. The field is private but has a
> public getter.
> >>
> 

Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Till Rohrmann
I tested the TypeExtractor with your SourceA and SourceB types (adding
proper setters and getters) and it correctly returned a PojoType. Thus, I
would suspect that you haven’t specified the proper setters and getters in
your implementation.

Cheers,
Till

On Tue, Feb 9, 2016 at 2:46 PM, Dominique Rondé <
dominique.ro...@codecentric.de> wrote:

> Here we go!
>
>   ExecutionEnvironment env =
> ExecutionEnvironment.createRemoteEnvironment("xxx.xxx.xxx.xxx",
> 53408,"flink-job.jar");
>
>
>   DataSource<String> datasourceA = env.readTextFile("hdfs://dev//sourceA/");
>   DataSource<String> datasourceB = env.readTextFile("hdfs://dev//sourceB/");
>
>   DataSet<SourceA> sourceA = datasourceA.map(new SourceAMapper());
>   DataSet<SourceB> sourceB = datasourceB.map(new SourceBMapper());
>
>   sourceA.join(sourceB).where("sessionId").equalTo("sessionId").print();
>
> Thanks a lot!
> Dominique
>
>
> On 09.02.2016 at 14:36, Till Rohrmann wrote:
>
> Could you post the complete example code (Flink job including the type
> definitions)? For example, if the data sets are of type DataSet<Object>,
> then it will be treated as a GenericType. Judging from your pseudo code,
> it looks fine at first glance.
>
> Cheers,
> Till
>
> On Tue, Feb 9, 2016 at 2:25 PM, Dominique Rondé <
> dominique.ro...@codecentric.de> wrote:
>
>> Sorry, I was out for lunch. Maybe the problem is that sessionID is a
>> String?
>>
>> public abstract class Parent{
>>   private Date eventDate;
>>   private EventType eventType;
>>   private String sessionId;
>>
>> public Parent() { }
>> //GETTER & SETTER
>> }
>>
>> public class SourceA extends Parent{
>>   private Boolean outboundMessage;
>>   private String soapMessage;
>>
>> public SourceA () {
>> super();
>>  }
>> //GETTER & SETTER
>> }
>>
>> public class SourceB extends Parent{
>>   private Integer id;
>>   private String username;
>>
>> public SourceB () {
>> super();
>>  }
>> //GETTER & SETTER
>>
>> }
>>
>> On 09.02.2016 at 12:06, Till Rohrmann wrote:
>>
>> Could you share the code for your types SourceA and SourceB. It seems as
>> if Flink does not recognize them to be POJOs because it assigned them the
>> GenericType type. Either there is something wrong with the type
>> extractor or your implementation does not fulfil the requirements for
>> POJOs, as indicated by Chiwan.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé <
>> dominique.ro...@codecentric.de> wrote:
>>
>>> The fields in SourceA and SourceB are private but have public getters
>>> and setters. The classes provide an empty and public constructor.
>>> On 09.02.2016 11:47, "Chiwan Park" <
>>> chiwanp...@apache.org> wrote:
>>>
 Oh, the fields in SourceA have public getters. Do the fields in
 SourceA have public setters? SourceA needs public setters for private fields.

 Regards,
 Chiwan Park

 > On Feb 9, 2016, at 7:45 PM, Chiwan Park < 
 chiwanp...@apache.org> wrote:
 >
 > Hi Dominique,
 >
 > It seems that `SourceA` is not treated as a POJO. Are all fields in
 SourceA public? There are some requirements for POJO classes [1].
 >
 > [1]:
 https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
 >
 > Regards,
 > Chiwan Park
 >
 >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <
 dominique.ro...@codecentric.de> wrote:
 >>
 >> Hi folks,
 >>
 >> I try to join two datasets containing some PoJos. Each PoJo inherits
 >> a field "sessionId" from the parent class. The field is private but has a
 >> public getter.
 >>
 >> The join is like this:
 >> DataSet<Tuple2<SourceA, SourceB>> joinedDataSet =
 >> sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
 >>
 >> But the result is the following exception:
 >>
 >> Exception in thread "main"
 >> org.apache.flink.api.common.InvalidProgramException: This type
 >> (GenericType<SourceA>) cannot be used as key.
 >>    at
 >> org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
 >>    at
 >> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
 >>    at
 >> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
 >>
 >> I spent some time googling around but I don't get an idea what is
 >> wrong. I hope some of you can give me a hint...
 >>
 >> Greets
 >> Dominique
 >>
 >


>>

Re: pom file scala plugin error [flink-examples-batch project]

2016-02-09 Thread Stephan Ewen
Hi!

The Eclipse Setup is quite tricky, because Eclipse's Maven and Scala
support is not very good.

Here is how I got it to run:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/ide_setup.html#eclipse-scala-ide-303

Stephan


On Tue, Feb 9, 2016 at 10:20 AM, Robert Metzger  wrote:

> Hi Subash,
>
> I think the two errors are unrelated. Maven is failing because of the
> checkstyle plugin. It checks if the code follows our coding guidelines.
>
> If you are experienced with IntelliJ, I would suggest using that IDE.
> Most Flink committers are using it because it's difficult to get Eclipse set
> up for Flink.
>
>
> On Mon, Feb 8, 2016 at 2:52 PM, subash basnet  wrote:
>
>> Hello all,
>>
>> I am trying to maven build the flink-examples-batch project I imported
>> from here, but
>> it gives me a build failure exception as shown in the attached image.
>>
>> Eclipse shows the below explanation on pointing to the <execution> node in
>> the pom file.
>>
>> *Plugin execution not covered by lifecycle configuration:
>> net.alchim31.maven:scala-maven-plugin:3.1.4:compile (execution:
>> scala-compile-first, phase: process-resources)*
>>
>> I have attached a screenshot of my installed plugins in Eclipse.
>> As suggested on the web, the scala-maven-plugin was installed from
>> http://alchim31.free.fr/m2e-scala/update-site
>> What could be the issue?
>>
>> Best Regards,
>> Subash Basnet
>>
>
>


Re: Kafka partition alignment for event time

2016-02-09 Thread Stephan Ewen
Hi Shikhar!

What you are seeing is that some streams (here the different Kafka
Partitions in one source) get merged in the source task. That happens
before watermarks are generated.
In such a case, records are out-of-order when they arrive at the
timestamp-extractor/watermark generator, and the watermark generator needs
to be implemented such that it is aware of these out-of-order records, and
uses some heuristic to generate watermarks. This is actually the general
case that one also has if timestamps are not ascending inside a single
Kafka partition.

You probably want to make use of the simple case, where timestamps are
ascending inside one Kafka partition, and use the
ascending-timestamp-extractor that auto-generates watermarks.
With Kafka, that one only works with a 1:1 assignment of sources to partitions.
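
(For reference, a sketch of that simple case; the exact extractor class and
method signature vary between Flink versions, and MyEvent/getEventTime() are
hypothetical:)

DataStream<MyEvent> withTimestamps = stream
    .assignTimestamps(new AscendingTimestampExtractor<MyEvent>() {
        @Override
        public long extractAscendingTimestamp(MyEvent element, long currentTimestamp) {
            // only valid if timestamps ascend within each parallel source,
            // i.e. a 1:1 source-to-partition assignment with Kafka
            return element.getEventTime();
        }
    });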


I think we can add some tooling that makes it possible to use the simple
ascending timestamp extraction also in cases where one parallel source task
has multiple Kafka partitions.
Effectively, the Kafka source has to internally generate the watermarks and
use the same "watermark union" technique as for example the join operator.

Here is the issue to track this:
https://issues.apache.org/jira/browse/FLINK-3375

Greetings,
Stephan


On Mon, Feb 8, 2016 at 9:51 PM, shikhar  wrote:

> Stephan explained in that thread that we're picking the min watermark when
> doing operations that join streams from multiple sources. If we have m:n
> partition-source assignment where m>n, the source is going to end up with
> the max watermark. Having m<=n ensures that the lowest watermark is used.
>
> Re: automatic enforcement, perhaps allowing for more than 1 Kafka partition
> on a source should require opt-in, e.g. allowOversubscription()
>
>
>
>


Quick question about enableObjectReuse()

2016-02-09 Thread LINZ, Arnaud
Hi,

I just want to be sure: when I set enableObjectReuse, I don’t need to create
copies of objects that I get as input and return as output, but which I don’t
keep inside my user function?
For instance, if I want to join Tuple2(A,B) with C into Tuple3(A,B,C) using a
Join function, I can write something like:

public Tuple3 join(Tuple2 first, Object second) {
return Tuple3.of(first.f0, first.f1, second);
}
And not return Tuple3.of(first.f0.clone(), first.f1.clone(),
second.clone())?


Best regards,
Arnaud







Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Chiwan Park
I wrote a sample inherited POJO example [1]. The example works with Flink 
0.10.1 and 1.0-SNAPSHOT.

[1]: https://gist.github.com/chiwanpark/0389ce946e4fff58d611

Regards,
Chiwan Park

> On Feb 9, 2016, at 8:07 PM, Fabian Hueske  wrote:
> 
> What is the type of sessionId? 
> It must be a key type in order to be used as key. If it is a generic class, 
> it must implement Comparable to be used as key.
> 
> 2016-02-09 11:53 GMT+01:00 Dominique Rondé :
> The fields in SourceA and SourceB are private but have public getters and 
> setters. The classes provide an empty and public constructor.
> 
> On 09.02.2016 11:47, "Chiwan Park" wrote:
> Oh, the fields in SourceA have public getters. Do the fields in SourceA
> have public setters? SourceA needs public setters for private fields.
> 
> Regards,
> Chiwan Park
> 
> > On Feb 9, 2016, at 7:45 PM, Chiwan Park  wrote:
> >
> > Hi Dominique,
> >
> > It seems that `SourceA` is not treated as a POJO. Are all fields in SourceA
> > public? There are some requirements for POJO classes [1].
> >
> > [1]: 
> > https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
> >
> > Regards,
> > Chiwan Park
> >
> >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé 
> >>  wrote:
> >>
> >> Hi folks,
> >>
> >> I try to join two datasets containing some PoJos. Each PoJo inherits a
> >> field "sessionId" from the parent class. The field is private but has a
> >> public getter.
> >>
> >> The join is like this:
> >> DataSet<Tuple2<SourceA, SourceB>> joinedDataSet =
> >> sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
> >>
> >> But the result is the following exception:
> >>
> >> Exception in thread "main"
> >> org.apache.flink.api.common.InvalidProgramException: This type
> >> (GenericType<SourceA>) cannot be used as key.
> >>    at
> >> org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
> >>    at
> >> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
> >>    at
> >> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
> >>
> >> I spent some time googling around but I don't get an idea what is
> >> wrong. I hope some of you can give me a hint...
> >>
> >> Greets
> >> Dominique
> >>
> >
> 
> 



Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Chiwan Park
Hi Dominique,

It seems that `SourceA` is not treated as a POJO. Are all fields in SourceA public?
There are some requirements for POJO classes [1].

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos

Regards,
Chiwan Park

> On Feb 9, 2016, at 7:42 PM, Dominique Rondé  
> wrote:
> 
> Hi folks,
>
> I try to join two datasets containing some PoJos. Each PoJo inherits a field
> "sessionId" from the parent class. The field is private but has a public
> getter.
>
> The join is like this:
> DataSet<Tuple2<SourceA, SourceB>> joinedDataSet =
> sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
>
> But the result is the following exception:
>
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: This type
> (GenericType<SourceA>) cannot be used as key.
> at
> org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
> at
> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
> at
> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
>
> I spent some time googling around but I don't get an idea what is wrong. I
> hope some of you can give me a hint...
> 
> Greets 
> Dominique 
> 



Re: GC on TaskManagers stats

2016-02-09 Thread Robert Metzger
Hi Guido,

sorry for the late reply. You were collecting the stats every 1 second.
Afaik, Flink is internally collecting the stats with a frequency of 5
seconds, so you can either change your or Flink's polling interval (I think
it's taskmanager.heartbeat-interval).

Regarding the details on PS-Scavenge, MarkSweep etc.: We just use the names
the Java management beans return, so you can just google for the names and
read how to interpret them. For example:
http://www.ibm.com/developerworks/library/j-jtp11253/

The load is the operating system load.
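
(The numbers come from the standard JVM management beans; a minimal sketch of
reading the same counters directly via the java.lang.management API:)

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcStats {
    public static void main(String[] args) {
        // One bean per collector, e.g. "PS Scavenge" (young gen) and
        // "PS MarkSweep" (old gen) when the parallel collector is used.
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.println(gc.getName()
                    + " count=" + gc.getCollectionCount()   // collections so far (running sum)
                    + " timeMs=" + gc.getCollectionTime()); // accumulated elapsed time in millis
        }
    }
}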



On Thu, Feb 4, 2016 at 10:25 PM, Guido  wrote:

> Hello,
>
> I have few questions regarding garbage collector’s stats on Taskmanagers
> and any help or further documentation would be great.
> I have collected "1 second polling" stats on 7 TaskManagers,
> through the relative request (/taskmanagers//) of the
> Monitoring REST API while a job, that overall took 38 seconds, was
> running.
>
> This way I got 38 records for each TaskManager, and focusing on garbage
> collector’s stats I can see, for example, on one of the 38 records:
>
> - PS-Scavenge.Time: 2597, PS-MarkSweep.Time: 29016;
> 1. Is it correct to assume they represent the total elapsed time on
> different GCs (respectively young and old gen)? So, I basically got a
> running sum distribution?
> 2. If yes, values are in millis, so 29 sec?
>
> 3. Could they be used to get how much time has been wasted in total
> because of the “Stop-the-world” GCs policy?
>
> Finally, on the same record:
>
> - PS-Scavenge.Count: 3, PS-MarkSweep.Count: 5, load: 3.73.
>
> 4. Is the “load” value tightly related to these?
>
> Sorry if it has been quite long and thanks a lot.
>
> Guido
>
>
>


Re: Kafka partition alignment for event time

2016-02-09 Thread Fabian Hueske
Hi,

where did you observe the duplicates, within Flink or in Kafka?
Please be aware that the Flink Kafka Producer does not provide exactly-once
consistency. This is not easily possible because Kafka does not support
transactional writes yet.

Flink's exactly-once guarantees are only valid within the Flink DataStream
program and for some sinks such as the RollingFileSink.

Cheers, Fabian

2016-02-09 10:21 GMT+01:00 Aljoscha Krettek :

> Hi,
> in general it should not be a problem if one parallel instance of a sink
> is responsible for several Kafka partitions. It can become a problem if the
> timestamps in the different partitions differ by a lot and the watermark
> assignment logic is not able to handle this.
>
> How are you assigning the timestamps/watermarks in your job?
>
> Cheers,
> Aljoscha
> > On 08 Feb 2016, at 21:51, shikhar  wrote:
> >
> > Stephan explained in that thread that we're picking the min watermark
> when
> > doing operations that join streams from multiple sources. If we have m:n
> > partition-source assignment where m>n, the source is going to end up with
> > the max watermark. Having m<=n ensures that the lowest watermark is used.
> >
> > Re: automatic enforcement, perhaps allowing for more than 1 Kafka
> partition
> > on a source should require opt-in, e.g. allowOversubscription()
> >
> >
> >
>
>


Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Dominique Rondé
The fields in SourceA and SourceB are private but have public getters and
setters. The classes provide an empty and public constructor.
On 09.02.2016 11:47, "Chiwan Park" wrote:

> Oh, the fields in SourceA have public getters. Do the fields in SourceA
> have public setters? SourceA needs public setters for private fields.
>
> Regards,
> Chiwan Park
>
> > On Feb 9, 2016, at 7:45 PM, Chiwan Park  wrote:
> >
> > Hi Dominique,
> >
> > It seems that `SourceA` is not treated as a POJO. Are all fields in SourceA
> public? There are some requirements for POJO classes [1].
> >
> > [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
> >
> > Regards,
> > Chiwan Park
> >
> >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <
> dominique.ro...@codecentric.de> wrote:
> >>
> >> Hi folks,
> >>
> >> I try to join two datasets containing some PoJos. Each PoJo inherits a
> >> field "sessionId" from the parent class. The field is private but has a
> >> public getter.
> >>
> >> The join is like this:
> >> DataSet<Tuple2<SourceA, SourceB>> joinedDataSet =
> >> sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
> >>
> >> But the result is the following exception:
> >>
> >> Exception in thread "main"
> >> org.apache.flink.api.common.InvalidProgramException: This type
> >> (GenericType<SourceA>) cannot be used as key.
> >>    at
> >> org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
> >>    at
> >> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
> >>    at
> >> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
> >>
> >> I spent some time googling around but I don't get an idea what is
> >> wrong. I hope some of you can give me a hint...
> >>
> >> Greets
> >> Dominique
> >>
> >
>
>


Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Till Rohrmann
Could you share the code for your types SourceA and SourceB? It seems as if
Flink does not recognize them to be POJOs because it assigned them the
GenericType type. Either there is something wrong with the type extractor
or your implementation does not fulfil the requirements for POJOs, as
indicated by Chiwan.

Cheers,
Till

On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé <
dominique.ro...@codecentric.de> wrote:

> The fields in SourceA and SourceB are private but have public getters and
> setters. The classes provide an empty and public constructor.
> On 09.02.2016 11:47, "Chiwan Park" wrote:
>
>> Oh, the fields in SourceA have public getters. Do the fields in SourceA
>> have public setters? SourceA needs public setters for private fields.
>>
>> Regards,
>> Chiwan Park
>>
>> > On Feb 9, 2016, at 7:45 PM, Chiwan Park  wrote:
>> >
>> > Hi Dominique,
>> >
>> > It seems that `SourceA` is not treated as a POJO. Are all fields in SourceA
>> public? There are some requirements for POJO classes [1].
>> >
>> > [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
>> >
>> > Regards,
>> > Chiwan Park
>> >
>> >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <
>> dominique.ro...@codecentric.de> wrote:
>> >>
>> >> Hi folks,
>> >>
>> >> I try to join two datasets containing some PoJos. Each PoJo inherits a
>> field "sessionId" from the parent class. The field is private but has a
>> public getter.
>> >>
>> >> The join is like this:
>> >> DataSet<Tuple2<SourceA, SourceB>> joinedDataSet =
>> sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
>> >>
>> >> But the result is the following exception:
>> >>
>> >> Exception in thread "main"
>> org.apache.flink.api.common.InvalidProgramException: This type
>> (GenericType<SourceA>) cannot be used as key.
>> >>    at
>> org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
>> >>    at
>> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
>> >>    at
>> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
>> >>
>> >> I spent some time googling around but I don't get an idea what is
>> wrong. I hope some of you can give me a hint...
>> >>
>> >> Greets
>> >> Dominique
>> >>
>> >
>>
>>


Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Fabian Hueske
What is the type of sessionId?
It must be a key type in order to be used as key. If it is a generic class,
it must implement Comparable to be used as key.

2016-02-09 11:53 GMT+01:00 Dominique Rondé :

> The fields in SourceA and SourceB are private but have public getters and
> setters. The classes provide an empty and public constructor.
> On 09.02.2016 11:47, "Chiwan Park" wrote:
>
>> Oh, the fields in SourceA have public getters. Do the fields in SourceA
>> have public setters? SourceA needs public setters for private fields.
>>
>> Regards,
>> Chiwan Park
>>
>> > On Feb 9, 2016, at 7:45 PM, Chiwan Park  wrote:
>> >
>> > Hi Dominique,
>> >
>> > It seems that `SourceA` is not treated as a POJO. Are all fields in SourceA
>> public? There are some requirements for POJO classes [1].
>> >
>> > [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
>> >
>> > Regards,
>> > Chiwan Park
>> >
>> >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <
>> dominique.ro...@codecentric.de> wrote:
>> >>
>> >> Hi folks,
>> >>
>> >> I try to join two datasets containing some PoJos. Each PoJo inherits a
>> field "sessionId" from the parent class. The field is private but has a
>> public getter.
>> >>
>> >> The join is like this:
>> >> DataSet<Tuple2<SourceA, SourceB>> joinedDataSet =
>> sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
>> >>
>> >> But the result is the following exception:
>> >>
>> >> Exception in thread "main"
>> org.apache.flink.api.common.InvalidProgramException: This type
>> (GenericType<SourceA>) cannot be used as key.
>> >>    at
>> org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
>> >>    at
>> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
>> >>    at
>> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
>> >>
>> >> I spent some time googling around but I don't get an idea what is
>> wrong. I hope some of you can give me a hint...
>> >>
>> >> Greets
>> >> Dominique
>> >>
>> >
>>
>>


Re: Quick question about enableObjectReuse()

2016-02-09 Thread Stephan Ewen
The only thing you need to be aware of (in the batch API) is that you
cannot simply gather elements in a list any more.

The following does not work when enabling object reuse:


class MyReducer implements GroupReduceFunction<In, Out> {

public void reduceGroup(Iterable<In> values, Collector<Out> out) {
List<In> allValues = new ArrayList<In>();
for (In i : values) {
allValues.add(i);
}
}
}


In general, object reuse is a feature that makes sense if you want to
optimize GC performance. If you don't need it, all the better. It is fairly
easy to accidentally implement the user functions to hold onto reused
object references, and compute wrong results that way.
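
(For contrast, a sketch of a variant that stays correct under object reuse by
copying each element before buffering it; this assumes the element type In has
a copy constructor, and it is not an official Flink recipe:)

class MyCopyingReducer implements GroupReduceFunction<In, Out> {

    public void reduceGroup(Iterable<In> values, Collector<Out> out) {
        List<In> allValues = new ArrayList<In>();
        for (In i : values) {
            // copy instead of storing the reused, mutated instance
            allValues.add(new In(i));
        }
    }
}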

Greetings,
Stephan



On Tue, Feb 9, 2016 at 10:48 AM, Till Rohrmann  wrote:

> Yes, you're right Arnaud.
>
> Cheers,
> Till
>
> On Tue, Feb 9, 2016 at 10:42 AM, LINZ, Arnaud 
> wrote:
>
>> Hi,
>>
>>
>>
>> I just want to be sure: when I set enableObjectReuse, I don’t need to
>> create copies of objects that I get as input and return as output, but which
>> I don’t keep inside my user function?
>>
>> For instance, if I want to join Tuple2(A,B) with C into Tuple3(A,B,C)
>> using a Join function, I can write something like:
>>
>>
>>
>> public Tuple3 join(Tuple2 first, Object second) {
>>
>> return Tuple3.of(first.f0, first.f1, second);
>>
>> }
>>
>> And not return Tuple3.of(first.f0.clone(),
>> first.f1.clone(), second.clone())?
>>
>>
>>
>>
>>
>> Best regards,
>>
>> Arnaud
>>
>>
>>
>>
>>
>>
>
>


Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Chiwan Park
Oh, the fields in SourceA have public getters. Do the fields in SourceA have
public setters? SourceA needs public setters for private fields.

Regards,
Chiwan Park

> On Feb 9, 2016, at 7:45 PM, Chiwan Park  wrote:
> 
> Hi Dominique,
> 
> It seems that `SourceA` is not treated as a POJO. Are all fields in SourceA
> public? There are some requirements for POJO classes [1].
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
> 
> Regards,
> Chiwan Park
> 
>> On Feb 9, 2016, at 7:42 PM, Dominique Rondé  
>> wrote:
>> 
>> Hi folks,
>>
>> I try to join two datasets containing some PoJos. Each PoJo inherits a field
>> "sessionId" from the parent class. The field is private but has a public
>> getter.
>>
>> The join is like this:
>> DataSet<Tuple2<SourceA, SourceB>> joinedDataSet =
>> sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
>>
>> But the result is the following exception:
>>
>> Exception in thread "main"
>> org.apache.flink.api.common.InvalidProgramException: This type
>> (GenericType<SourceA>) cannot be used as key.
>>    at
>> org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
>>    at
>> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
>>    at
>> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
>>
>> I spent some time googling around but I don't get an idea what is wrong.
>> I hope some of you can give me a hint...
>> 
>> Greets 
>> Dominique 
>> 
> 



Re: Kafka partition alignment for event time

2016-02-09 Thread shikhar
Yes that approach seems perfect Stephan, thanks for creating the JIRA!

It is not only when resetting to smallest: I have observed uneven progress
on partitions skewing the watermark any time the source is not caught up to
the head of each partition it is handling, like when stopping for a few mins
and starting it back up (the offsets it's resuming from are approx the same
number of messages behind).





Re: Kafka partition alignment for event time

2016-02-09 Thread shikhar
Hi Fabian,

Sorry, I should have been clearer. What I meant (or now know!) by duplicate
emits is that, since the watermark progresses more rapidly than the state
of the offsets on some partitions due to the source multiplexing more than 1
partition, messages from the lagging partitions that are passed on to
further operators (specifically time-based windowing) get emitted
immediately, resulting in duplicate windows
(https://issues.apache.org/jira/browse/FLINK-2870).





Re: Kafka partition alignment for event time

2016-02-09 Thread Stephan Ewen
Thanks for filling us in.

If the problem comes from the fact that the difference between partitions
becomes high sometimes (when resetting to the smallest offset),
then this could probably be solved similarly as suggested here (
https://issues.apache.org/jira/browse/FLINK-3375) by running
a watermark assigner (ascending, threshold / whatever) per partition
inside the Kafka Source.
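
(A sketch of that per-partition "watermark union" bookkeeping, with
hypothetical helper names; the source only advances its emitted watermark to
the minimum across the partitions it owns:)

// one current watermark per Kafka partition owned by this source instance
long[] partitionWatermarks; // initialized to Long.MIN_VALUE

void onRecord(int partitionIndex, long timestamp) {
    partitionWatermarks[partitionIndex] =
            Math.max(partitionWatermarks[partitionIndex], timestamp);
    long min = Long.MAX_VALUE;
    for (long w : partitionWatermarks) {
        min = Math.min(min, w);
    }
    emitWatermarkIfLarger(min); // hypothetical: emit only if the minimum advanced
}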

What do you think?


On Tue, Feb 9, 2016 at 3:01 PM, shikhar  wrote:

> I am assigning timestamps using a threshold-based extractor -- the static
> delta
> from the last timestamp is probably sufficient, and the PriorityQueue for
> allowing outliers is not necessary; that is something I added while figuring
> out what was going on.
>
> The timestamps across partitions don't differ that much in normal operation
> when stream processing is caught up with the head of the partitions, so the
> thresholding works well. However, during catch-up, like if I stop for a bit
> & start the job again, or there is no offset in ZK and I'm using
> 'auto.offset.reset=smallest', the source tends to emit messages with much
> larger deviations, and the timestamp extraction which is not
> partition-aware
> will start providing an incorrect watermark.
>
>
> Aljoscha Krettek wrote
> > Hi,
> > in general it should not be a problem if one parallel instance of a sink
> > is responsible for several Kafka partitions. It can become a problem if
> > the timestamps in the different partitions differ by a lot and the
> > watermark assignment logic is not able to handle this.
> >
> > How are you assigning the timestamps/watermarks in your job?
> >
> > Cheers,
> > Aljoscha
> >> On 08 Feb 2016, at 21:51, shikhar wrote:
> >>
> >> Stephan explained in that thread that we're picking the min watermark
> >> when
> >> doing operations that join streams from multiple sources. If we have m:n
> >> partition-source assignment where m>n, the source is going to end up
> with
> >> the max watermark. Having m<=n ensures that the lowest watermark is
> used.
> >>
> >> Re: automatic enforcement, perhaps allowing for more than 1 Kafka
> >> partition
> >> on a source should require opt-in, e.g. allowOversubscription()
> >>
> >>
> >>
>
>
>
>
>
>


Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Till Rohrmann
Could you post the complete example code (Flink job including the type
definitions)? For example, if the data sets are of type DataSet<Object>,
then it will be treated as a GenericType. Judging from your pseudo code, it
looks fine at first glance.

Cheers,
Till

On Tue, Feb 9, 2016 at 2:25 PM, Dominique Rondé <
dominique.ro...@codecentric.de> wrote:

> Sorry, I was out for lunch. Maybe the problem is that sessionID is a
> String?
>
> public abstract class Parent{
>   private Date eventDate;
>   private EventType eventType;
>   private String sessionId;
>
> public Parent() { }
> //GETTER & SETTER
> }
>
> public class SourceA extends Parent{
>   private Boolean outboundMessage;
>   private String soapMessage;
>
> public SourceA () {
> super();
>  }
> //GETTER & SETTER
> }
>
> public class SourceB extends Parent{
>   private Integer id;
>   private String username;
>
> public SourceB () {
> super();
>  }
> //GETTER & SETTER
>
> }
>
> On 09.02.2016 at 12:06, Till Rohrmann wrote:
>
> Could you share the code for your types SourceA and SourceB. It seems as
> if Flink does not recognize them to be POJOs because it assigned them the
> GenericType type. Either there is something wrong with the type extractor
> or your implementation does not fulfil the requirements for POJOs, as
> indicated by Chiwan.
>
> Cheers,
> Till
>
> On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé <
> dominique.ro...@codecentric.de> wrote:
>
>> The fields in SourceA and SourceB are private but have public getters and
>> setters. The classes provide an empty and public constructor.
>> On 09.02.2016 11:47, "Chiwan Park" wrote:
>>
>>> Oh, the fields in SourceA have public getters. Do the fields in
>>> SourceA have public setters? SourceA needs public setters for private fields.
>>>
>>> Regards,
>>> Chiwan Park
>>>
>>> > On Feb 9, 2016, at 7:45 PM, Chiwan Park < 
>>> chiwanp...@apache.org> wrote:
>>> >
>>> > Hi Dominique,
>>> >
>>> > It seems that `SourceA` is not treated as a POJO. Are all fields in
>>> SourceA public? There are some requirements for POJO classes [1].
>>> >
>>> > [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
>>> >
>>> > Regards,
>>> > Chiwan Park
>>> >
>>> >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <
>>> dominique.ro...@codecentric.de> wrote:
>>> >>
>>> >> Hi folks,
>>> >>
>>> >> I try to join two datasets containing some PoJos. Each PoJo inherits a
>>> field "sessionId" from the parent class. The field is private but has a
>>> public getter.
>>> >>
>>> >> The join is like this:
>>> >> DataSet<Tuple2<SourceA, SourceB>> joinedDataSet =
>>> sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
>>> >>
>>> >> But the result is the following exception:
>>> >>
>>> >> Exception in thread "main"
>>> org.apache.flink.api.common.InvalidProgramException: This type
>>> (GenericType<SourceA>) cannot be used as key.
>>> >>    at
>>> org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
>>> >>    at
>>> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
>>> >>    at
>>> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
>>> >>
>>> >> I spent some time googling around but I don't get an idea what is
>>> wrong. I hope some of you can give me a hint...
>>> >>
>>> >> Greets
>>> >> Dominique
>>> >>
>>> >
>>>
>>>
>


Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Dominique Rondé

Sorry, I was out for lunch. Maybe the problem is that sessionID is a String?

public abstract class Parent{
  private Date eventDate;
  private EventType eventType;
  private String sessionId;

public Parent() { }
//GETTER & SETTER
}

public class SourceA extends Parent{
  private Boolean outboundMessage;
  private String soapMessage;

public SourceA () {
super();
 }
//GETTER & SETTER
}

public class SourceB extends Parent{
  private Integer id;
  private String username;

public SourceB () {
super();
 }
//GETTER & SETTER
}
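
(For reference, the POJO rules require a public getter/setter pair for every
non-public field; spelled out for the sessionId field of Parent, that pair
would look like this:)

public String getSessionId() {
    return sessionId;
}

public void setSessionId(String sessionId) {
    this.sessionId = sessionId;
}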

On 09.02.2016 at 12:06, Till Rohrmann wrote:

Could you share the code for your types SourceA and SourceB. It
seems as if Flink does not recognize them to be POJOs because it
assigned them the GenericType type. Either there is something wrong
with the type extractor or your implementation does not fulfil the
requirements for POJOs, as indicated by Chiwan.


Cheers,
Till


On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé 
> wrote:


The fields in SourceA and SourceB are private but have public
getters and setters. The classes provide an empty and public
constructor.

On 09.02.2016 11:47, "Chiwan Park" wrote:

Oh, the fields in SourceA have public getters. Do the fields
in SourceA have public setters? SourceA needs public setters for
private fields.

Regards,
Chiwan Park

> On Feb 9, 2016, at 7:45 PM, Chiwan Park
> wrote:
>
> Hi Dominique,
>
> It seems that `SourceA` is not treated as a POJO. Are all fields
in SourceA public? There are some requirements for POJO
classes [1].
>
> [1]:

https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
>
> Regards,
> Chiwan Park
>
>> On Feb 9, 2016, at 7:42 PM, Dominique Rondé
> wrote:
>>
>> Hi folks,
>>
>> I try to join two datasets containing some PoJos. Each PoJo
>> inherits a field "sessionId" from the parent class. The field
>> is private but has a public getter.
>>
>> The join is like this:
>> DataSet<Tuple2<SourceA, SourceB>> joinedDataSet =
>> sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
>>
>> But the result is the following exception:
>>
>> Exception in thread "main"
>> org.apache.flink.api.common.InvalidProgramException: This type
>> (GenericType<SourceA>) cannot be
>> used as key.
>>    at
>> org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
>>    at
>> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
>>    at
>> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
>>
>> I spent some time googling around but I don't get an
>> idea what is wrong. I hope some of you can give me a hint...
>>
>> Greets
>> Dominique
>>
>







Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Fabian Hueske
String is perfectly fine as key.
Looks like SourceA / SourceB are not correctly identified as Pojos.

2016-02-09 14:25 GMT+01:00 Dominique Rondé :

> Sorry, I was out for lunch. Maybe the problem is that sessionID is a
> String?
>
> public abstract class Parent{
>   private Date eventDate;
>   private EventType eventType;
>   private String sessionId;
>
> public Parent() { }
> //GETTER & SETTER
> }
>
> public class SourceA extends Parent{
>   private Boolean outboundMessage;
>   private String soapMessage;
>
> public SourceA () {
> super();
>  }
> //GETTER & SETTER
> }
>
> public class SourceB extends Parent{
>   private Integer id;
>   private String username;
>
> public SourceB () {
> super();
>  }
> //GETTER & SETTER
>
> }
>
> On 09.02.2016 at 12:06, Till Rohrmann wrote:
>
> Could you share the code for your types SourceA and SourceB. It seems as
> if Flink does not recognize them to be POJOs because it assigned them the
> GenericType type. Either there is something wrong with the type extractor
> or your implementation does not fulfil the requirements for POJOs, as
> indicated by Chiwan.
>
> Cheers,
> Till
>
> On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé <
> dominique.ro...@codecentric.de> wrote:
>
>> The fields in SourceA and SourceB are private but have public getters and
>> setters. The classes provide an empty and public constructor.
>> On 09.02.2016 11:47, "Chiwan Park" wrote:
>>
>>> Oh, the fields in SourceA have public getters. Do the fields in
>>> SourceA have public setters? SourceA needs public setters for private fields.
>>>
>>> Regards,
>>> Chiwan Park
>>>
>>> > On Feb 9, 2016, at 7:45 PM, Chiwan Park < 
>>> chiwanp...@apache.org> wrote:
>>> >
>>> > Hi Dominique,
>>> >
>>> > It seems that `SourceA` is not treated as a POJO. Are all fields in
>>> SourceA public? There are some requirements for POJO classes [1].
>>> >
>>> > [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
>>> >
>>> > Regards,
>>> > Chiwan Park
>>> >
>>> >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <
>>> dominique.ro...@codecentric.de> wrote:
>>> >>
>>> >> Hi folks,
>>> >>
>>> >> I try to join two datasets containing some PoJos. Each PoJo inherits a
>>> field "sessionId" from the parent class. The field is private but has a
>>> public getter.
>>> >>
>>> >> The join is like this:
>>> >> DataSet<Tuple2<SourceA, SourceB>> joinedDataSet =
>>> sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
>>> >>
>>> >> But the result is the following exception:
>>> >>
>>> >> Exception in thread "main"
>>> org.apache.flink.api.common.InvalidProgramException: This type
>>> (GenericType<SourceA>) cannot be used as key.
>>> >>    at
>>> org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
>>> >>    at
>>> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
>>> >>    at
>>> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
>>> >>
>>> >> I spent some time googling around but I don't get an idea what is
>>> wrong. I hope some of you can give me a hint...
>>> >>
>>> >> Greets
>>> >> Dominique
>>> >>
>>> >
>>>
>>>
>
>
>


Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Dominique Rondé

Here we go!

  ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("xxx.xxx.xxx.xxx", 
53408,"flink-job.jar");



  DataSource<String> datasourceA = env.readTextFile("hdfs://dev//sourceA/");
  DataSource<String> datasourceB = env.readTextFile("hdfs://dev//sourceB/");

  DataSet<SourceA> sourceA = datasourceA.map(new SourceAMapper());
  DataSet<SourceB> sourceB = datasourceB.map(new SourceBMapper());

sourceA.join(sourceB).where("sessionId").equalTo("sessionId").print();

Thanks a lot!
Dominique

On 09.02.2016 at 14:36, Till Rohrmann wrote:

Could you post the complete example code (Flink job including the type
definitions)? For example, if the data sets are of type
DataSet<Object>, then it will be treated as a GenericType. Judging
from your pseudo code, it looks fine at first glance.


Cheers,
Till


On Tue, Feb 9, 2016 at 2:25 PM, Dominique Rondé 
> wrote:


Sorry, I was out for lunch. Maybe the problem is that sessionID is
a String?

public abstract class Parent{
  private Date eventDate;
  private EventType eventType;
  private String sessionId;

public Parent() { }
//GETTER & SETTER
}

public class SourceA extends Parent{
  private Boolean outboundMessage;
  private String soapMessage;

public SourceA () {
super();
 }
//GETTER & SETTER
}

public class SourceB extends Parent{
  private Integer id;
  private String username;

public SourceB () {
super();
 }
//GETTER & SETTER

}

On 09.02.2016 at 12:06, Till Rohrmann wrote:

Could you share the code for your types SourceA and SourceB.
It seems as if Flink does not recognize them to be POJOs because
it assigned them the GenericType type. Either there is
something wrong with the type extractor or your implementation
does not fulfil the requirements for POJOs, as indicated by Chiwan.

Cheers,
Till


On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé
> wrote:

The fields in SourceA and SourceB are private but have public
getters and setters. The classes provide an empty and public
constructor.

On 09.02.2016 11:47, "Chiwan Park" wrote:

Oh, the fields in SourceA have public getters. Do the
fields in SourceA have public setters? SourceA needs
public setters for private fields.

Regards,
Chiwan Park

> On Feb 9, 2016, at 7:45 PM, Chiwan Park
> wrote:
>
> Hi Dominique,
>
> It seems that `SourceA` is not treated as a POJO. Are all
fields in SourceA public? There are some requirements for
POJO classes [1].
>
> [1]:

https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
>
> Regards,
> Chiwan Park
>
>> On Feb 9, 2016, at 7:42 PM, Dominique Rondé
> wrote:
>>
>> Hi folks,
>>
>> I try to join two datasets containing some PoJos. Each
>> PoJo inherits a field "sessionId" from the parent class.
>> The field is private but has a public getter.
>>
>> The join is like this:
>> DataSet<Tuple2<SourceA, SourceB>> joinedDataSet =
>> sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
>>
>> But the result is the following exception:
>>
>> Exception in thread "main"
>> org.apache.flink.api.common.InvalidProgramException: This
>> type (GenericType<SourceA>)
>> cannot be used as key.
>>    at
>> org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
>>    at
>> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
>>    at
>> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
>>
>> I spent some time googling around but I don't get
>> an idea what is wrong. I hope some of you can give me a
>> hint...
>>
>> Greets
>> Dominique
>>
>





Dataset filter improvement

2016-02-09 Thread Flavio Pompermaier
Hi to all,

in my program I have a DataSet that generates different types of objects wrt
the incoming element.
Thus it's like a Map.
In order to type the different generated datasets I do something like:

DataSet start = ...

DataSet ds1 = start.filter(..).map(..);
DataSet ds2 = start.filter(..).map(..);
DataSet ds3 = start.filter(..).map(..);
DataSet ds4 = start.filter(..).map(..);

However this is very inefficient (I think because Flink needs to
materialize the entire source dataset for every slot).

It's much more efficient to group the generation of objects of the same
type. E.g.:

DataSet start = ...

DataSet tmp1 = start.map(..);
DataSet tmp2 = start.map(..);
DataSet ds1 = tmp1.filter(..);
DataSet ds2 = tmp1.filter(..);
DataSet ds3 = tmp2.filter(..);
DataSet ds4 = tmp2.filter(..);

Increasing the number of slots per task manager makes things worse and worse
:)
Is there a way to improve this situation? Is it possible to write a "map"
generating different types of objects and then filter them by generated class
type?
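
(As a sketch of that last idea, with hypothetical types Type1/Type2 and type
hints omitted — one map emitting a common supertype, then per-type filters:)

DataSet<Object> mapped = start.map(..); // emits Type1/Type2/... instances

DataSet<Type1> ds1 = mapped
    .filter(o -> o instanceof Type1)
    .map(o -> (Type1) o); // a real job would also need an explicit result type hint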

Best,
Flavio


How to convert List to flink DataSet

2016-02-09 Thread subash basnet
Hello all,

I have performed a modification in the KMeans code to detect outliers. I have
printed the output to the console but I am not able to write it to a file
using the given 'writeAsCsv' method.
The problem is I generate a list of tuples.
My List is:
List finalElements = new ArrayList();
Following is the datatype of the elements added to the list:
Tuple3 newElement = new Tuple3();
finalElements.add(newElement);
Now I am stuck on how to convert this 'finalElements' to
DataSet> fElements,
so that I could use
fElements.writeAsCsv(outputPath, "\n"," ");
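
(A list can be turned into a DataSet with ExecutionEnvironment.fromCollection();
a sketch, with the Tuple3 type parameters chosen hypothetically since they are
not shown above:)

// assuming: List<Tuple3<Integer, String, Double>> finalElements
// and: env is the job's ExecutionEnvironment
DataSet<Tuple3<Integer, String, Double>> fElements =
        env.fromCollection(finalElements);
fElements.writeAsCsv(outputPath, "\n", " ");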

Best Regards,
Subash Basnet


Re: Simple Flink - Kafka Test

2016-02-09 Thread shotte
Do I need to go to Flink 1.0 or downgrade to Kafka 0.8?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Simple-Flink-Kafka-Test-tp4828p4829.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Simple Flink - Kafka Test

2016-02-09 Thread shotte
Hi,
I am new to Flink and Kafka

I am trying to read a Kafka topic from Flink and send it back to another
Kafka topic.

Here is my setup:
Flink 0.10.1
Kafka 0.9

All that on a single node

I successfully wrote a Java program that sends messages to Kafka (topic =
demo), and I have a consumer (in a shell) that reads them, so that part works.

When I execute the Flink program I get this error.
See the code and error below; it happens somewhere between steps C and D.

What am I doing wrong?

Thanks

Sylvain


package com.sylvain;

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * Skeleton for a Flink Job.
 *
 * For a full example of a Flink Job, see the WordCountJob.java file in the
 * same package/directory or have a look at the website.
 *
 * You can also generate a .jar file that you can submit on your Flink
 * cluster. Just type
 *     mvn clean package
 * in the project's root directory. You will find the jar in
 *     target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
 */
public class Job {

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        System.out.println("Step A");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        System.out.println("Step B");
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        System.out.println("Step C");
        DataStream<String> messageStream = env.addSource(new
                FlinkKafkaConsumer082<>("demo", new SimpleStringSchema(), properties));

        System.out.println("Step D");
        messageStream.map(new MapFunction<String, String>() {

            @Override
            public String map(String value) throws Exception {
                // TODO Auto-generated method stub
                return "Blablabla " + value;
            }

        }).addSink(new FlinkKafkaProducer<String>("localhost:9092", "demo2", new
                SimpleStringSchema()));

        System.out.println("Step E");
        env.execute();
        System.out.println("Step F");
    }
}


[shotte@localhost flink-kafka]$ flink run ./target/flink-kafka-0.1.jar 
Step A
Step B
Step C
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    at kafka.utils.Pool.<init>(Pool.scala:28)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
    at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
    at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:691)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:281)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.<init>(FlinkKafkaConsumer082.java:49)
    at com.sylvain.Job.main(Job.java:64)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
Caused by: java.lang.ClassNotFoundException:
scala.collection.GenTraversableOnce$class
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)

Re: Simple Flink - Kafka Test

2016-02-09 Thread Chiwan Park
Hi shotte,

The exception is caused by a Scala version mismatch. If you want to use Scala 
2.11, you have to use Flink dependencies compiled for Scala 2.11. We have 
documentation about this in the wiki [1].

I hope this helps.

[1]: 
https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version

Regards,
Chiwan Park

> On Feb 10, 2016, at 9:39 AM, shotte  wrote:
> 
> Do I need to go to Flink 1.0, or downgrade to Kafka 0.8?
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Simple-Flink-Kafka-Test-tp4828p4829.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Flink 1.0-SNAPSHOT scala 2.11 in S3 has scala 2.10

2016-02-09 Thread Chiwan Park
Hi David,

I just downloaded the "flink-1.0-SNAPSHOT-bin-hadoop2_2.11.tgz" but there is no 
jar compiled with Scala 2.10. Could you check again?

Regards,
Chiwan Park

> On Feb 10, 2016, at 2:59 AM, David Kim wrote:
> 
> Hello,
> 
> I noticed that the flink binary for scala 2.11 located at 
> http://stratosphere-bin.s3.amazonaws.com/flink-1.0-SNAPSHOT-bin-hadoop2_2.11.tgz
>  contains the scala 2.10 flavor.
> 
> If you open the lib folder the name of the jar in lib is 
> flink-dist_2.10-1.0-SNAPSHOT.jar.
> 
> Could this be an error in the process that updates these files in S3?
> 
> We're using that download link following the suggestions here: 
> https://flink.apache.org/contribute-code.html#snapshots-nightly-builds. If 
> there's a better place let us know as well!
> 
> Thanks,
> David



Re: Simple Flink - Kafka Test

2016-02-09 Thread Chiwan Park
The documentation I sent is for Flink 1.0.

In Flink 0.10.x, dependencies for Scala 2.10 carry no suffix (e.g. 
flink-streaming-java), while dependencies for Scala 2.11 carry a suffix 
(e.g. flink-streaming-java_2.11).

Regards,
Chiwan Park

> On Feb 10, 2016, at 1:46 PM, Chiwan Park  wrote:
> 
> Hi shotte,
> 
> The exception is caused by a Scala version mismatch. If you want to use Scala 
> 2.11, you have to use Flink dependencies compiled for Scala 2.11. We have 
> documentation about this in the wiki [1].
> 
> I hope this helps.
> 
> [1]: 
> https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version
> 
> Regards,
> Chiwan Park
> 
>> On Feb 10, 2016, at 9:39 AM, shotte  wrote:
>> 
>> Do I need to go to Flink 1.0, or downgrade to Kafka 0.8?
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Simple-Flink-Kafka-Test-tp4828p4829.html
>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>> at Nabble.com.
> 
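
In practice the fix is usually just the artifact suffix in the pom. A hedged
example for a 0.10.x streaming dependency (artifact and version are
illustrative; check Maven Central for the exact coordinates of your setup):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>0.10.1</version>
</dependency>

The Flink distribution that runs the job must be built for the same Scala
version as the dependencies the jar was compiled against.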



Re: Dataset filter improvement

2016-02-09 Thread Flavio Pompermaier
Any help on this?
On 9 Feb 2016 18:03, "Flavio Pompermaier"  wrote:

> Hi to all,
>
> in my program I have a Dataset that generates different types of objects
> depending on the incoming element, so it is essentially a map with several
> possible output types.
> In order to type the different generated datasets I do something like this:
>
> Dataset start =...
>
> Dataset ds1 = start.filter().map(..);
> Dataset ds2 = start.filter().map(..);
> Dataset ds3 = start.filter().map(..);
> Dataset ds4 = start.filter().map(..);
>
> However this is very inefficient (I think because Flink needs to
> materialize the entire source dataset for every slot).
>
> It's much more efficient to group the generation of objects of the same
> type. E.g.:
>
> Dataset start =..
>
> Dataset tmp1 = start.map(..);
> Dataset tmp2 = start.map(..);
> Dataset ds1 = tmp1.filter();
> Dataset ds2 = tmp1.filter();
> Dataset ds3 = tmp2.filter();
> Dataset ds4 = tmp2.filter();
>
> Increasing the number of slots per task manager makes things worse and
> worse :)
> Is there a way to improve this situation? Is it possible to write a "map"
> that generates different types of objects and then filter them by the
> generated class type?
>
> Best,
> Flavio


Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Dominique Rondé

Hi,

your guess is correct. I use Java all the time... Here is the complete 
stacktrace:


Exception in thread "main"
org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:367)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:345)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:312)
    at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:212)
    at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:189)
    at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:160)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1583)
    at x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:103)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The data preparation for task 'CHAIN
Join(Join at main(PmcProcessor.java:103)) -> FlatMap (collect())' ,
caused an error: Unsupported driver strategy for join driver: CO_GROUP_RAW
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Unsupported driver strategy for join
driver: CO_GROUP_RAW
    at org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:193)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
    ... 3 more

On 09.02.2016 at 21:03, Fabian Hueske wrote:

Hi,
glad you could resolve the POJO issue, but the new error doesn't look 
right.
The CO_GROUP_RAW strategy should only be used for programs that are 
implemented against the Python DataSet API.

I guess that's not the case since all code snippets were Java so far.

Can you post the full stacktrace of the exception?

2016-02-09 20:13 GMT+01:00 Dominique Rondé:


Hi all,

I finally figured out that there is a getter for a boolean field
which may be the source of the trouble. It seems that
getBooleanField (as we use it) is not the best choice. Now the
plan is executed with another error code. :(

Caused by: java.lang.Exception: Unsupported driver strategy for
join driver: CO_GROUP_RAW

Is there any link to documentation or some example code which
you may recommend besides the official documentation?

But folks, thanks for your great support! A really nice community
here!

Greets
Dominique


On 09.02.2016 at 19:41, Till Rohrmann wrote:


I tested the `TypeExtractor` with your `SourceA` and `SourceB`
types (adding proper setters and getters) and it correctly
returned a `PojoType`. Thus, I would suspect that you haven't
specified the proper setters and getters in your implementation.

Cheers,
Till

On Tue, Feb 9, 2016 at 2:46 PM, Dominique Rondé wrote:

Here we go!

  ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
          "xxx.xxx.xxx.xxx", 53408, "flink-job.jar");

  DataSource<String> datasourceA = env.readTextFile("hdfs://dev//sourceA/");
  DataSource<String> datasourceB = env.readTextFile("hdfs://dev//sourceB/");

  DataSet sourceA= 

Re: Join two Datasets --> InvalidProgramException

2016-02-09 Thread Fabian Hueske
Hi Dominique,

can you check if the versions of the remotely running job manager & task
managers are the same as the Flink version that is used to submit the job?
The version and commit hash are logged at the top of the JM and TM log
files.

Right now, the local client optimizes the job, chooses the execution
strategies, and sends the plan to the remote JobManager. Recently, we added
and removed some strategies. So it might be that the strategy enum of
client and jobmanager got out of sync.

Cheers, Fabian

2016-02-10 7:33 GMT+01:00 Dominique Rondé :

> Hi,
>
> your guess is correct. I use Java all the time... Here is the complete
> stacktrace:
>
> Exception in thread "main"
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
> at org.apache.flink.client.program.Client.runBlocking(Client.java:367)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:345)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:312)
> at
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:212)
> at
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:189)
> at
> org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:160)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
> at org.apache.flink.api.java.DataSet.print(DataSet.java:1583)
> at
> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:103)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The data preparation for task 'CHAIN
> Join(Join at main(PmcProcessor.java:103)) -> FlatMap (collect())' , caused
> an error: Unsupported driver strategy for join driver: CO_GROUP_RAW
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Unsupported driver strategy for join
> driver: CO_GROUP_RAW
> at
> org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:193)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
> ... 3 more
>
>
> Am 09.02.2016 um 21:03 schrieb Fabian Hueske:
>
> Hi,
> glad you could resolve the POJO issue, but the new error doesn't look
> right.
> The CO_GROUP_RAW strategy should only be used for programs that are
> implemented against the Python DataSet API.
> I guess that's not the case since all code snippets were Java so far.
>
> Can you post the full stacktrace of the exception?
>
> 2016-02-09 20:13 GMT+01:00 Dominique Rondé:
>
>> Hi all,
>>
>> I finally figured out that there is a getter for a boolean field which
>> may be the source of the trouble. It seems that getBooleanField (as we use
>> it) is not the best choice. Now the plan is executed with another error
>> code. :(
>>
>> Caused by: java.lang.Exception: Unsupported driver strategy for join
>> driver: CO_GROUP_RAW
>>
>> Is there any link to documentation or some example code which you may
>> recommend besides the official documentation?
>>
>> But folks, thanks for your great support! A really nice community here!
>>
>> Greets
>> Dominique
>>
>>
>> Am 09.02.2016 um 19:41 schrieb Till Rohrmann:
>>
>> I tested the TypeExtractor with your SourceA and SourceB types (adding
>> proper setters and getters) and it correctly returned a PojoType. Thus,
>> I 
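
To reproduce such a check locally you can inspect the extraction result
directly. A rough sketch (TypeExtractor is an internal utility rather than
a stable API, and this SourceA is just a stand-in for the class under test):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class PojoCheck {

    // stand-in for the class under test
    public static class SourceA {
        private String sessionId;
        public SourceA() {}
        public String getSessionId() { return sessionId; }
        public void setSessionId(String sessionId) { this.sessionId = sessionId; }
    }

    public static void main(String[] args) {
        TypeInformation<SourceA> info = TypeExtractor.createTypeInfo(SourceA.class);
        // a PojoTypeInfo means Flink accepted the class as a POJO;
        // a GenericTypeInfo means one of the POJO rules is violated
        System.out.println(info + " -> POJO? " + (info instanceof PojoTypeInfo));
    }
}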

Re: How to convert List to flink DataSet

2016-02-09 Thread Stefano Baghino
Assuming your ExecutionEnvironment is named `env`, simply call:

DataSet<Tuple3<...>> fElements = env.fromCollection(finalElements);

Does this help?

On Tue, Feb 9, 2016 at 6:06 PM, subash basnet  wrote:

> Hello all,
>
> I have modified the KMeans code to detect outliers. I have printed the
> output to the console but I am not able to write it to a file using the
> given 'writeAsCsv' method.
> The problem is that I generate a list of tuples.
> My List is:
> List finalElements = new ArrayList();
> Following is the datatype of the elements added to the list:
> Tuple3<..., ..., Boolean> newElement = new Tuple3<..., ..., Boolean>();
> finalElements.add(newElement);
> Now I am stuck on how to convert this 'finalElements' to
> DataSet> fElements,
> so that I could use
> fElements.writeAsCsv(outputPath, "\n"," ");
>
> Best Regards,
> Subash Basnet
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit
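
Putting the thread's suggestion together, a complete sketch (the Tuple3
field types are assumptions, since the original generics were lost in the
archive):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class WriteListAsCsv {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // assumed element type: (pointId, distanceToCentroid, isOutlier)
        List<Tuple3<String, Double, Boolean>> finalElements = new ArrayList<>();
        finalElements.add(new Tuple3<>("p1", 0.42, false));
        finalElements.add(new Tuple3<>("p2", 7.31, true));

        // fromCollection turns the local list into a DataSet
        DataSet<Tuple3<String, Double, Boolean>> fElements =
                env.fromCollection(finalElements);

        // row delimiter "\n", field delimiter " "
        fElements.writeAsCsv("file:///tmp/outliers.csv", "\n", " ");

        env.execute();
    }
}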