RE: [DISCUSS] Semantic and implementation of per-job mode

2019-10-30 Thread Newport, Billy
We execute multiple job graphs routinely because we cannot submit a single graph without it blowing up. I believe Regina spoke to this in Berlin during her talk. Instead, if we are processing a database ingestion with 200 tables in it, we do a job graph per table rather than a single job graph

RE: the design of spilling to disk

2017-09-20 Thread Newport, Billy
Don’t forget there is also spilling/serialization in between stages in the pipeline if operations cannot be chained.
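As a point of reference, a minimal DataSet-API sketch of that chaining boundary (the mapper is a trivial stand-in, not code from the thread): two consecutive maps are chained into one task and pass records as objects, while a repartition in between forces every record through serialization.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class ChainingSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> input =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));

        MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> bump =
                new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Tuple2<String, Integer> t) {
                        return Tuple2.of(t.f0, t.f1 + 1);
                    }
                };

        // Chainable: both maps run in the same task, records are handed over as objects.
        input.map(bump).map(bump).print();

        // Not chainable: the hash partition between the maps is a serialization
        // (and potentially spilling) boundary.
        input.map(bump).partitionByHash(0).map(bump).print();
    }
}
```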

RE: Impersonation support in Flink

2017-10-24 Thread Newport, Billy
Our scenario is to enable a specific Kerberos to impersonate any Kerberos in a specific group; this is enabled in the HDFS configuration. That Kerberos does not need to be root, just a Kerberos allowed to impersonate the users in that group. We want the job to access HDFS as the impersonated K
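For reference, the HDFS-side switch being described is Hadoop's proxy-user mechanism. A sketch of the core-site.xml entries, assuming a superuser principal named etl and a group named ingest (both names are made up):

```xml
<!-- Sketch: let the "etl" principal impersonate any member of the "ingest" group -->
<property>
  <name>hadoop.proxyuser.etl.groups</name>
  <value>ingest</value>
</property>
<property>
  <name>hadoop.proxyuser.etl.hosts</name>
  <value>*</value>
</property>
```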

RE: Job Manager Configuration

2017-11-02 Thread Newport, Billy
The user code for all the flows is common, though, so is there an inefficiency here in terms of copying this code for every operator?

RE: Two operators consuming from same stream

2018-01-26 Thread Newport, Billy
We've seen the same thing here. We read files twice for the same reason: it's simply faster to do that than to connect the two pipes to the same input.

RE: How to deal with dynamic types

2018-01-26 Thread Newport, Billy
We’ve been using GenericRecords with custom serializers to do exactly this. We need to run the same Flink pipeline for tens of thousands of different schemas for our use cases, and code generation or building that many different jars just isn’t practical.
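A minimal sketch of that GenericRecord pattern, assuming the schema is only known at runtime (the schema and field names below are invented for illustration); because fields are accessed by name against a parsed Schema, the same pipeline code serves any schema without code generation:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class GenericRecordSketch {
    public static void main(String[] args) {
        // Parse whichever schema this run of the pipeline is handling.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"long\"},"
                + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // Generic access: no generated classes, no per-schema jars.
        GenericRecord record = new GenericData.Record(schema);
        record.put("id", 42L);
        record.put("name", "billy");
        System.out.println(record);
    }
}
```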

RE: Lost JobManager

2018-05-11 Thread Newport, Billy
It’s standard 1.3.2 on Java 7. We don’t use custom Flink builds, just pull down what’s in Maven.

Avro Parquet/Flink/Beam

2016-12-12 Thread Newport, Billy
Are there any examples showing the use of Beam with Avro/Parquet and a Flink runner? I see an Avro reader for Beam; is it a matter of writing another one for avro-parquet, or does this need to use the Flink HadoopOutputFormat, for example? Thanks, Billy

RE: Avro Parquet/Flink/Beam

2016-12-12 Thread Newport, Billy
…a custom DoFn to create the Parquet (waiting for the ParquetIO). Regards JB

RE: Avro Parquet/Flink/Beam

2016-12-13 Thread Newport, Billy
…I will push my branch with ParquetIO on my github. Yes, the Beam IO is independent from the runner. Regards JB. On 12/12/2016 05:29 PM, Newport, Billy wrote: > I don't mind writing one, is there a fork for the ParquetIO work that's already been done or is it in trunk?

RE: Serializing NULLs

2017-01-04 Thread Newport, Billy
Map<…> in your Avro schema is what you want here if the map values are nullable. From: Anirudh Mallem: If you are using Avro generated classes then you cannot have
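A sketch of what a nullable-valued map looks like in Avro, assuming string values for illustration; the union with null on the value type is what allows null entries:

```java
import java.util.Arrays;
import org.apache.avro.Schema;

public class NullableMapSchemaSketch {
    public static void main(String[] args) {
        // Equivalent JSON schema: {"type": "map", "values": ["null", "string"]}
        Schema nullableString = Schema.createUnion(Arrays.asList(
                Schema.create(Schema.Type.NULL),
                Schema.create(Schema.Type.STRING)));
        Schema mapSchema = Schema.createMap(nullableString);
        System.out.println(mapSchema); // prints the JSON form of the schema
    }
}
```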

RE: Avro Parquet/Flink/Beam

2017-01-11 Thread Newport, Billy
ed in 0.4.0-incubating (that I will prepare pretty soon). I will push the branch on my github (didn't have time yet, sorry about that). Regards JB On 12/13/2016 05:08 PM, Newport, Billy wrote: > Is your parquetio going to be accepted in to 0.4? > > Also, do you have a link to your g

Received an event in channel 0 while still having data from a record

2017-01-11 Thread Newport, Billy
Anyone seen this before: Caused by: java.io.IOException: Received an event in channel 0 while still having data from a record. This indicates broken serialization logic. If you are using custom serialization code (Writable or Value types), check their serialization routines. In the case of Kryo

Strange filter performance with parquet

2017-02-07 Thread Newport, Billy
We're reading a parquet file (550m records). We want to split the parquet using a filter into 2 sets, live and dead.
DataSet a = read parquet
DataSet live = a.filter(liveFilter)
DataSet dead = a.filter(deadFilter)
is slower than
DataSet a = read parquet
DataSet live = a.filter(liveFilter)
Data

RE: Strange filter performance with parquet

2017-02-07 Thread Newport, Billy
…spilling it to disk. Spilling to disk and reading the result back might be more expensive than reading the original input twice. You can also check the DataSet execution plan by calling getExecutionPlan() on the ExecutionEnvironment. Best, Fabian
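A sketch of the two plan shapes being compared (readParquet, liveFilter and deadFilter are hypothetical helpers, not code from the thread); getExecutionPlan() is the call mentioned above for inspecting which strategy the optimizer chose:

```java
// Variant 1: one source feeding two filters -- the shared intermediate result
// may be materialized/spilled so that both consumers can read it.
DataSet<GenericRecord> a = readParquet(env, inputPath);
DataSet<GenericRecord> live = a.filter(liveFilter);
DataSet<GenericRecord> dead = a.filter(deadFilter);

// Variant 2: read the file twice -- two independent pipelines, nothing shared,
// so nothing has to be spilled for a second consumer.
DataSet<GenericRecord> live2 = readParquet(env, inputPath).filter(liveFilter);
DataSet<GenericRecord> dead2 = readParquet(env, inputPath).filter(deadFilter);

// Dump the chosen execution plan (JSON) for inspection.
System.out.println(env.getExecutionPlan());
```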

Cogroup hints/performance

2017-02-07 Thread Newport, Billy
We have a cogroup where sometimes we cogroup like this: Dataset z = larger.coGroup(small).where... The strategy is printed as hash on key and a sort asc on the other key. Which is which? Naively, we'd want to hash larger and sort the small? Or is that wrong? What factors would impact the perfo
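For reference, a sketch of the coGroup shape in question (the Larger/Small record types and the key name are hypothetical). Unlike join, which offers joinWithTiny/joinWithHuge size hints, the DataSet coGroup API takes no size hint, so the hash/sort strategy shown in the printed plan is purely the optimizer's choice:

```java
DataSet<Tuple2<Long, String>> z = larger
        .coGroup(small)
        .where("key")      // key of the larger input
        .equalTo("key")    // key of the smaller input
        .with(new CoGroupFunction<Larger, Small, Tuple2<Long, String>>() {
            @Override
            public void coGroup(Iterable<Larger> left, Iterable<Small> right,
                                Collector<Tuple2<Long, String>> out) {
                // merge the two groups for this key and emit results
            }
        });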

RE: Strange filter performance with parquet

2017-02-07 Thread Newport, Billy
…you use the HadoopIF wrapper, this might add some overhead. A dedicated Flink InputFormat for Parquet might help here. 2017-02-07 21:32 GMT+01:00 Newport, Billy <billy.newp...@gs.com>: It’s kind of like this:
DataSet live = from previous
DataSet newRecords = avro read
Dataset mergedLi

RE: Strange filter performance with parquet

2017-02-07 Thread Newport, Billy
…full Avro schema. If you can provide a specific record to the InputFormat, Flink will serialize it much more efficiently as a Pojo type. 2017-02-07 22:08 GMT+01:00 Newport, Billy <billy.newp...@gs.com>: We read them like this: Job job = Job.

RE: Strange filter performance with parquet

2017-02-07 Thread Newport, Billy
…ill data to avoid a deadlock. Not sure what's causing the slowdown. How do you read Parquet files? If you use the HadoopIF wrapper, this might add some overhead. A dedicated Flink InputFormat for Parquet might help here. 2017-02-07 21:32 GMT+01:00 Newport, Billy <billy.newp...@gs.com>

FieldForwarding hints

2017-02-07 Thread Newport, Billy
For the following map, what would the hint be:
@ForwardedField("f0->f1") // Correct?
public class Tuplator extends FlatMapBase<…> {
    private static final long serialVersionUID = 4443299154253252672L;
    public Tuplator(SerializableAvroRecordBuilder avroRecordBuilder)
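In the DataSet API the annotation is the plural @ForwardedFields (from org.apache.flink.api.java.functions.FunctionAnnotation), and "f0->f1" declares that input field f0 arrives unmodified in output field f1. A minimal sketch with a plain FlatMapFunction standing in for the FlatMapBase above:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Input f0 is copied unchanged into output f1, so the optimizer can keep any
// partitioning or sort order that was established on that field.
@ForwardedFields("f0->f1")
public class Tuplator implements FlatMapFunction<Tuple2<String, Long>, Tuple2<Long, String>> {
    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<Long, String>> out) {
        out.collect(Tuple2.of(in.f1, in.f0)); // output f1 = input f0
    }
}
```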

Side outputs

2017-02-08 Thread Newport, Billy
I've implemented side outputs right now using an enum approach as recommended by others. Basically I have a mapper which wants to generate 4 outputs (DATA, INSERT, UPDATES, DELETE). It emits a Tuple2 right now and I use 4 subsequent filters to write each 'stream' to a different parquet file.
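A minimal sketch of that enum-tagging approach (names invented): one flatMap emits every record wrapped in a Tuple2 with its tag, and one filter per tag peels off the corresponding 'stream' before it is written to its own Parquet file.

```java
// Hypothetical tag enum for the four outputs.
enum Out { DATA, INSERT, UPDATE, DELETE }

DataSet<Tuple2<Out, GenericRecord>> tagged = input.flatMap(
        new FlatMapFunction<GenericRecord, Tuple2<Out, GenericRecord>>() {
            @Override
            public void flatMap(GenericRecord rec, Collector<Tuple2<Out, GenericRecord>> out) {
                out.collect(Tuple2.of(Out.DATA, rec));
                // ... also emit INSERT/UPDATE/DELETE variants where appropriate
            }
        });

// One filter per tag; each filtered DataSet gets its own Parquet sink.
DataSet<Tuple2<Out, GenericRecord>> inserts = tagged.filter(t -> t.f0 == Out.INSERT);
DataSet<Tuple2<Out, GenericRecord>> deletes = tagged.filter(t -> t.f0 == Out.DELETE);
```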

Serialization performance

2017-03-02 Thread Newport, Billy
We've been working on performance for the last while. We're using flink 1.2 right now. We are writing batch jobs which process avro and parquet input files and produce parquet files. Flink serialization costs seem to be the most significant aspect of our wall clock time. We have written a custo

RE: Serialization performance

2017-03-02 Thread Newport, Billy
…rmat seems like a good workaround on the DataSet side. Greetings, Stephan

Flink memory usage

2017-04-19 Thread Newport, Billy
How does Flink use memory? We're seeing cases when running a job on larger datasets where it throws OOM exceptions during the job. We're using the DataSet API. Shouldn't Flink be streaming from disk to disk? We work around this by using fewer slots, but it seems unintuitive that I need to change these

RE: Flink memory usage

2017-04-20 Thread Newport, Billy
We’re running this config now, which is not really justifiable for what we’re doing: 20 nodes, 2 slots, 40 parallelism, 36GB mem = 720GB of heap… Thanks
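For reference, a sketch of the Flink 1.2/1.3-era flink-conf.yaml keys behind those numbers (values illustrative, not the actual file); 20 task managers at 36 GB each is where the ~720 GB of heap comes from:

```yaml
taskmanager.heap.mb: 36864          # 36 GB per task manager; 20 TMs => ~720 GB total
taskmanager.numberOfTaskSlots: 2    # 2 slots per TM; 20 TMs => 40 slots
parallelism.default: 40
taskmanager.memory.fraction: 0.7    # share of the heap reserved as managed memory for sorting/joining
```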

RE: Flink memory usage

2017-04-20 Thread Newport, Billy
…y map execution, but rather clear and reuse the collected output of the flatMap, and so on. In the past we ran long processes over lots of data with small memory without problems. Many more complex co-groups, joins and so on, without any issue. My 2c. Hope it helps. Stefano

RE: Flink memory usage

2017-04-20 Thread Newport, Billy
Hi Billy, if you didn't split the different data sets up into different slot sharing groups, then your maximum parallelism is 40. Thus, it should be enough to assign 40^2

RE: Collector.collect

2017-05-01 Thread Newport, Billy
We’ve done that but it’s very expensive from a serialization point of view when writing the same record multiple times, each in a different tuple. For example, we started with this: .collect(new Tuple…

RE: Collector.collect

2017-05-01 Thread Newport, Billy
…doesn't incur any serialization overhead if the sink is chained to the map. The emitted Tuple could also share the GenericRecord; meaning you don't even have to copy it.

RE: Collector.collect

2017-05-02 Thread Newport, Billy
Oh, you have multiple different output formats, missed that. For the Batch API you are, I believe, correct: using a custom output-format is the best solution. In the Streaming API the
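A sketch of the custom output-format idea for the batch API (the tag enum matches the side-output sketch above; the per-tag writer type and its wiring are hypothetical): a single sink receives the tagged tuples and routes each record to the writer for its tag, so the record does not have to be emitted once per destination.

```java
import java.io.IOException;
import java.util.EnumMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public abstract class RoutingOutputFormat implements OutputFormat<Tuple2<Out, GenericRecord>> {

    /** Hypothetical per-tag writer abstraction, e.g. wrapping a Parquet writer. */
    public interface TagWriter {
        void write(GenericRecord record) throws IOException;
        void close() throws IOException;
    }

    private transient Map<Out, TagWriter> writers;

    /** Subclasses supply the actual Parquet writer for a given tag and subtask. */
    protected abstract TagWriter openWriterFor(Out tag, int taskNumber) throws IOException;

    @Override
    public void configure(Configuration parameters) { }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        writers = new EnumMap<>(Out.class);
        for (Out tag : Out.values()) {
            writers.put(tag, openWriterFor(tag, taskNumber));
        }
    }

    @Override
    public void writeRecord(Tuple2<Out, GenericRecord> record) throws IOException {
        writers.get(record.f0).write(record.f1); // route by tag
    }

    @Override
    public void close() throws IOException {
        for (TagWriter writer : writers.values()) {
            writer.close();
        }
    }
}
```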

Test, plz ignore

2017-08-18 Thread Newport, Billy

Slack invitation

2017-08-18 Thread Newport, Billy
Email address bi...@billynewport.com Thanks

RE: Flink parquet read.write performance

2017-08-23 Thread Newport, Billy
Hi, The Sink cannot be chained to the previous two operations because there are two operations. Chaining only works if there is one predecessor operation. Data transfer should still be

RE: Flink parquet read.write performance

2017-08-24 Thread Newport, Billy
Hi! The sink is merely a union of the result of the co-group and the one data source. Can't you just make two distinct pipelines out of that?

RE: Flink parquet read.write performance

2017-08-24 Thread Newport, Billy
Hi, The reason is that there are two (or more) different Threads doing the reading. As an illustration, consider first this case: DataSet input = ... input.map(new MapA()).map(new

API to launch flink session from Java?

2017-08-29 Thread Newport, Billy
Is there a way to start a yarn session from java? We want to use Kerberos impersonation and call doAs before running the session so it runs using the "correct" credential rather than a fixed one.
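A sketch of the doAs wrapping being asked about, using the standard Hadoop UserGroupInformation API; the principal, keytab, user name and launchYarnSession() are placeholders for whatever actually bootstraps the session.

```java
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;

public class ImpersonatedSessionLauncher {
    public static void main(String[] args) throws Exception {
        // Log in as the fixed super-user principal (placeholder principal/keytab).
        UserGroupInformation.loginUserFromKeytab("etl@EXAMPLE.COM", "/path/to/etl.keytab");

        // Impersonate the end user; requires the hadoop.proxyuser.* settings on the cluster.
        UserGroupInformation proxy = UserGroupInformation.createProxyUser(
                "enduser", UserGroupInformation.getLoginUser());

        proxy.doAs((PrivilegedExceptionAction<Void>) () -> {
            launchYarnSession(); // hypothetical stand-in for the real session bootstrap
            return null;
        });
    }

    private static void launchYarnSession() {
        // e.g. invoke the YARN session entry point programmatically
    }
}
```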

RE: DataSet: partitionByHash without materializing/spilling the entire partition?

2017-09-05 Thread Newport, Billy
We have the same issue. We are finding that we cannot express the data flow in a natural way because of unnecessary spilling. Instead, we're making our own operators which combine multiple steps together and essentially hide it from Flink, or sometimes we even have to read an input dataset once p