Re: problem with union
Hi everybody, I am still waiting for version 0.9.1 to solve this problem. Any idea on when it will be released? Thanks. Best, Michele

On 15 Jul 2015, at 15:58, Maximilian Michels m...@apache.org wrote:

I was able to reproduce this problem. It turns out this has already been fixed in the snapshot version: https://issues.apache.org/jira/browse/FLINK-2229 The fix will be included in the upcoming 0.9.1 release. Thank you again for reporting! Kind regards, Max

On Wed, Jul 15, 2015 at 11:33 AM, Maximilian Michels m...@apache.org wrote:

Hi Michele, Thanks for reporting the problem. It seems we changed the way we compare generic types like your GValue type. I'm debugging that now. We can get a fix in for the 0.9.1 release. Cheers, Max

On Tue, Jul 14, 2015 at 5:35 PM, Michele Bertoni michele1.bert...@mail.polimi.it wrote:

Hi everybody, this discussion started in another thread about a problem with union, but you said it was a different error, so I am opening a new topic. I am doing the union of two datasets and I am getting this error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of different types. Input1=scala.Tuple6(_1: Long, _2: String, _3: Long, _4: Long, _5: Character, _6: ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>), input2=scala.Tuple6(_1: Long, _2: String, _3: Long, _4: Long, _5: Character, _6: ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>)
    at org.apache.flink.api.java.operators.UnionOperator.<init>(UnionOperator.java:46)
    at org.apache.flink.api.scala.DataSet.union(DataSet.scala:1101)
    at it.polimi.genomics.flink.FlinkImplementation.operator.region.GenometricCover2$.apply(GenometricCover2.scala:125)
    ...

Input1= scala.Tuple6(_1: Long, _2: String, _3: Long, _4: Long, _5: Character, _6: ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>)
input2= scala.Tuple6(_1: Long, _2: String, _3: Long, _4: Long, _5: Character, _6: ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>)

As you can see, the two datasets have the same type. This error only happens with a custom data type (e.g. I am using an array of GValue; an array of Int or Double works). In the last Flink versions (milestone and snapshot) it was working; now in 0.9.0 it is not. What can it be? Thanks for the help. Cheers, Michele
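For reference, a minimal sketch of the failing pattern, written against the 0.9 Java API for illustration (Michele's program uses the Scala API); the GValue class and the sample data below are made up, and the point is only that the tuples contain an array of a type Flink treats as a GenericType:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple6;

    public class UnionRepro {

        // Stand-in for the user's GValue: no default constructor, so Flink
        // falls back to GenericTypeInfo for it (hypothetical class).
        public static class GValue {
            private final double v;
            public GValue(double v) { this.v = v; }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple6<Long, String, Long, Long, Character, GValue[]>> left =
                env.fromElements(
                    new Tuple6<>(1L, "chr1", 10L, 20L, '+', new GValue[] { new GValue(1.0) }));

            DataSet<Tuple6<Long, String, Long, Long, Character, GValue[]>> right =
                env.fromElements(
                    new Tuple6<>(2L, "chr1", 30L, 40L, '-', new GValue[] { new GValue(2.0) }));

            // On 0.9.0 this union fails with "Cannot union inputs of different
            // types" although both declared types are identical (FLINK-2229).
            left.union(right).print();
        }
    }

With the fix from FLINK-2229 in 0.9.1, the same program should run without the exception.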
Re: problem with union
Hi Michele, We’re doing the release process for 0.9.1. Ufuk Celebi will start the vote for the 0.9.1 release soon. Regards, Chiwan Park
Re: problem with union
I think this commit fixed it in the 0.9 branch (c7e868416a5b8f61489a221ad3822dea1366d887) so it should be good in the release.
About exactly once question?
Hi: The documentation says Flink can guarantee processing each tuple exactly once, but I cannot understand how it works. For example, in Fig 1, C is running between snapshot n-1 and snapshot n (snapshot n has not been generated yet). After snapshot n-1, C has processed tuples x1, x2, x3 and already output them to the user; then C fails and recovers from snapshot n-1. In my opinion, x1, x2, x3 will be processed and output to the user again. My question is: how does Flink guarantee that x1, x2, x3 are processed and output to the user only once?

[Fig 1: image not included in the archive]

Thanks for answering.
Re: About exactly once question?
Note that the definition of exactly-once means that records are guaranteed to be processed exactly once by Flink operators, and thus state updates to operator state happen exactly once (e.g., if C had a counter that x1, x2, and x3 incremented, the counter would have a value of 3 and not a value of 6). This is not specific to Flink but the most widely accepted definition, and it applies to all stream processing systems. The reason is that the stream processor cannot by itself guarantee what happens to the outside world (the outside world in this case being the data sink). See the docs ( https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html ): "Apache Flink offers a fault tolerance mechanism to consistently recover the state of data streaming applications. The mechanism ensures that even in the presence of failures, the program's state will eventually reflect every record from the data stream exactly once." Guaranteeing exactly-once delivery to the sink is possible, as Marton suggests above, but the sink implementation needs to be aware of and take part in the checkpointing mechanism.

On Thu, Aug 27, 2015 at 1:14 PM, Márton Balassi balassi.mar...@gmail.com wrote:

Dear Zhangrucong, From your explanation it seems that you have a good general understanding of Flink's checkpointing algorithm. Your concern is valid: by default, a sink C which emits tuples to the outside world may emit them multiple times. A neat trick to solve this issue for your user-defined sinks is to use the CheckpointNotifier interface to output records only after the corresponding checkpoint has been completely processed by the system, so sinks can also provide exactly-once guarantees in Flink. This means that your SinkFunction has to implement both the Checkpointed and the CheckpointNotifier interfaces. The idea is to mark the output tuples with the corresponding checkpoint id, so they can be emitted in a consistent manner once the checkpoint is globally acknowledged by the system. You buffer your output records in a collection of your choice, and whenever snapshotState of the Checkpointed interface is invoked you mark your fresh output records with the current checkpoint id. Whenever notifyCheckpointComplete is invoked you emit the records with the corresponding id. Note that this adds latency to your processing, and as you potentially need to checkpoint a lot of data in the sinks, I would recommend using HDFS as the state backend instead of the default solution. Best, Marton
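As an illustration of Marton's buffering idea, here is a rough sketch against the 0.9-era streaming interfaces (SinkFunction, Checkpointed, CheckpointNotifier); the package names, the state layout, and the emitToExternalSystem() helper are assumptions for illustration, not a tested implementation:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;

    import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
    import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    public class BufferingSink implements SinkFunction<String>,
            Checkpointed<HashMap<Long, ArrayList<String>>>, CheckpointNotifier {

        // Records seen since the last snapshot, not yet tied to a checkpoint.
        private ArrayList<String> pending = new ArrayList<String>();
        // Records tied to a checkpoint id, waiting for global acknowledgement.
        private final HashMap<Long, ArrayList<String>> awaitingAck =
                new HashMap<Long, ArrayList<String>>();

        @Override
        public void invoke(String value) throws Exception {
            pending.add(value); // do not write to the external system yet
        }

        @Override
        public HashMap<Long, ArrayList<String>> snapshotState(long checkpointId, long timestamp)
                throws Exception {
            // Mark the fresh records with the current checkpoint id and
            // persist everything that has not been emitted yet.
            awaitingAck.put(checkpointId, pending);
            pending = new ArrayList<String>();
            return new HashMap<Long, ArrayList<String>>(awaitingAck);
        }

        @Override
        public void restoreState(HashMap<Long, ArrayList<String>> state) {
            awaitingAck.clear();
            awaitingAck.putAll(state);
            pending = new ArrayList<String>();
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // The checkpoint is globally complete: now it is safe to emit.
            List<String> ready = awaitingAck.remove(checkpointId);
            if (ready != null) {
                for (String record : ready) {
                    emitToExternalSystem(record); // hypothetical helper
                }
            }
        }

        private void emitToExternalSystem(String record) {
            System.out.println(record); // stand-in for the real external write
        }
    }

As Marton notes, this buffers data in the sink until the checkpoint completes, which adds latency and state size in exchange for the stronger delivery guarantee.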
Re: About exactly once question?
Oops, seems that Stephan's email covers my answer plus the plans to provide transactional sinks :-)
Re: problem with union
Release vote just started. If everything works, the release should be out on Monday. If you like, you can use the release candidate version and contribute to the release testing. ;) Add this to your POM:

    <repository>
      <id>rc0</id>
      <name>Flink 0.9.1 RC0</name>
      <url>https://repository.apache.org/content/repositories/orgapacheflink-1043</url>
    </repository>

Then you can use 0.9.1 as the Flink version. If you do this, feel free to post to the dev list vote thread.

– Ufuk
Re: New contributor tasks
Matthias has a very good point! Have a look at the system and see what strikes you as most interesting. For example:
- runtime
- graph algorithms
- ML algorithms
- streaming core/connectors
- Storm streaming layer

On Thu, Aug 27, 2015 at 10:37 AM, Matthias J. Sax mj...@informatik.hu-berlin.de wrote:

One more thing. Not every open issue is documented in JIRA (even if we try to do this). You can also have a look into the wiki: https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home So if you are interested in working on a specific component, you might try to talk to the main contributors about undocumented issues you could work on within this component. Just ask on the dev list. Welcome to the community! -Matthias

On 08/27/2015 07:13 AM, Chiwan Park wrote:

Additionally, if you have any questions about contributing, please send a mail to the dev mailing list. Regards, Chiwan Park

On Aug 27, 2015, at 2:11 PM, Chiwan Park chiwanp...@apache.org wrote:

Hi Naveen, There is a guide document [1] about contributing on the homepage. Please read it first before contributing. The coding guidelines document [2] may also be helpful to you. You can find some issues [3] to start contributing to Flink in JIRA. The issues are labeled `starter`, `newbie`, or `easyfix`. Happy contributing! Regards, Chiwan Park

[1] http://flink.apache.org/how-to-contribute.html
[2] http://flink.apache.org/coding-guidelines.html
[3] https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20AND%20labels%20%3D%20starter%20ORDER%20BY%20priority%20DESC

On Aug 27, 2015, at 2:06 PM, Naveen Madhire vmadh...@umail.iu.edu wrote:

Hi, I've set up Flink on my local Linux machine and ran a few examples as well. I also set up the IntelliJ IDE as the coding environment. Can anyone please let me know if there are any beginner tasks which I could take a look at for contributing to the Apache Flink codebase? I am comfortable in Java and Scala programming. Please let me know. Thanks, Naveen
RE: HadoopDataOutputStream maybe does not expose enough methods of org.apache.hadoop.fs.FSDataOutputStream
Hi, OK, I've created FLINK-2580 to track this issue (and FLINK-2579, which is totally unrelated). I think I'm going to set up my dev environment to start contributing a little more than just complaining :) Best regards, Arnaud

From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On behalf of Stephan Ewen
Sent: Wednesday, 26 August 2015, 20:12
To: user@flink.apache.org
Subject: Re: HadoopDataOutputStream maybe does not expose enough methods of org.apache.hadoop.fs.FSDataOutputStream

I think that is a very good idea. Originally, we wrapped the Hadoop FS classes for convenience (they were changing, and we wanted to keep the system independent of Hadoop), but these are no longer relevant reasons, in my opinion. Let's start with your proposal and see if we can actually get rid of the wrapping in a way that is friendly to existing users. Would you open an issue for this? Greetings, Stephan

On Wed, Aug 26, 2015 at 6:23 PM, LINZ, Arnaud al...@bouyguestelecom.fr wrote:

Hi, I've noticed that when you use org.apache.flink.core.fs.FileSystem to write into an HDFS file, calling org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create() returns a HadoopDataOutputStream that wraps an org.apache.hadoop.fs.FSDataOutputStream (under its org.apache.hadoop.hdfs.client.HdfsDataOutputStream wrapper). However, FSDataOutputStream exposes many methods like flush, getPos, etc., but HadoopDataOutputStream only wraps write and close. For instance, flush() calls the default, empty implementation of OutputStream instead of the Hadoop one, and that's confusing. Moreover, because of the restrictive OutputStream interface, hsync() and hflush() are not exposed to Flink; maybe having a getWrappedStream() would be convenient. (For now, that prevents me from using the Flink FileSystem object; I directly use Hadoop's one.) Regards, Arnaud
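To make the proposal concrete, here is a hypothetical sketch of the kind of forwarding wrapper and getWrappedStream() accessor Arnaud describes; this is not the actual Flink code nor the eventual FLINK-2580 patch, only an illustration of the idea:

    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.hadoop.fs.FSDataOutputStream;

    // Hypothetical wrapper: forward flush() to the wrapped Hadoop stream and
    // expose that stream so callers can reach hflush()/hsync().
    public class ForwardingHadoopOutputStream extends OutputStream {

        private final FSDataOutputStream wrapped;

        public ForwardingHadoopOutputStream(FSDataOutputStream wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public void write(int b) throws IOException {
            wrapped.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            wrapped.write(b, off, len);
        }

        @Override
        public void flush() throws IOException {
            // Forward to Hadoop instead of inheriting OutputStream's empty
            // default, which is the confusing behaviour described above.
            wrapped.flush();
        }

        @Override
        public void close() throws IOException {
            wrapped.close();
        }

        /** Gives access to Hadoop-specific calls such as hflush() and hsync(). */
        public FSDataOutputStream getWrappedStream() {
            return wrapped;
        }
    }

A caller holding such a wrapper could then use getWrappedStream().hflush() to push buffered data out to the datanodes, which is exactly what the current OutputStream-only interface hides.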
Re: HadoopDataOutputStream maybe does not expose enough methods of org.apache.hadoop.fs.FSDataOutputStream
On 27 Aug 2015, at 09:33, LINZ, Arnaud al...@bouyguestelecom.fr wrote:
> Hi, OK, I've created FLINK-2580 to track this issue (and FLINK-2579, which is totally unrelated).

Thanks :)

> I think I'm going to set up my dev environment to start contributing a little more than just complaining :)

If you need any help with the setup, let us know. There is also this guide: https://ci.apache.org/projects/flink/flink-docs-master/internals/ide_setup.html

– Ufuk