Re: Parsing source JSON String as Scala Case Class

2016-08-05 Thread Jack Huang
Thanks Stephan. "lazy val" does the trick. On Thu, Aug 4, 2016 at 2:33 AM, Stephan Ewen wrote: > If the class has non-serializable members, you need to initialize them > "lazily" when the objects are already in the distributed execution (after > serializing / distributing them). > > Making a Sca
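The "lazy val" trick from this exchange can be sketched as follows (a minimal illustration, not Jack's actual code; `HeavyParser` is a hypothetical stand-in for the non-serializable member, e.g. a JSON parser):

```scala
import org.apache.flink.api.common.functions.RichMapFunction

// Hypothetical non-serializable helper (e.g. a JSON parser).
class HeavyParser {
  def parse(s: String): String = s.trim
}

class ParseFunction extends RichMapFunction[String, String] {
  // @transient lazy val: the field is not serialized with the function;
  // it is constructed lazily on the worker, after the function object
  // has already been deserialized in the distributed execution.
  @transient lazy val parser = new HeavyParser

  override def map(line: String): String = parser.parse(line)
}
```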

Re: Having a single copy of an object read in a RichMapFunction

2016-08-05 Thread Sameer Wadkar
You mean "Connected Streams"? I use that for the same requirement. The way it works, it looks like it creates multiple copies per co-map operation. I use the keyed version to match side inputs with the data. > On Aug 5, 2016, at 12:36 PM, Theodore Vasiloudis > wrote: > > Ye

Re: Having a single copy of an object read in a RichMapFunction

2016-08-05 Thread Theodore Vasiloudis
Yes this is a streaming use case, so broadcast is not an option. If I get it correctly, with connected streams I would emulate side input by "streaming" the matrix with a key that all incoming vector records match on? Wouldn't that create multiple copies of the matrix in memory? On Thu, Aug 4, 20
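The pattern discussed here can be sketched as below (a hypothetical illustration under the thread's assumptions: `matrix` is the slowly-changing side input, `vectors` is the main data stream; all names and types are placeholders, not code from either poster):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
val matrix: DataStream[Array[Double]]  = ???  // placeholder side-input source
val vectors: DataStream[Array[Double]] = ???  // placeholder main source

// Keying both streams on the same constant key routes everything to one
// parallel instance, which holds a single copy of the matrix -- at the
// cost of serializing all processing through that instance. With real
// keys, each key group keeps its own copy, which is where the
// "multiple copies in memory" concern comes from.
val results = matrix.connect(vectors)
  .keyBy(_ => 0, _ => 0)
  .flatMap(new RichCoFlatMapFunction[Array[Double], Array[Double], Double] {
    @transient private var m: Array[Double] = _

    override def flatMap1(mat: Array[Double], out: Collector[Double]): Unit =
      m = mat  // remember the latest matrix

    override def flatMap2(vec: Array[Double], out: Collector[Double]): Unit =
      if (m != null)
        out.collect(m.zip(vec).map { case (a, b) => a * b }.sum)
  })
```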

RE: How to get latest offsets with FlinkKafkaConsumer

2016-08-05 Thread Mao, Wei
Thank you Stefan and Gordon, it's really helpful. I will try the "auto.offset.reset" property. And instead of using a new consumer group every time, I would like to clean the offsets under the current consumer group before restarting the Flink application, in order to avoid redundant records in ZK. Regards,

Re: Flink kafka group question

2016-08-05 Thread Mojes
Hi Gordon, I am consuming the messages from kafka with *FlinkKafkaConsumer09 *and I have also specified the group.id. I have *enabled checkpointing*, and below configs *auto.commit.enable=true auto.offset.reset=earliest.* From your post I could understand that group.id is not much useful as far

Re: How to get latest offsets with FlinkKafkaConsumer

2016-08-05 Thread Tzu-Li (Gordon) Tai
Hi, Please also note that the “auto.offset.reset” property is only respected when there are no offsets under the same consumer group in ZK. So, currently, in order to make sure you read from the latest / earliest offsets every time you restart your Flink application, you’d have to use a unique gro

Re: How to get latest offsets with FlinkKafkaConsumer

2016-08-05 Thread Stefan Richter
Sorry, I think you are actually asking for the largest offset in the Kafka source, which makes it setProperty("auto.offset.reset", "largest"). > Am 05.08.2016 um 14:44 schrieb Stefan Richter : > > Hi, > > I think passing properties with setProperty("auto.offset.reset", "smallest") > to the Kaf
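Put together, the suggestion in this thread could look like the sketch below (assumptions: the Kafka 0.8 consumer class `FlinkKafkaConsumer08` from Flink 1.0.x; topic name, addresses, and group id are placeholders):

```scala
import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")  // placeholder
props.setProperty("zookeeper.connect", "localhost:2181")  // placeholder

// Kafka 0.8 naming: "largest" = latest offsets, "smallest" = earliest.
// Only respected when the consumer group has no committed offsets in ZK,
// so pair it with a fresh group.id (or clear the old group's offsets).
props.setProperty("auto.offset.reset", "largest")
props.setProperty("group.id", "my-fresh-consumer-group")  // placeholder

val consumer = new FlinkKafkaConsumer08[String](
  "my-topic", new SimpleStringSchema(), props)
```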

Re: How to get latest offsets with FlinkKafkaConsumer

2016-08-05 Thread Stefan Richter
Hi, I think passing properties with setProperty("auto.offset.reset", "smallest") to the Kafka consumer should do what you want. Best, Stefan > Am 05.08.2016 um 14:36 schrieb Mao, Wei : > > I am doing some performance tests with Flink (1.0.3) + Kafka (0.8.2.2). And I > noticed that when I res

How to get latest offsets with FlinkKafkaConsumer

2016-08-05 Thread Mao, Wei
I am doing some performance tests with Flink (1.0.3) + Kafka (0.8.2.2). And I noticed that when I restarted my Flink application, it reads records starting from the latest offset that I consumed last time, but not the latest offsets of that topic in Kafka. So is there any way to make it read fr

Re: Facing Issues while migrating to S3A

2016-08-05 Thread Stefan Richter
Hi, from the exception, it seems like some native library is missing in your classpath. The code of the native method should be contained in something like a HADOOP_HOME\bin\hadoop.dll. Best, Stefan > Am 05.08.2016 um 11:32 schrieb vinay patil : > > Hi All, > > I am facing issues when I chan

Facing Issues while migrating to S3A

2016-08-05 Thread vinay patil
Hi All, I am facing issues after changing from S3n to S3A. I am using hadoop-aws-2.7.2 and aws-java-sdk-1.7.4 as dependencies. Getting this error: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access Tried to change the fs.s3.buffer.dir path but getting the same error
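For reference, a minimal S3A filesystem configuration (core-site.xml) could look like the fragment below. The property names are the standard Hadoop 2.7 S3A keys; all values are placeholders. Note that S3A reads `fs.s3a.buffer.dir`, whereas `fs.s3.buffer.dir` applies to the older s3/s3n filesystems, which may explain why changing the latter had no effect here:

```xml
<configuration>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_ACCESS_KEY</value>  <!-- placeholder -->
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_SECRET_KEY</value>  <!-- placeholder -->
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp/s3a</value>         <!-- placeholder local buffer dir -->
  </property>
</configuration>
```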