Re: System properties when submitting flink job to YARN Session

2017-07-10 Thread Jins George
Thanks Nico. I am able to pass arguments to the main program, and that works, but it's not exactly what I was looking for. I guess to give all worker JVMs the same system property, I have to set it at yarn-session creation time using -D (haven't tried it yet). Thanks, Jins George On 07/10/2017

Re: Nested Field Expressions with Rows

2017-07-10 Thread Joshua Griffith
Indeed that worked. Thanks! > On Jul 10, 2017, at 11:57 AM, Fabian Hueske wrote: > > Hi, > > You have to add the implicit value in the main() method before you call > .map(rowFn) and not in the MapFunction. > > Best, Fabian > > > 2017-07-10 18:54 GMT+02:00 Joshua

Re: Nested Field Expressions with Rows

2017-07-10 Thread Fabian Hueske
Hi, You have to add the implicit value in the main() method before you call .map(rowFn) and not in the MapFunction. Best, Fabian 2017-07-10 18:54 GMT+02:00 Joshua Griffith: > Hello Fabian, > > Thank you for your response. I tried your recommendation but I’m getting

Re: Nested Field Expressions with Rows

2017-07-10 Thread Joshua Griffith
I apologize, that was the wrong link. Here’s where the exception is thrown: https://github.com/apache/flink/blob/release-1.3.1-rc2/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java#L329-L331

Re: Nested Field Expressions with Rows

2017-07-10 Thread Joshua Griffith
Hello Fabian, Thank you for your response. I tried your recommendation but I’m getting the same issue. Here’s the altered MakeRow MapFunction I tried: class MakeRow extends MapFunction[(Integer, Integer), Row] { implicit val rowType: TypeInformation[Row] = new RowTypeInfo(

Re: Nested Field Expressions with Rows

2017-07-10 Thread Fabian Hueske
Hi Joshua, thanks for reporting this issue. Your code is fine, but IMO there is a bug in the Scala DataSet API. It simply does not respect the type information provided by the ResultTypeQueryable[Row] interface and defaults to a GenericType. I think this should be fixed. I'll open a JIRA issue for

Re: Flink Jobs disappear

2017-07-10 Thread Joshua Griffith
Are your containers on separate nodes? Are you running in Kubernetes? Have you set hard resource limits? When I’ve run into this issue it’s been because the JobManager was restarted (I wasn’t running in HA mode). Your node could have been restarted or Docker could have OOM-killed the process

Re: Nested Field Expressions with Rows

2017-07-10 Thread Joshua Griffith
Thank you for your response Nico. Below is a simple case where I’m trying to join on Row fields: package com.github.hadronzoo.rowerror import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import

Re: data loss after implementing checkpoint

2017-07-10 Thread Nico Kruber
Hi Aftab, looks like what you want is either an externalized checkpoint with RETAIN_ON_CANCELLATION mode [1] or a savepoint [2]. Ordinary checkpoints are deleted when the job is cancelled and only serve as a fault tolerance layer in case something goes wrong, e.g. machines fail, so that the

Re: Nested Field Expressions with Rows

2017-07-10 Thread Nico Kruber
Can you show a minimal example of the query you are trying to run? Maybe Timo or Fabian (cc'd) can help. Nico On Friday, 7 July 2017 23:09:09 CEST Joshua Griffith wrote: > Hello, > > When using nested field expressions like “Account.Id” with nested rows, I > get the following error, “This type

Re: data loss after implementing checkpoint

2017-07-10 Thread Kien Truong
Hi, I think you need to create a savepoint and restore from there. https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html Checkpoints are for automatic recovery within the lifetime of a job; they're deleted when you stop the job manually. Regards, Kien On

Re: System properties when submitting flink job to YARN Session

2017-07-10 Thread Nico Kruber
Hi Jins, I'm not sure whether you can define a system property, but you can include it in the program arguments of "flink run [OPTIONS] " You may also be able to define system properties but these are probably only valid in your main() function executed within the flink run script, not any

Re: problems starting the training exercise TaxiRideCleansing on local cluster

2017-07-10 Thread Nico Kruber
Hi Günter, unfortunately, I cannot reproduce your error. This is what I did (following http://training.data-artisans.com/devEnvSetup.html): * clone and build the flink-training-exercises project: git clone https://github.com/dataArtisans/flink-training-exercises.git cd flink-training-exercises

data loss after implementing checkpoint

2017-07-10 Thread Aftab Ansari
Hi, I am new to Flink. I am facing an issue implementing checkpointing. Checkpoint-related code: long checkpointInterval = 5000; StreamExecutionEnvironment env = StreamUtils.getEnvironment(params); //specify backend //env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/", true));

Re: what happened to flink on tez?

2017-07-10 Thread Fabian Hueske
Hi Yingda, Flink on Tez was dropped in preparation for the 1.0 release, as discussed on the mailing list [1]. In addition to the reasons in the thread, Tez did not support pipelined shuffles, which are a prerequisite for low-latency stream processing, the most common use case for Flink. Cheers,