Re: How can handles Exist ,not Exist query on flink

2015-07-10 Thread hagersaleh
I want example on use join or co group for handles Exists or not Exists



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-can-handles-Exist-not-Exist-query-on-flink-tp1939p2006.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: how can handles Any , All query on flink

2015-07-10 Thread hagersaleh
please help



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/how-can-handles-Any-All-query-on-flink-tp1997p2005.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: DelimitedInputFormat reads entire buffer when splitLength is 0

2015-07-10 Thread Stephan Ewen
Hi Robert!

This clearly sounds like unintended behavior. Thanks for reporting this.

Apparently, the 0 line length was supposed to have a double meaning, but it
goes haywire in this case.

Let me try to come with a fix for this...

Greetings,
Stephan


On Fri, Jul 10, 2015 at 6:05 PM, Robert Schmidtke 
wrote:

> Hey everyone,
>
> I just noticed that when processing input splits from a
> DelimitedInputFormat (specifically, I have a text file with words in it),
> that if the splitLength is 0, the entire readbuffer is filled (see
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java#L577).
> I'm using XtreemFS as underlying file system, which stripes files in blocks
> of 128kb across storage servers. I have 8 physically separate nodes, and my
> input file is 1MB, such that each node stores 128kb of data. This is
> reported accurately to Flink (e.g. split sizes and hostnames). Now when the
> splitLength is 0 at some point during processing (which it will become
> eventually), the entire file is read in again, which kind of defeats the
> point of processing a split of length 0. Is this intended behavior? I've
> tried multiple hot-fixes, but they ended up in the file not bein read in
> its entirety. I would like to know the rationale behind this
> implementation, and maybe figure out a way around it. Thanks in advance,
>
> Robert
>
> --
> My GPG Key ID: 336E2680
>


DelimitedInputFormat reads entire buffer when splitLength is 0

2015-07-10 Thread Robert Schmidtke
Hey everyone,

I just noticed that when processing input splits from a
DelimitedInputFormat (specifically, I have a text file with words in it),
that if the splitLength is 0, the entire readbuffer is filled (see
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java#L577).
I'm using XtreemFS as underlying file system, which stripes files in blocks
of 128kb across storage servers. I have 8 physically separate nodes, and my
input file is 1MB, such that each node stores 128kb of data. This is
reported accurately to Flink (e.g. split sizes and hostnames). Now when the
splitLength is 0 at some point during processing (which it will become
eventually), the entire file is read in again, which kind of defeats the
point of processing a split of length 0. Is this intended behavior? I've
tried multiple hot-fixes, but they ended up in the file not bein read in
its entirety. I would like to know the rationale behind this
implementation, and maybe figure out a way around it. Thanks in advance,

Robert

-- 
My GPG Key ID: 336E2680


Multiple ElasticSearch sinks

2015-07-10 Thread Flavio Pompermaier
Hi to all,

I have a Flink job that produce json objects that I'd like to index in
different Elasticsearch indices depending on the "type" attribute of my
json object (e.g. "people", "places", etc..).
Is there any previous attempt to do something like that in Flink?
I was thinking to use the EsHadoopOutputFormat but it requires to specify
the index name in the job conf..however, in my use case I'll know the
target indices only once the computation finish so Flink can't know how
many sinks there will be in the pre-flight phase..

My solution at the moment was to implement my own mapPartition function
that instantiate a client to ES and index the json documents in the right
index at the end of the job pipeline..is there any better approach to it?

Best,
Flavio


Re: TeraSort on Flink and Spark

2015-07-10 Thread Stephan Ewen
Hi Dongwon Kim!

Thank you for trying out these changes.

The OptimizedText can be sorted more efficiently, because it generates a
binary key prefix. That way, the sorting needs to serialize/deserialize
less and saves on CPU.

In parts of the program, the CPU is then less of a bottleneck and the disks
and the network can unfold their bandwidth better.

Greetings,
Stephan



On Fri, Jul 10, 2015 at 9:35 AM, Dongwon Kim 
wrote:

> Hi Stephan,
>
> I just pushed changes to my github:
> https://github.com/eastcirclek/terasort.
> I've modified the TeraSort program so that (A) it can reuse objects
> and (B) it can exploit OptimizedText as you suggested.
>
> I've also conducted few experiments and the results are as follows:
> ORIGINAL : 1714
> ORIGINAL+A : 1671
> ORIGINAL+B : 1467
> ORIGINAL+A+B : 1427
> Your advice works as shown above :-)
>
> Datasets are now defined as below:
> - val inputFile = env.readHadoopFile(teraInputFormat, classOf[Text],
> classOf[Text], inputPath)
> - val optimizedText = inputFile.map(tp => (new OptimizedText(tp._1),
> tp._2))
> - val sortedPartitioned = optimizedText.partitionCustom(partitioner,
> 0).sortPartition(0, Order.ASCENDING)
> - sortedPartitioned.map(tp => (tp._1.getText, tp._2)).output(hadoopOF)
> You can see the two map transformations before and after the function
> composition partitionCustom.sortPartition.
>
> Here is a question regarding the performance improvement.
> Please see the attached Ganglia image files.
> - ORIGINAL-[cpu, disks, network].png are for ORIGINAL.
> - BEST-[cpu, disks, network].png are for ORIGINAL+A+B.
> Compared to ORIGINAL, BEST shows better utilization of disks and
> network and shows lower CPU utilization.
> Is this because OptimizedText objects are serialized into Flink memory
> layout?
> What happens when keys are represented in just Text, not
> OptimziedText? Are there another memory area to hold such objects? or
> are they serialized anyway but in an inefficient way?
> If latter, is the CPU utilization in ORIGINAL high because CPUs work
> hard to serialize Text objects using Java serialization mechanism with
> DataInput and DataOutput?
> If true, I can explain the high throughput of network and disks in
> ORIGINAL+A+B.
> I, however, observed the similar performance when I do mapping not
> only on 10-byte keys but also on 90-byte values, which cannot be
> explained by the above conjecture.
> Could you make things clear? If so, I would be very appreciated ;-)
>
> I'm also wondering whether the two map transformations,
> (Text, Text) to (OptimizedText, Text) and  (OptimizedText, Text) to
> (Text, Text),
> can prevent Flink from performing a lot better.
> I don't have time to modify TeraInputFormat and TeraOutputFormat to
> read (String, String) pairs from HDFS and write (String, String) pairs
> to HDFS.
> Do you see that one can get a better TeraSort result using an new
> implementation of FileInputFormat?
>
> Regards,
>
> Dongwon Kim
>
> 2015-07-03 3:29 GMT+09:00 Stephan Ewen :
> > Hello Dongwon Kim!
> >
> > Thanks you for sharing these numbers with us.
> >
> > I have gone through your implementation and there are two things you
> could
> > try:
> >
> > 1)
> >
> > I see that you sort Hadoop's Text data type with Flink. I think this may
> be
> > less efficient than if you sort String, or a Flink specific data type.
> >
> > For efficient byte operations on managed memory, Flink needs to
> understand
> > the binary representation of the data type. Flink understands that for
> > "String" and many other types, but not for "Text".
> >
> > There are two things you can do:
> >   - First, try what happens if you map the Hadoop Text type to a Java
> String
> > (only for the tera key).
> >   - Second, you can try what happens if you wrap the Hadoop Text type in
> a
> > Flink type that supports optimized binary sorting. I have pasted code for
> > that at the bottom of this email.
> >
> > 2)
> >
> > You can see if it helps performance if you enable object re-use in Flink.
> > You can do this on the ExecutionEnvironment via
> > "env.getConfig().enableObjectReuse()". Then Flink tries to use the same
> > objects repeatedly, in case they are mutable.
> >
> >
> > Can you try these options out and see how they affect Flink's runtime?
> >
> >
> > Greetings,
> > Stephan
> >
> > -
> > Code for optimized sortable (Java):
> >
> > public final class OptimizedText implements
> NormalizableKey
> > {
> > private final Text text;
> > public OptimizedText () {
> > this.text = new Text();
> > }
> > public OptimizedText (Text from) {
> > this.text = from;
> > }
> >
> > public Text getText() {
> > return text;
> > }
> >
> > @Override
> > public int getMaxNormalizedKeyLen() {
> > return 10;
> > }
> >
> > @Override
> > public void copyNormalizedKey(MemorySegment memory, int offset, int len)
> {
> > memory.put(offset, text.getBytes(), 0, Math.min(text.getLength(),
> > Math.min(10, len)));
> > }
> >
> > @Override
> > publi

Re: TeraSort on Flink and Spark

2015-07-10 Thread Fabian Hueske
Hi Dongwon Kim,

this blog post describes Flink's memory management, serialization, and sort
algorithm and also includes performance numbers of some microbenchmarks.

-->
http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html

The difference between Text and OptimizedText, is that OptimizedText is
sorted using 10-byte binary prefix key. Hence, the sorting happens directly
on the binary data and OptimizedText objects are not deserialized.
 The lower CPU utilization can be explained by less deserialization +
garbage collection. Since the CPU is less utilized, the network and disk
utilization increases.

Let us know if you have further questions,
Fabian

2015-07-10 9:35 GMT+02:00 Dongwon Kim :

> Hi Stephan,
>
> I just pushed changes to my github:
> https://github.com/eastcirclek/terasort.
> I've modified the TeraSort program so that (A) it can reuse objects
> and (B) it can exploit OptimizedText as you suggested.
>
> I've also conducted few experiments and the results are as follows:
> ORIGINAL : 1714
> ORIGINAL+A : 1671
> ORIGINAL+B : 1467
> ORIGINAL+A+B : 1427
> Your advice works as shown above :-)
>
> Datasets are now defined as below:
> - val inputFile = env.readHadoopFile(teraInputFormat, classOf[Text],
> classOf[Text], inputPath)
> - val optimizedText = inputFile.map(tp => (new OptimizedText(tp._1),
> tp._2))
> - val sortedPartitioned = optimizedText.partitionCustom(partitioner,
> 0).sortPartition(0, Order.ASCENDING)
> - sortedPartitioned.map(tp => (tp._1.getText, tp._2)).output(hadoopOF)
> You can see the two map transformations before and after the function
> composition partitionCustom.sortPartition.
>
> Here is a question regarding the performance improvement.
> Please see the attached Ganglia image files.
> - ORIGINAL-[cpu, disks, network].png are for ORIGINAL.
> - BEST-[cpu, disks, network].png are for ORIGINAL+A+B.
> Compared to ORIGINAL, BEST shows better utilization of disks and
> network and shows lower CPU utilization.
> Is this because OptimizedText objects are serialized into Flink memory
> layout?
> What happens when keys are represented in just Text, not
> OptimziedText? Are there another memory area to hold such objects? or
> are they serialized anyway but in an inefficient way?
> If latter, is the CPU utilization in ORIGINAL high because CPUs work
> hard to serialize Text objects using Java serialization mechanism with
> DataInput and DataOutput?
> If true, I can explain the high throughput of network and disks in
> ORIGINAL+A+B.
> I, however, observed the similar performance when I do mapping not
> only on 10-byte keys but also on 90-byte values, which cannot be
> explained by the above conjecture.
> Could you make things clear? If so, I would be very appreciated ;-)
>
> I'm also wondering whether the two map transformations,
> (Text, Text) to (OptimizedText, Text) and  (OptimizedText, Text) to
> (Text, Text),
> can prevent Flink from performing a lot better.
> I don't have time to modify TeraInputFormat and TeraOutputFormat to
> read (String, String) pairs from HDFS and write (String, String) pairs
> to HDFS.
> Do you see that one can get a better TeraSort result using an new
> implementation of FileInputFormat?
>
> Regards,
>
> Dongwon Kim
>
> 2015-07-03 3:29 GMT+09:00 Stephan Ewen :
> > Hello Dongwon Kim!
> >
> > Thanks you for sharing these numbers with us.
> >
> > I have gone through your implementation and there are two things you
> could
> > try:
> >
> > 1)
> >
> > I see that you sort Hadoop's Text data type with Flink. I think this may
> be
> > less efficient than if you sort String, or a Flink specific data type.
> >
> > For efficient byte operations on managed memory, Flink needs to
> understand
> > the binary representation of the data type. Flink understands that for
> > "String" and many other types, but not for "Text".
> >
> > There are two things you can do:
> >   - First, try what happens if you map the Hadoop Text type to a Java
> String
> > (only for the tera key).
> >   - Second, you can try what happens if you wrap the Hadoop Text type in
> a
> > Flink type that supports optimized binary sorting. I have pasted code for
> > that at the bottom of this email.
> >
> > 2)
> >
> > You can see if it helps performance if you enable object re-use in Flink.
> > You can do this on the ExecutionEnvironment via
> > "env.getConfig().enableObjectReuse()". Then Flink tries to use the same
> > objects repeatedly, in case they are mutable.
> >
> >
> > Can you try these options out and see how they affect Flink's runtime?
> >
> >
> > Greetings,
> > Stephan
> >
> > -
> > Code for optimized sortable (Java):
> >
> > public final class OptimizedText implements
> NormalizableKey
> > {
> > private final Text text;
> > public OptimizedText () {
> > this.text = new Text();
> > }
> > public OptimizedText (Text from) {
> > this.text = from;
> > }
> >
> > public Text getText() {
> > return text;
> > }
> >
> > @Override
>