Re: S3 Access in eu-central-1
Hey Stephan, hey Steve,

that was the right hint: adding that option to the Java options fixed the problem. Maybe we should add this to our Flink wiki somehow? Thanks!

Dominik

On 28/11/17 11:55, Stephan Ewen wrote:
> Got a pointer from Steve that this is answered on Stack Overflow here:
> https://stackoverflow.com/questions/36154484/aws-java-sdk-manually-set-signature-version
>
> Flink 1.4 contains a specially bundled "fs-s3-hadoop" with a smaller footprint, compatible across Hadoop versions, and based on a later s3a and AWS SDK. In that connector, it should work out of the box because it uses a later AWS SDK. You can also use it with earlier Hadoop versions because dependencies are relocated, so it should not clash or conflict.
>
> On Mon, Nov 27, 2017 at 8:58 PM, Stephan Ewen <se...@apache.org> wrote:
>> Hi!
>>
>> The endpoint config entry looks correct. I was looking at this issue to see if there are pointers to anything else, but it looks like the explicit endpoint entry is the most important thing:
>> https://issues.apache.org/jira/browse/HADOOP-13324
>>
>> I cc-ed Steve Loughran, who is Hadoop's S3 expert (sorry Steve for pulling you in again - still listening and learning about the subtle bits and pieces of S3).
>>
>> @Steve: are S3 V4 endpoints supported in Hadoop 2.7.x already, or only in Hadoop 2.8?
>>
>> Best,
>> Stephan
>>
>> On Mon, Nov 27, 2017 at 9:47 AM, Dominik Bruhn <domi...@dbruhn.de> wrote:
>>> Hey,
>>> can anyone give a hint? Does anyone have Flink running with an S3 bucket in Frankfurt/eu-central-1 and can share their config and setup?
>>>
>>> Thanks,
>>> Dominik
>>>
>>> On 22. Nov 2017, at 17:52, domi...@dbruhn.de wrote:
>>>> Hey everyone,
>>>> I've been trying for hours to get Flink 1.3.2 (downloaded for Hadoop 2.7) to snapshot/checkpoint to an S3 bucket hosted in the eu-central-1 region. Everything works fine for other regions. I'm running my job on a JobTracker in local mode. I searched around and found several hints, most of them saying that setting `fs.s3a.endpoint` should solve it. It doesn't. I'm also sure that the core-site.xml (see below) is picked up: if I put garbage into the endpoint, I get a "hostname not found" error.
>>>>
>>>> The exception I'm getting is:
>>>> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 432415098B0994BC, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: 1PSDe4EOh7zvfNPdWrwoBKKOtsS/gf9atn5movRzcpvIH2WsR+ptXvXyFyEHXjDb3F9AniXgsBQ=
>>>>
>>>> I read the AWS FAQ, but I don't think that
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#ioexception-400-bad-request
>>>> applies to me, as I'm not running the NativeFileSystem.
>>>>
>>>> I suspect this is related to the V4 signing protocol, which is required for S3 in Frankfurt. Could it be that the aws-sdk version is just too old? I tried to play around with it, but the Hadoop adapter is incompatible with newer versions.
>>>>
>>>> I have the following core-site.xml:
>>>> <configuration>
>>>>   <property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3a.S3AFileSystem</value></property>
>>>>   <property><name>fs.s3a.buffer.dir</name><value>/tmp</value></property>
>>>>   <property><name>fs.s3a.access.key</name><value>something</value></property>
>>>>   <property><name>fs.s3a.secret.key</name><value>wont-tell</value></property>
>>>>   <property><name>fs.s3a.endpoint</name><value>s3.eu-central-1.amazonaws.com</value></property>
>>>> </configuration>
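[Editor's note, not part of the original thread: the "option added to the Java options" referenced above is, per the linked Stack Overflow answer, the AWS SDK system property `com.amazonaws.services.s3.enableV4`, which forces V4 request signing. One way to pass it to Flink's JVMs is `env.java.opts` in flink-conf.yaml; treat the exact wiring as an assumption for your setup:]

```yaml
# flink-conf.yaml (sketch): force the AWS Java SDK to sign requests with
# Signature Version 4, which eu-central-1 (Frankfurt) requires.
# The flag comes from the Stack Overflow answer linked in the thread.
env.java.opts: -Dcom.amazonaws.services.s3.enableV4
```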
Re: S3 Access in eu-central-1
Hey,
can anyone give a hint? Does anyone have Flink running with an S3 bucket in Frankfurt/eu-central-1 and can share their config and setup?

Thanks,
Dominik

> On 22. Nov 2017, at 17:52, domi...@dbruhn.de wrote:
>
> Hey everyone,
> I've been trying for hours to get Flink 1.3.2 (downloaded for Hadoop 2.7) to snapshot/checkpoint to an S3 bucket hosted in the eu-central-1 region. Everything works fine for other regions. I'm running my job on a JobTracker in local mode. I searched around and found several hints, most of them saying that setting `fs.s3a.endpoint` should solve it. It doesn't. I'm also sure that the core-site.xml (see below) is picked up: if I put garbage into the endpoint, I get a "hostname not found" error.
>
> The exception I'm getting is:
> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 432415098B0994BC, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: 1PSDe4EOh7zvfNPdWrwoBKKOtsS/gf9atn5movRzcpvIH2WsR+ptXvXyFyEHXjDb3F9AniXgsBQ=
>
> I read the AWS FAQ, but I don't think that
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#ioexception-400-bad-request
> applies to me, as I'm not running the NativeFileSystem.
>
> I suspect this is related to the V4 signing protocol, which is required for S3 in Frankfurt. Could it be that the aws-sdk version is just too old? I tried to play around with it, but the Hadoop adapter is incompatible with newer versions.
>
> I have the following core-site.xml:
> <configuration>
>   <property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3a.S3AFileSystem</value></property>
>   <property><name>fs.s3a.buffer.dir</name><value>/tmp</value></property>
>   <property><name>fs.s3a.access.key</name><value>something</value></property>
>   <property><name>fs.s3a.secret.key</name><value>wont-tell</value></property>
>   <property><name>fs.s3a.endpoint</name><value>s3.eu-central-1.amazonaws.com</value></property>
> </configuration>
>
> Here is my lib folder with the versions of the aws-sdk and the hadoop-aws integration:
> -rw-------  1 root root  11.4M Mar 20  2014 aws-java-sdk-1.7.4.jar
> -rw-r--r--  1 1005 1006  70.0M Aug  3 12:10 flink-dist_2.11-1.3.2.jar
> -rw-rw-r--  1 1005 1006  98.3K Aug  3 12:07 flink-python_2.11-1.3.2.jar
> -rw-r--r--  1 1005 1006  34.9M Aug  3 11:58 flink-shaded-hadoop2-uber-1.3.2.jar
> -rw-------  1 root root 100.7K Jan 14  2016 hadoop-aws-2.7.2.jar
> -rw-------  1 root root 414.7K May 17  2012 httpclient-4.2.jar
> -rw-------  1 root root 218.0K May  1  2012 httpcore-4.2.jar
> -rw-rw-r--  1 1005 1006 478.4K Jul 28 14:50 log4j-1.2.17.jar
> -rw-rw-r--  1 1005 1006   8.7K Jul 28 14:50 slf4j-log4j12-1.7.7.jar
>
> Can anyone give me any hints?
>
> Thanks,
> Dominik
Set Parallelism and keyBy
Hey,
I have a Flink job with a default parallelism of 2. I want to key the stream and then apply a flatMap on the keyed stream. The flatMap operation is quite costly, so I want a much higher parallelism there (let's say 16). Additionally, it is important that the flatMap operation for the same key is always executed in the same process/task. I have the following code:

  env.setParallelism(2)
  val input: DataStream[Event] = /* from somewhere */
  input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).print()

This works fine, and the ExpensiveOperation is always executed on the same tasks for the same keys.

Now I tried two things:

1.
  env.setParallelism(2)
  val input: DataStream[Event] = /* from somewhere */
  input.keyBy(_.eventType).setParallelism(16).flatMap(new ExpensiveOperation()).print()

This fails with an exception because I can't set the parallelism on the keyBy operator.

2.
  env.setParallelism(2)
  val input: DataStream[Event] = /* from somewhere */
  input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).setParallelism(16).print()

While this executes, it breaks the assignment of keys to tasks: the ExpensiveOperation is no longer always executed on the same tasks for the same keys (visible by the prefixes in the print() output).

What am I doing wrong? Is the only option to set the parallelism of the whole Flink job to 16?

Thanks, have nice holidays,
Dominik
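[Editor's note, not part of the original thread: what the question observes can be illustrated with a simplified model of how Flink routes keys. This is NOT Flink's exact code (the real version murmur-hashes `key.hashCode()` in `KeyGroupRangeAssignment`); all names here are made up for illustration. The point: for a fixed operator parallelism the same key always maps to the same subtask, but changing an operator's parallelism changes which subtask a key lands on.]

```java
// Simplified sketch of keyed-stream routing: a key is hashed into one of
// MAX_PARALLELISM key groups, and each key group is assigned to exactly one
// operator subtask. Deterministic for a fixed parallelism; the mapping
// shifts when the parallelism changes.
public class KeyRoutingSketch {
    static final int MAX_PARALLELISM = 128; // Flink's default max parallelism

    // Illustrative only: Flink actually murmur-hashes key.hashCode().
    static int keyGroup(Object key) {
        return Math.abs(key.hashCode() % MAX_PARALLELISM);
    }

    static int subtaskIndex(Object key, int parallelism) {
        // Each subtask owns a contiguous range of key groups.
        return keyGroup(key) * parallelism / MAX_PARALLELISM;
    }

    public static void main(String[] args) {
        String key = "eventTypeA";
        // Same key, same parallelism: always routed to the same subtask.
        System.out.println("p=2:  subtask " + subtaskIndex(key, 2));
        System.out.println("p=16: subtask " + subtaskIndex(key, 16));
    }
}
```

Under this model, per-key locality holds as long as the keyed operator's parallelism stays fixed, which matches the behavior described for the first (working) snippet.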
Re: Who's hiring, December 2016
Relayr, a Berlin-based IoT company, is using Apache Flink for processing its sensor data. We are still in the learning phase, but are committed to using Flink. Check out our public job ads for Berlin and Munich: https://relayr.io/jobs/ If you are interested, even if there is no clearly matching job ad, feel free to ping me.

Dominik

On 16.12.2016 14:46, Kostas Tzoumas wrote:
> Hi folks,
>
> As promised, here is the first thread for Flink-related job positions. If your organization is hiring for Flink-related positions, do reply to this thread with a link for applications.
>
> data Artisans is hiring for multiple technical positions. Help us build Flink, and help our customers be successful in their Flink projects:
> - Senior distributed systems engineer: https://data-artisans.workable.com/jobs/396284
> - Software engineer (Java/Scala and/or Python): https://data-artisans.workable.com/jobs/396286
> - QA/DevOps engineer: https://data-artisans.workable.com/jobs/396288
> - UI/UX engineer: https://data-artisans.workable.com/jobs/396287
> - Senior data engineer (EU and USA): https://data-artisans.workable.com/jobs/325667
>
> Best regards,
> Kostas
>
> PS: As mentioned in the original DISCUSS thread, I am cc'ing the dev and user lists in the first few emails to remind folks to subscribe to the new commun...@flink.apache.org mailing list. Instructions to subscribe are here: http://flink.apache.org/community.html#mailing-lists
Release Process
Hey everyone,
about three months ago, I made a PR [1] to the Flink GitHub project containing a small change for the RabbitMQ source. This PR was merged and the code is in master. But this code never made it into a release. According to JIRA [2], it is meant to be released with 1.2. What is the policy here? When can I expect to see this in an official release? What are the rules?

Thanks for a clarification,
Dominik

[1]: https://github.com/apache/flink/pull/2373
[2]: https://issues.apache.org/jira/browse/FLINK-4394
BoundedOutOfOrdernessTimestampExtractor and timestamps in the future
Hey,
I'm using a BoundedOutOfOrdernessTimestampExtractor for assigning my timestamps and discarding events that are too old (which happens sometimes). My problem is that some events accidentally have timestamps in the future. If those timestamps lie further in the future than my `maxOutOfOrderness`, I end up discarding valid events. So I need a way to tell the BoundedOutOfOrdernessTimestampExtractor to exclude future timestamps from the watermark calculation. I still want to keep events with future timestamps and assign them their timestamps as-is.

How can I achieve this? I thought about checking whether a potential timestamp is in the future before considering it for the watermark. I cloned the BoundedOutOfOrdernessTimestampExtractor and added that idea: https://gist.github.com/theomega/090054032e0b3c3b9fb06767f0fec7e7

Does this make sense? Or is there a better approach? In general, how does Flink handle timestamps from the future?

Thanks,
Dominik

--
Dominik
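[Editor's note, not part of the original thread: the idea in the linked gist can be sketched without any Flink dependency. This is an illustrative model, not Flink's `BoundedOutOfOrdernessTimestampExtractor` API; the class and method names are made up. The element keeps its (possibly future) timestamp, but future timestamps do not advance the watermark:]

```java
// Sketch of a bounded-out-of-orderness watermark that ignores timestamps
// lying in the future (relative to a supplied "now") when advancing the
// watermark, while the element itself keeps its original timestamp.
public class FutureTolerantWatermark {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    public FutureTolerantWatermark(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start offset from Long.MIN_VALUE so the first watermark
        // computation cannot underflow (Flink's extractor does the same).
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Record an element's timestamp; future timestamps don't move the watermark. */
    public long extractTimestamp(long elementTimestamp, long processingTimeNow) {
        if (elementTimestamp <= processingTimeNow) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
        }
        return elementTimestamp; // element keeps its timestamp either way
    }

    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }
}
```

With this, an event stamped far in the future is still emitted with its own timestamp, but it no longer drags the watermark forward and causes in-order events to be dropped as late.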
TimeWindow Trigger which only fires when the values have changed
Hi,
I'm relying heavily on TimeWindows for my real-time processing. Roughly, my job consumes from an AMQP queue, computes some time buckets, and saves the time buckets to Cassandra. I found the EventTimeTriggerWithEarlyAndLateFiring [1] class, which already helped me a lot: even with long time windows, I can get intermediate values saved to Cassandra by using early firing (and setting "accumulating" to true).

My question is: would it be possible to only fire the trigger if the value of the time bucket has changed? What I actually want is to write to Cassandra only if there is actually something different in the time bucket.

And, as a side question: why is something like the EventTimeTriggerWithEarlyAndLateFiring not in the default Flink distribution? It seems very handy.

[1]: https://github.com/dataArtisans/beam_comp/blob/master/src/main/java/com/dataartisans/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java

Thanks,
Dominik
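[Editor's note, not part of the original thread: one way to approximate "fire only on change" is to remember the last aggregate emitted per window and suppress an early firing when nothing changed. The sketch below is plain Java with invented names, not Flink's Trigger API; in a real trigger the remembered value would live in the TriggerContext's partitioned state rather than a HashMap:]

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Sketch: suppress duplicate early firings by remembering the last value
// emitted for each window, so unchanged buckets are not rewritten to
// Cassandra on every early-firing interval.
public class FireOnChange<K, V> {
    private final Map<K, V> lastEmitted = new HashMap<>();

    /** Returns true (and records the value) only if it differs from the last emission. */
    public boolean shouldFire(K windowKey, V currentAggregate) {
        if (Objects.equals(lastEmitted.get(windowKey), currentAggregate)) {
            return false; // nothing new in this bucket, skip the write
        }
        lastEmitted.put(windowKey, currentAggregate);
        return true;
    }
}
```

The early-firing timer would still run on its normal interval; the check only decides whether the firing actually emits (and thus writes) anything.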