Re: S3 Access in eu-central-1

2017-11-28 Thread Dominik Bruhn

Hey Stephan, Hey Steve,
that was the right hint: adding that option to the Java options fixed the 
problem. Maybe we should add this somehow to our Flink Wiki?
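
For anyone finding this in the archive: the option in question is
presumably the AWS SDK's V4-signing flag from the Stack Overflow answer
linked below. A sketch of the corresponding flink-conf.yaml entry
(env.java.opts passes extra JVM flags to the Flink processes):

# flink-conf.yaml: force the AWS SDK to sign requests with Signature V4,
# which eu-central-1 (Frankfurt) only accepts.
env.java.opts: -Dcom.amazonaws.services.s3.enableV4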


Thanks!

Dominik

On 28/11/17 11:55, Stephan Ewen wrote:
Got a pointer from Steve that this is answered on Stack Overflow here: 
https://stackoverflow.com/questions/36154484/aws-java-sdk-manually-set-signature-version 


Flink 1.4 contains a specially bundled "fs-s3-hadoop" with a smaller 
footprint, compatible across Hadoop versions, and based on a later s3a 
and AWS SDK. In that connector, it should work out of the box because it 
uses a later AWS SDK. You can also use it with earlier Hadoop versions 
because dependencies are relocated, so it should not clash/conflict.
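
To enable the bundled connector, the jar is dropped into lib/ (a sketch;
the exact jar name and version suffix are assumptions here):

# The S3 filesystem connector ships in opt/; Flink loads it from lib/.
cp ./opt/flink-s3-fs-hadoop-1.4.0.jar ./lib/
# Afterwards, s3:// checkpoint URIs use the bundled connector.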





On Mon, Nov 27, 2017 at 8:58 PM, Stephan Ewen <se...@apache.org> wrote:


Hi!

The endpoint config entry looks correct.
I was looking at this issue to see if there are pointers to anything
else, but it looks like the explicit endpoint entry is the most
important thing: https://issues.apache.org/jira/browse/HADOOP-13324

I cc-ed Steve Loughran, who is Hadoop's S3 expert (sorry Steve for
pulling you in again - still listening and learning about the subtle
bits and pieces of S3).
@Steve are S3 V4 endpoints supported in Hadoop 2.7.x already, or
only in Hadoop 2.8?

Best,
Stephan


On Mon, Nov 27, 2017 at 9:47 AM, Dominik Bruhn <domi...@dbruhn.de> wrote:

Hey,
can anyone give a hint? Does anyone have Flink running with an
S3 bucket in Frankfurt/eu-central-1 and can share their config and
setup?

Thanks,
Dominik

On 22. Nov 2017, at 17:52, domi...@dbruhn.de wrote:


Hey everyone,
I've been trying for hours to get Flink 1.3.2 (downloaded for
Hadoop 2.7) to snapshot/checkpoint to an S3 bucket which is
hosted in the eu-central-1 region. Everything works fine for
other regions. I'm running my job on a JobManager in local
mode. I googled around and found several hints, most of
them suggesting that setting `fs.s3a.endpoint` should solve
it. It doesn't. I'm also sure that the core-site.xml (see
below) is picked up: if I put garbage into the endpoint, I
receive a hostname-not-found error.

The exception I'm getting is:
com.amazonaws.services.s3.model.AmazonS3Exception: Status
Code: 400, AWS Service: Amazon S3, AWS Request ID:
432415098B0994BC, AWS Error Code: null, AWS Error Message: Bad
Request, S3 Extended Request ID:

1PSDe4EOh7zvfNPdWrwoBKKOtsS/gf9atn5movRzcpvIH2WsR+ptXvXyFyEHXjDb3F9AniXgsBQ=

I read the AWS FAQ but I don't think that
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#ioexception-400-bad-request
applies to me, as I'm not running the NativeFileSystem.

I suspect this is related to the V4 signing protocol, which is
required for S3 in Frankfurt. Could it be that the AWS SDK
version is just too old? I tried to play around with it, but
the Hadoop adapter is incompatible with newer versions.

I have the following core-site.xml:

<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>something</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>wont-tell</value>
  </property>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>s3.eu-central-1.amazonaws.com</value>
  </property>
</configuration>


Re: S3 Access in eu-central-1

2017-11-27 Thread Dominik Bruhn
Hey,
can anyone give a hint? Does anyone have Flink running with an S3 bucket in 
Frankfurt/eu-central-1 and can share their config and setup?

Thanks,
Dominik

> On 22. Nov 2017, at 17:52, domi...@dbruhn.de wrote:
> 
> Hey everyone,
> I've been trying for hours to get Flink 1.3.2 (downloaded for Hadoop 2.7) to 
> snapshot/checkpoint to an S3 bucket which is hosted in the eu-central-1 
> region. Everything works fine for other regions. I'm running my job on a 
> JobManager in local mode. I googled around and found several hints, most of 
> them suggesting that setting `fs.s3a.endpoint` should solve it. It 
> doesn't. I'm also sure that the core-site.xml (see below) is picked up: if I 
> put garbage into the endpoint, I receive a hostname-not-found error.
> 
> The exception I'm getting is:
> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS 
> Service: Amazon S3, AWS Request ID: 432415098B0994BC, AWS Error Code: null, 
> AWS Error Message: Bad Request, S3 Extended Request ID: 
> 1PSDe4EOh7zvfNPdWrwoBKKOtsS/gf9atn5movRzcpvIH2WsR+ptXvXyFyEHXjDb3F9AniXgsBQ=
> 
> I read the AWS FAQ but I don't think that 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#ioexception-400-bad-request
> applies to me, as I'm not running the NativeFileSystem.
> 
> I suspect this is related to the V4 signing protocol, which is required for S3 
> in Frankfurt. Could it be that the AWS SDK version is just too old? I tried 
> to play around with it, but the Hadoop adapter is incompatible with newer 
> versions.
> 
> I have the following core-site.xml:
> 
> <configuration>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>   <property>
>     <name>fs.s3a.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
>   <property>
>     <name>fs.s3a.access.key</name>
>     <value>something</value>
>   </property>
>   <property>
>     <name>fs.s3a.secret.key</name>
>     <value>wont-tell</value>
>   </property>
>   <property>
>     <name>fs.s3a.endpoint</name>
>     <value>s3.eu-central-1.amazonaws.com</value>
>   </property>
> </configuration>
> 
> Here is my lib folder with the versions of the aws-sdk and the hadoop-aws 
> integration:
> -rw-------  1 root root  11.4M Mar 20  2014 aws-java-sdk-1.7.4.jar
> -rw-r--r--  1 1005 1006  70.0M Aug  3 12:10 flink-dist_2.11-1.3.2.jar
> -rw-rw-r--  1 1005 1006  98.3K Aug  3 12:07 flink-python_2.11-1.3.2.jar
> -rw-r--r--  1 1005 1006  34.9M Aug  3 11:58 flink-shaded-hadoop2-uber-1.3.2.jar
> -rw-------  1 root root 100.7K Jan 14  2016 hadoop-aws-2.7.2.jar
> -rw-------  1 root root 414.7K May 17  2012 httpclient-4.2.jar
> -rw-------  1 root root 218.0K May  1  2012 httpcore-4.2.jar
> -rw-rw-r--  1 1005 1006 478.4K Jul 28 14:50 log4j-1.2.17.jar
> -rw-rw-r--  1 1005 1006   8.7K Jul 28 14:50 slf4j-log4j12-1.7.7.jar
> 
> Can anyone give me any hints?
> 
> Thanks,
> Dominik


Set Parallelism and keyBy

2016-12-26 Thread Dominik Bruhn

Hey,
I have a Flink job which has a default parallelism set to 2. I want to 
key the stream and then apply some flatMap on the keyed stream. The 
flatMap operation is quite costly, so I want to have a much higher 
parallelism here (let's say 16). Additionally, it is important that the 
flatMap operation is always executed for the same key in the same 
process or in the same task.


I have the following code:


env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).print()


This works fine, and the "ExpensiveOperation" is always executed on the 
same tasks for the same keys.


Now I tried two things:

1.

env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).setParallelism(16).flatMap(new ExpensiveOperation()).print()


This fails with an exception because I can't set the parallelism on the 
keyBy operator.


2.
-
env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).setParallelism(16).print()

-
While this executes, it breaks the assignment of the keys to the tasks: 
the "ExpensiveOperation" is no longer always executed on the same tasks 
for the same keys (visible by the prefixes in the print() output).
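
(One caveat worth checking: the prefix that print() adds is the subtask
index of the print sink itself, not of the flatMap, so it may not show
where the expensive work ran. A sketch for logging the flatMap's own
subtask assignment; `Event` is a hypothetical stand-in for the stream's
element type:)

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector

// Hypothetical element type matching the example above.
case class Event(eventType: String)

// Emit which parallel subtask of this operator handles which key.
class LoggingExpensiveOperation extends RichFlatMapFunction[Event, String] {
  override def flatMap(event: Event, out: Collector[String]): Unit = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    out.collect(s"key=${event.eventType} subtask=$subtask")
    // ... the actual expensive work would go here ...
  }
}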


What am I doing wrong? Is the only option to set the parallelism of the 
whole Flink job to 16?


Thanks, have nice holidays,
Dominik


Re: Who's hiring, December 2016

2016-12-16 Thread Dominik Bruhn
Relayr, a Berlin-based IoT company, is using Apache Flink to process 
its sensor data. We are still in the learning phase but are committed to 
using Flink.


Check out our public job ads for Berlin and Munich:
https://relayr.io/jobs/

If you are interested, even if there is no clearly matching job ad, feel 
free to ping me,

Dominik


On 16.12.2016 14:46, Kostas Tzoumas wrote:

Hi folks,

As promised, here is the first thread for Flink-related job positions. If
your organization is hiring people on Flink-related positions do reply to
this thread with a link for applications.

data Artisans is hiring on multiple technical positions. Help us build
Flink, and help our customers be successful in their Flink projects:

- Senior distributed systems engineer:
https://data-artisans.workable.com/jobs/396284

- Software engineer (Java/Scala and/or Python):
https://data-artisans.workable.com/jobs/396286

- QA/DevOps engineer: https://data-artisans.workable.com/jobs/396288

- UI/UX engineer: https://data-artisans.workable.com/jobs/396287

- Senior data engineer (EU and USA):
https://data-artisans.workable.com/jobs/325667

Best regards,
Kostas

PS: As mentioned in the original DISCUSS thread, I am cc'ing the dev and
user lists in the first few emails to remind folks to subscribe to the new
commun...@flink.apache.org mailing list

Instructions to subscribe are here:
http://flink.apache.org/community.html#mailing-lists



Release Process

2016-11-03 Thread Dominik Bruhn

Hey everyone,
about three months ago, I made a PR [1] to the Flink GitHub project 
containing a small change to the RabbitMQ source. This PR was merged, 
and the code is in master.


But this code never made it into a release. According to JIRA [2], it 
is meant to be released with 1.2. What is the policy here? When can I 
expect to see this in an official release?


Thanks for a clarification,
Dominik

[1]: https://github.com/apache/flink/pull/2373
[2]: https://issues.apache.org/jira/browse/FLINK-4394


BoundedOutOfOrdernessTimestampExtractor and timestamps in the future

2016-11-01 Thread Dominik Bruhn

Hey,
I'm using a BoundedOutOfOrdernessTimestampExtractor for assigning my 
timestamps and discarding events that are too old (which happens sometimes).


Now my problem is that some events, by accident, have timestamps in the 
future. If a timestamp is further in the future than my 
`maxOutOfOrderness`, I'm discarding valid events. So I need a way to 
tell the BoundedOutOfOrdernessTimestampExtractor to exclude future 
timestamps from the watermark calculation. I still want to keep the 
events if they are in the future and assign them their (future) 
timestamps.


How can I achieve this? I thought about checking whether the potential 
timestamp is in the future before considering it for the watermark. I 
cloned the BoundedOutOfOrdernessTimestampExtractor and added this idea:

https://gist.github.com/theomega/090054032e0b3c3b9fb06767f0fec7e7
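
In outline, the idea is roughly the following (a sketch, not the exact
gist; `Event` with a millisecond timestamp field is a stand-in type):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical element type carrying an epoch-millisecond timestamp.
case class Event(timestamp: Long)

// Assign each element its own timestamp (even one in the future), but
// only let non-future timestamps advance the watermark.
class FutureIgnoringExtractor(maxOutOfOrderness: Long)
    extends AssignerWithPeriodicWatermarks[Event] {

  private var currentMaxTimestamp = Long.MinValue + maxOutOfOrderness

  override def extractTimestamp(element: Event, previousTimestamp: Long): Long = {
    val ts = element.timestamp
    if (ts <= System.currentTimeMillis()) {
      currentMaxTimestamp = math.max(currentMaxTimestamp, ts)
    }
    ts
  }

  override def getCurrentWatermark: Watermark =
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}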

Does this make sense? Or is there a better approach?

In general, how does Flink handle readings from the future?

Thanks,
Dominik

--
Dominik


TimeWindow Trigger which only fires when the values have changed

2016-10-04 Thread Dominik Bruhn

Hi,
I'm relying heavily on TimeWindows for my real-time processing. Roughly, 
my job consumes from an AMQP queue, computes some time buckets, and saves 
the time buckets to Cassandra.


I found the EventTimeTriggerWithEarlyAndLateFiring [1] class, which has 
already helped me a lot: even with long time windows, I can get 
intermediate values saved to Cassandra by using the early firing (and 
setting "accumulating" to true).


My question is: would it be possible to only fire the trigger if the 
value of the time bucket has changed? What I actually want is to only 
write to Cassandra if something in the time bucket has actually changed.
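
One way to approximate this without a custom trigger might be to
deduplicate downstream of the window with a keyed stateful filter. A
sketch, assuming the Flink 1.2+ keyed-state API and a hypothetical
Bucket(key, value) result type:

import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

// Hypothetical window-result type: one value per key per time bucket.
case class Bucket(key: String, value: Long)

// Keyed stateful filter that only passes a bucket on when its value
// differs from the last value emitted for that key.
class EmitOnlyOnChange extends RichFilterFunction[Bucket] {
  @transient private var lastValue: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    lastValue = getRuntimeContext.getState(
      new ValueStateDescriptor("last-emitted-value", classOf[java.lang.Long]))
  }

  override def filter(bucket: Bucket): Boolean = {
    val previous = lastValue.value()
    val changed = previous == null || previous != bucket.value
    if (changed) lastValue.update(bucket.value)
    changed
  }
}

// Usage: bucketStream.keyBy(_.key).filter(new EmitOnlyOnChange).addSink(...)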


And, as a side question: Why is something like the 
EventTimeTriggerWithEarlyAndLateFiring not in the default Flink 
distribution? It seems very handy.


[1]: 
https://github.com/dataArtisans/beam_comp/blob/master/src/main/java/com/dataartisans/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java



Thanks,
Dominik