Re:Re: Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-28 Thread Haibo Sun
Congratulations Dian !



Best,
Haibo

At 2020-08-27 18:03:38, "Zhijiang"  wrote:
>Congrats, Dian!
>
>
>--
>From:Yun Gao 
>Send Time:2020年8月27日(星期四) 17:44
>To:dev ; Dian Fu ; user 
>; user-zh 
>Subject:Re: Re: [ANNOUNCE] New PMC member: Dian Fu
>
>Congratulations Dian !
>
> Best
> Yun
>
>
>--
>Sender:Marta Paes Moreira
>Date:2020/08/27 17:42:34
>Recipient:Yuan Mei
>Cc:Xingbo Huang; jincheng sun; 
>dev; Dian Fu; 
>user; user-zh
>Theme:Re: [ANNOUNCE] New PMC member: Dian Fu
>
>Congrats, Dian!
>On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei  wrote:
>
>Congrats!
>On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:
>
>Congratulations Dian!
>
>Best,
>Xingbo
>jincheng sun  于2020年8月27日周四 下午5:24写道:
>
>Hi all,
>
>On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part of 
>the Apache Flink Project Management Committee (PMC).
>
>Dian Fu has been very active on PyFlink component, working on various 
>important features, such as the Python UDF and Pandas integration, and keeps 
>checking and voting for our releases, and also has successfully produced two 
>releases(1.9.3&1.11.1) as RM, currently working as RM to push forward the 
>release of Flink 1.12.
>
>Please join me in congratulating Dian Fu for becoming a Flink PMC Member!
>
>Best,
>Jincheng(on behalf of the Flink PMC)
>


Re:[ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Haibo Sun
Congratulations Yu!


Best,
Haibo




At 2020-06-17 09:15:02, "jincheng sun"  wrote:
>Hi all,
>
>On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>part of the Apache Flink Project Management Committee (PMC).
>
>Yu Li has been very active on Flink's Statebackend component, working on
>various improvements, for example the RocksDB memory management for 1.10.
>and keeps checking and voting for our releases, and also has successfully
>produced two releases(1.10.0&1.10.1) as RM.
>
>Congratulations & Welcome Yu Li!
>
>Best,
>Jincheng (on behalf of the Flink PMC)


Re:[ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread Haibo Sun
Thanks Gary & Yu. Great work!


Best,
Haibo


At 2020-02-12 21:31:00, "Yu Li"  wrote:
>The Apache Flink community is very happy to announce the release of Apache
>Flink 1.10.0, which is the latest major release.
>
>Apache Flink® is an open-source stream processing framework for
>distributed, high-performing, always-available, and accurate data streaming
>applications.
>
>The release is available for download at:
>https://flink.apache.org/downloads.html
>
>Please check out the release blog post for an overview of the improvements
>for this new major release:
>https://flink.apache.org/news/2020/02/11/release-1.10.0.html
>
>The full release notes are available in Jira:
>https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845
>
>We would like to thank all contributors of the Apache Flink community who
>made this release possible!
>
>Cheers,
>Gary & Yu


Re:[ANNOUNCE] Apache Flink 1.9.0 released

2019-08-22 Thread Haibo Sun
Great news! Thanks Gordon and Kurt!


Best,
Haibo

At 2019-08-22 20:03:26, "Tzu-Li (Gordon) Tai"  wrote:
>The Apache Flink community is very happy to announce the release of Apache
>Flink 1.9.0, which is the latest major release.
>
>Apache Flink® is an open-source stream processing framework for
>distributed, high-performing, always-available, and accurate data streaming
>applications.
>
>The release is available for download at:
>https://flink.apache.org/downloads.html
>
>Please check out the release blog post for an overview of the improvements
>for this new major release:
>https://flink.apache.org/news/2019/08/22/release-1.9.0.html
>
>The full release notes are available in Jira:
>https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
>
>We would like to thank all contributors of the Apache Flink community who
>made this release possible!
>
>Cheers,
>Gordon


Re:Re: FlatMap returning Row<> based on ArrayList elements()

2019-08-07 Thread Haibo Sun
Hi AU,


> The problem with this approach is that I'm looking for a standard FlatMap 
> anonymous function that could return every time: 1. different number of 
> elements within the Array and 2. the data type can be random likewise. I mean 
> is not fixed the whole time then my TypeInformation return would fix every 
> execution.

As far as I know, there is no such standard flatMap function. The table 
definition requires a fixed number of columns, and even if Flink can infer the 
column types, it requires that they are fixed as well. For the case you 
describe, the number of columns in the table should be the maximum possible 
number of elements. If a record has fewer elements, you should pad the 
remaining columns defined by the table before returning. For the case where 
elements in the same column may have different types, you can convert them to 
a uniform column type defined by the table, or define a custom type that can 
handle these different element types.
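
To make that a bit more concrete, below is a rough, untested sketch of the 
padding approach (the maximum of 5 columns and the use of String as the 
uniform column type are only assumptions for illustration):

import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class PaddingFlatMap implements FlatMapFunction<List<Object>, Row> {

    // Assumed maximum number of elements; it must match the table schema you register.
    private static final int MAX_COLUMNS = 5;

    @Override
    public void flatMap(List<Object> value, Collector<Row> out) {
        Row row = new Row(MAX_COLUMNS);
        for (int i = 0; i < MAX_COLUMNS; i++) {
            // Convert each element to the uniform column type (String here)
            // and pad the missing columns with null.
            row.setField(i, i < value.size() ? String.valueOf(value.get(i)) : null);
        }
        out.collect(row);
    }
}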



Best,
Haibo

At 2019-08-07 23:05:51, "Andres Angel"  wrote:

Hello Victor ,


You are totally right , so now this turn into is Flink capable to handle these 
cases where would be required define the type info in the row and the Table 
will infer the columns separated by comma or something similar?


thanks
AU


On Wed, Aug 7, 2019 at 10:33 AM Victor Wong  wrote:


Hi Andres,

 

I’d like to share my thoughts:

When you register a “Table”, you need to specify its “schema”, so how can you 
register the table when the number of elements/columns and data types are both 
nondeterministic.

Correct me if I misunderstood your meaning.

 

Best,

Victor

 

From: Andres Angel 
Date: Wednesday, August 7, 2019 at 9:55 PM
To: Haibo Sun 
Cc: user 
Subject: Re: FlatMap returning Row<> based on ArrayList elements()

 

Hello everyone, let me be more precis on what I'm looking for at the end 
because your example is right and very accurate in the way about how to turn an 
array into a Row() object.

I have done it seamlessly:

 

out.collect(Row.of(pelements.toArray()));

 

Then I printed and the outcome is as expected:

 

5d2df2c2e7370c7843dad9ca,359731,1196,156789925,619381

 

Now I need to register this DS as a table and here is basically how I'm 
planning to do it:

 

tenv.registerDataStream("newtable", ds, 'col1,col2,col3,col4,col5');

 

However, this returns an error on the DS registration due to I need to specify 
the RowTypeInfo. Here is the big deal because yes I know I would be able to use 
something like :

 

 

TypeInformation[] types= {

BasicTypeInfo.STRING_TYPE_INFO,

BasicTypeInfo.INT_TYPE_INFO,

BasicTypeInfo.INT_TYPE_INFO,

BasicTypeInfo.INT_TYPE_INFO,

BasicTypeInfo.INT_TYPE_INFO};


DataStream<Row> ds = previousds.flatMap(new FlatMapFunction<List<Integer>, Row>() {

    @Override
    public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
        out.collect(Row.of(value.toArray(new Integer[0])));
    }
}).returns(new RowTypeInfo(types));

 

 

The problem with this approach is that I'm looking for a standard FlatMap 
anonymous function that could return every time: 1. different number of 
elements within the Array and 2. the data type can be random likewise. I mean 
is not fixed the whole time then my TypeInformation return would fix every 
execution.

 

How could I approach this?

 

thanks so much

AU

 

 

On Wed, Aug 7, 2019 at 3:57 AM Haibo Sun  wrote:

Hi Andres Angel,

 

I guess people don't understand your problem (including me). I don't know if 
the following sample code is what you want, if not, can you describe the 
problem more clearly?

 

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

env.fromElements(Arrays.asList(1, 2, 3, 4, 5))

.flatMap(new FlatMapFunction, Row>() {

@Override

public void flatMap(List value, Collector out) throws Exception {

out.collect(Row.of(value.toArray(new Integer[0])));

}

}).print();

 

env.execute("test job");

 

Best,

Haibo


At 2019-07-30 02:30:27, "Andres Angel"  wrote:



Hello everyone,

 

I need to parse into an anonymous function an input data to turn it into 
several Row elements. Originally I would have done something like 
Row.of(1,2,3,4) but these elements can change on the flight as part of my 
function. This is why I have decided to store them in a list and right now it 
looks something like this:

 

 

Now, I need to return my out Collector it Row<> based on this elements. I 
checked on the Flink documentation but the Lambda functions are not supported : 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html , 
Then I though ok I can loop my ArrayList in a Tuple and pass this tuple as 
Row.of(myTuple):

 

Tuple mytuple = Tuple.newInstance(5);
for (int i = 0; i < pelements.size(); i++) {
mytuple.setField

Re:Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-07 Thread Haibo Sun
Congratulations!


Best,
Haibo
At 2019-08-08 02:08:21, "Yun Tang"  wrote:
>Congratulations Hequn.
>
>Best
>Yun Tang
>
>From: Rong Rong 
>Sent: Thursday, August 8, 2019 0:41
>Cc: dev ; user 
>Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
>
>Congratulations Hequn, well deserved!
>
>--
>Rong
>
>On Wed, Aug 7, 2019 at 8:30 AM mailto:xingc...@gmail.com>> 
>wrote:
>
>Congratulations, Hequn!
>
>
>
>From: Xintong Song mailto:tonysong...@gmail.com>>
>Sent: Wednesday, August 07, 2019 10:41 AM
>To: d...@flink.apache.org
>Cc: user mailto:user@flink.apache.org>>
>Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
>
>
>
>Congratulations~!
>
>
>Thank you~
>
>Xintong Song
>
>
>
>
>
>On Wed, Aug 7, 2019 at 4:00 PM vino yang 
>mailto:yanghua1...@gmail.com>> wrote:
>
>Congratulations!
>
>highfei2...@126.com 
>mailto:highfei2...@126.com>> 于2019年8月7日周三 下午7:09写道:
>
>> Congrats Hequn!
>>
>> Best,
>> Jeff Yang
>>
>>
>>  Original Message 
>> Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
>> From: Piotr Nowojski
>> To: JingsongLee
>> CC: Biao Liu ,Zhu Zhu ,Zili Chen ,Jeff Zhang ,Paul Lam ,jincheng sun ,dev 
>> ,user
>>
>>
>> Congratulations :)
>>
>> On 7 Aug 2019, at 12:09, JingsongLee 
>> mailto:lzljs3620...@aliyun.com>> wrote:
>>
>> Congrats Hequn!
>>
>> Best,
>> Jingsong Lee
>>
>> --
>> From:Biao Liu mailto:mmyy1...@gmail.com>>
>> Send Time:2019年8月7日(星期三) 12:05
>> To:Zhu Zhu mailto:reed...@gmail.com>>
>> Cc:Zili Chen mailto:wander4...@gmail.com>>; Jeff Zhang 
>> mailto:zjf...@gmail.com>>; Paul
>> Lam mailto:paullin3...@gmail.com>>; jincheng sun 
>> mailto:sunjincheng...@gmail.com>>; dev
>> mailto:d...@flink.apache.org>>; user 
>> mailto:user@flink.apache.org>>
>> Subject:Re: [ANNOUNCE] Hequn becomes a Flink committer
>>
>> Congrats Hequn!
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Wed, Aug 7, 2019 at 6:00 PM Zhu Zhu 
>> mailto:reed...@gmail.com>> wrote:
>> Congratulations to Hequn!
>>
>> Thanks,
>> Zhu Zhu
>>
>> Zili Chen mailto:wander4...@gmail.com>> 于2019年8月7日周三 
>> 下午5:16写道:
>> Congrats Hequn!
>>
>> Best,
>> tison.
>>
>>
>> Jeff Zhang mailto:zjf...@gmail.com>> 于2019年8月7日周三 下午5:14写道:
>> Congrats Hequn!
>>
>> Paul Lam mailto:paullin3...@gmail.com>> 于2019年8月7日周三 
>> 下午5:08写道:
>> Congrats Hequn! Well deserved!
>>
>> Best,
>> Paul Lam
>>
>> 在 2019年8月7日,16:28,jincheng sun 
>> mailto:sunjincheng...@gmail.com>> 写道:
>>
>> Hi everyone,
>>
>> I'm very happy to announce that Hequn accepted the offer of the Flink PMC
>> to become a committer of the Flink project.
>>
>> Hequn has been contributing to Flink for many years, mainly working on
>> SQL/Table API features. He's also frequently helping out on the user
>> mailing lists and helping check/vote the release.
>>
>> Congratulations Hequn!
>>
>> Best, Jincheng
>> (on behalf of the Flink PMC)
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>>
>>


Re:FlatMap returning Row<> based on ArrayList elements()

2019-08-07 Thread Haibo Sun
Hi Andres Angel,


I guess people don't understand your problem (including me). I don't know if 
the following sample code is what you want; if not, can you describe the 
problem more clearly?


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Arrays.asList(1, 2, 3, 4, 5))
    .flatMap(new FlatMapFunction<List<Integer>, Row>() {
        @Override
        public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
            out.collect(Row.of(value.toArray(new Integer[0])));
        }
    }).print();


env.execute("test job");


Best,
Haibo

At 2019-07-30 02:30:27, "Andres Angel"  wrote:

Hello everyone,



I need to parse into an anonymous function an input data to turn it into 
several Row elements. Originally I would have done something like 
Row.of(1,2,3,4) but these elements can change on the flight as part of my 
function. This is why I have decided to store them in a list and right now it 
looks something like this:






Now, I need to return my out Collector it Row<> based on this elements. I 
checked on the Flink documentation but the Lambda functions are not supported : 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html , 
Then I though ok I can loop my ArrayList in a Tuple and pass this tuple as 
Row.of(myTuple):


Tuple mytuple = Tuple.newInstance(5);
for (int i = 0; i < pelements.size(); i++) {
mytuple.setField(pelements.get(i), i);
}
out.collect(Row.of(mytuple));





However , it doesnt work because this is being parsed s 1 element for  sqlQuery 
step. how could I do something like:


pelements.forEach(n->out.collect(Row.of(n)));



Thanks so much

Re:Pramaters in eclipse with Flink

2019-08-06 Thread Haibo Sun
Hi alaa.abutaha,


In fact, your problem is not related to Flink, but to how to specify program 
arguments in Eclipse. I think the following document will help you.


https://www.cs.colostate.edu/helpdocs/cmd.pdf
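
For example, assuming the repository is checked out under 
/path/to/flink-streaming-kmeans, the parameters go on a single line in the 
"Program arguments" box of the Eclipse run configuration (Run > Run 
Configurations... > Arguments), roughly like this (the access token is a 
placeholder):

-local -input /path/to/flink-streaming-kmeans/src/test/resources/citibike-20180801-min.tsv -accesstoken <your-access-token> -clusters 5 -queryable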


Best,
Haibo


At 2019-07-26 22:02:48, "alaa"  wrote:
>Hallo 
> I run this example form GitHub 
>https://github.com/ScaleUnlimited/flink-streaming-kmeans
>
> but I am not familiar with eclipse and i got this error 
>
> 
>
>I dont know how and where i should put the following parameters:
>
>-local (to specify running Flink locally, versus on a real cluster)
>-input  (e.g.
>/path/to/flink-streaming-kmeans/src/test/resources/citibike-20180801-min.tsv)
>-accesstoken 
>-clusters  (5 or 10 are good values)
>-queryable (to enable calls to the API, on port 8085).
>
>Thank you
>
>
>
>--
>Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re:Re: How to write value only using flink's SequenceFileWriter?

2019-08-06 Thread Haibo Sun
Hi Liu Bo,


If you haven't customized serializations through the configuration item 
"io.serializations", the default serializer for Writable objects is 
org.apache.hadoop.io.serializer.WritableSerialization.WritableSerializer. As 
you said, when WritableSerializer serializes a NullWritable object, it doesn't 
actually write anything. So I suspect that the "(null)" you saw may be part of 
the value, not the key.




Best,
Haibo

At 2019-07-27 11:43:47, "Liu Bo"  wrote:

The file header says key is NullWritable: 

SEQ^F!org.apache.hadoop.io.NullWritable^Yorg.apache.hadoop.io.Text^A^A)org.apache.hadoop.io.compress.SnappyCodec


Might be a hadoop -text problem?


On Sat, 27 Jul 2019 at 11:07, Liu Bo  wrote:

Dear flink users, 


We're trying to switch from StringWriter to SequenceFileWriter to turn on 
compression. StringWriter writes value only and we want to keep that way.


AFAIK, you can use NullWritable in Hadoop writers to escape key so you only 
write the values. 


So I tried with NullWritable as following code:


   BucketingSink> hdfsSink = new 
BucketingSink("/data/cjv");
  hdfsSink.setBucketer(new DateTimeBucketer<>("-MM-dd/HH", ZoneOffset.UTC));
  hdfsSink.setWriter(new SequenceFileWriter("org.apache.hadoop.io.compress.SnappyCodec", 
SequenceFile.CompressionType.BLOCK));
  hdfsSink.setBatchSize(1024 * 1024 * 250);
  hdfsSink.setBatchRolloverInterval(20 * 60 * 1000);



   joinedResults.map(new MapFunction, 
Tuple2>() {
@Override
public Tuple2 map(Tuple2 value) throws 
Exception {
return Tuple2.of(NullWritable.get(), new Text(value.f1));
}
}).addSink(hdfsSink).name("hdfs_sink").uid("hdfs_sink");


But out put file has key as string value (null)
eg:
(null)  {"ts":1564168038,"os":"android",...}


So my question is how to escape the key completely and write value only in 
SequenceFileWriter?

Your help will be much of my appreciation.


--

All the best

Liu Bo




--

All the best

Liu Bo

Re:StreamingFileSink part file count reset

2019-07-29 Thread Haibo Sun
Hi Sidhartha,


Currently, the part counter is never reset to 0, and it is not possible to 
customize the part filename, so I don't think there's any way to reset it right 
now. I guess the reason it can't be reset to 0 is the concern that previous 
parts would be overwritten: although the bucket id is part of the part file 
path, StreamingFileSink does not know when the bucket id will change in the 
case of a custom BucketAssigner.


Best,
Haibo

At 2019-07-30 06:13:54, "sidhartha saurav"  wrote:

Hi,

We are using StreamingFileSink with a custom BucketAssigner and 
DefaultRollingPolicy. The custom BucketAssigner is simply a date bucket 
assigner. The StreamingFileSink creates part files with name 
"part--". The 
count is an integer and is incrementing on each rollover. Now my doubts are:

1. When does this count reset to 0 ?
2. Is there a way i can reset this count programmatically ? Since we are using 
day bucket we would like the count to reset every day.

We are using Flink 1.8

Thanks
Sidhartha


Re:sqlQuery split string

2019-07-24 Thread Haibo Sun
Hi Andres Angel,


At present, there seems to be no such built-in function, and you need to 
register a user-defined function to do that. You can look at the following 
document to see how to do it.


https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions
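
As a rough, untested sketch of such a table function (the class name and the 
usage below are made up for illustration; note that it emits one row per piece 
rather than extra columns):

import org.apache.flink.table.functions.TableFunction;

public class SplitByChar extends TableFunction<String> {

    private final String separator;

    public SplitByChar(String separator) {
        this.separator = separator;
    }

    // Called once per input string; collect() emits one row per piece.
    public void eval(String value) {
        if (value != null) {
            for (String piece : value.split(java.util.regex.Pattern.quote(separator))) {
                collect(piece);
            }
        }
    }
}

Assuming tEnv is your TableEnvironment and myTable is the registered table, it 
could then be used roughly like this:

tEnv.registerFunction("split", new SplitByChar(","));
Table result = tEnv.sqlQuery("SELECT s FROM myTable, LATERAL TABLE(split(col)) AS T(s)");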


Best,
Haibo


At 2019-07-25 06:00:53, "Andres Angel"  wrote:

Hello everyone, 


Following the current available functions 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/functions.html,
 how could I split a column string by a caracter?


example 


column content : col =a,b,c
query: Select col from tenv
expected return : cola , colb, colc 




thanks 



Re:Re: Re: Re: Flink and Akka version incompatibility ..

2019-07-24 Thread Haibo Sun


The following JIRA is about the problem you encountered; I think you will be 
very interested in its comments. There does seem to be a problem with shading 
Akka, and Flink is considering isolating the classloader that contains Akka and 
Scala to allow applications and Flink to use different Akka versions.


https://issues.apache.org/jira/browse/FLINK-10903


Best,
Haibo

At 2019-07-25 00:07:27, "Debasish Ghosh"  wrote:

Also wanted to check if anyone has ventured into this exercise of shading Akka 
in Flink .. 
Is this something that qualifies as one of the roadmap items in Flink ?


regards.


On Wed, Jul 24, 2019 at 3:44 PM Debasish Ghosh  wrote:

Hi Haibo - Thanks for the clarification ..


regards.


On Wed, Jul 24, 2019 at 2:58 PM Haibo Sun  wrote:

Hi  Debasish Ghosh,


I agree that Flink should shade its Akka. 


Maybe you misunderstood me. I mean, in the absence of official shading Akka in 
Flink, the relatively conservative way is to shade Akka of your application (I 
concern Flink won't work well after shading its Akka).


Best,
Haibo

At 2019-07-24 16:43:28, "Debasish Ghosh"  wrote:

For our application users are expected to work with Akka APIs - hence if I 
shade Akka in my application users will need to work with shaded imports which 
feels unnatural. With Flink, Akka is an implementation detail and Flink users 
are not expected to use Akka APIs. Hence shading will not have any user level 
impact. 


Hence the suggestion to shade Akka in Flink rather than the user application.


regards.


On Wed, 24 Jul 2019 at 2:04 PM, Jeff Zhang  wrote:

I think it is better to shade all the dependencies of flink so that all the 
projects that use flink won't hit this kind of issue.




Haibo Sun  于2019年7月24日周三 下午4:07写道:

Hi,   Debasish Ghosh


I don't know why not shade Akka, maybe it can be shaded. Chesnay may be able to 
answer that.
I recommend to shade Akka dependency of your application because it don't be 
known what's wrong with shading Flink's Akka.


CC  @Chesnay Schepler 


Best,
Haibo


At 2019-07-24 15:48:59, "Debasish Ghosh"  wrote:

The problem that I am facing is with Akka serialization .. Why not shade the 
whole of Akka ?


java.lang.AbstractMethodError: 
akka.remote.RemoteActorRefProvider.serializationInformation()Lakka/serialization/Serialization$Information;
at 
akka.serialization.Serialization.serializationInformation(Serialization.scala:166)


Akka 2.6 is just around the corner and I don't think Flink will upgrade to Akka 
2.6 that soon .. so somehow this problem is bound to recur ..


regards.


On Wed, Jul 24, 2019 at 1:01 PM Zili Chen  wrote:

I can see that we relocate akka's netty, akka uncommon math but also
be curious why Flink doesn't shaded all of akka dependencies...


Best,
tison.




Debasish Ghosh  于2019年7月24日周三 下午3:15写道:

Hello Haibo -


Yes, my application depends on Akka 2.5. 
Just curious, why do you think it's recommended to shade Akka version of my 
application instead of Flink ?


regards.


On Wed, Jul 24, 2019 at 12:42 PM Haibo Sun  wrote:

Hi  Debasish Ghosh,
 

Does your application have to depend on Akka 2.5? If not, it's a good idea to 
always keep the Akka version that the application depend on in line with Flink. 
If you want to try shading Akka dependency, I think that it is more recommended 
to shade Akka dependency of your application.


Best,
Haibo

At 2019-07-24 14:31:29, "Debasish Ghosh"  wrote:

Hello -


An application that uses Akka 2.5 and Flink 1.8.0 gives runtime errors because 
of version mismatch between Akka that we use and the one that Flink uses (which 
is Akka 2.4). Anyone tried shading Akka dependency with Flink ? 


Or is there any other alternative way to handle this issue ? I know Flink 1.9 
has upgraded to Akka 2.5 but this is (I think) going to be a recurring problem 
down the line with mismatch between the new releases of Akka and Flink.


regards.



--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Best Regards

Jeff Zhang
--

Sent from my iPhone




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re:Re: Re: Flink and Akka version incompatibility ..

2019-07-24 Thread Haibo Sun
Hi  Debasish Ghosh,


I agree that Flink should shade its Akka. 


Maybe you misunderstood me. I mean that, in the absence of Flink officially 
shading its Akka, the relatively conservative way is to shade the Akka of your 
application (I'm concerned that Flink won't work well after shading its own 
Akka).


Best,
Haibo

At 2019-07-24 16:43:28, "Debasish Ghosh"  wrote:

For our application users are expected to work with Akka APIs - hence if I 
shade Akka in my application users will need to work with shaded imports which 
feels unnatural. With Flink, Akka is an implementation detail and Flink users 
are not expected to use Akka APIs. Hence shading will not have any user level 
impact. 


Hence the suggestion to shade Akka in Flink rather than the user application.


regards.


On Wed, 24 Jul 2019 at 2:04 PM, Jeff Zhang  wrote:

I think it is better to shade all the dependencies of flink so that all the 
projects that use flink won't hit this kind of issue.




Haibo Sun  于2019年7月24日周三 下午4:07写道:

Hi,   Debasish Ghosh


I don't know why not shade Akka, maybe it can be shaded. Chesnay may be able to 
answer that.
I recommend to shade Akka dependency of your application because it don't be 
known what's wrong with shading Flink's Akka.


CC  @Chesnay Schepler 


Best,
Haibo


At 2019-07-24 15:48:59, "Debasish Ghosh"  wrote:

The problem that I am facing is with Akka serialization .. Why not shade the 
whole of Akka ?


java.lang.AbstractMethodError: 
akka.remote.RemoteActorRefProvider.serializationInformation()Lakka/serialization/Serialization$Information;
at 
akka.serialization.Serialization.serializationInformation(Serialization.scala:166)


Akka 2.6 is just around the corner and I don't think Flink will upgrade to Akka 
2.6 that soon .. so somehow this problem is bound to recur ..


regards.


On Wed, Jul 24, 2019 at 1:01 PM Zili Chen  wrote:

I can see that we relocate akka's netty, akka uncommon math but also
be curious why Flink doesn't shaded all of akka dependencies...


Best,
tison.




Debasish Ghosh  于2019年7月24日周三 下午3:15写道:

Hello Haibo -


Yes, my application depends on Akka 2.5. 
Just curious, why do you think it's recommended to shade Akka version of my 
application instead of Flink ?


regards.


On Wed, Jul 24, 2019 at 12:42 PM Haibo Sun  wrote:

Hi  Debasish Ghosh,
 

Does your application have to depend on Akka 2.5? If not, it's a good idea to 
always keep the Akka version that the application depend on in line with Flink. 
If you want to try shading Akka dependency, I think that it is more recommended 
to shade Akka dependency of your application.


Best,
Haibo

At 2019-07-24 14:31:29, "Debasish Ghosh"  wrote:

Hello -


An application that uses Akka 2.5 and Flink 1.8.0 gives runtime errors because 
of version mismatch between Akka that we use and the one that Flink uses (which 
is Akka 2.4). Anyone tried shading Akka dependency with Flink ? 


Or is there any other alternative way to handle this issue ? I know Flink 1.9 
has upgraded to Akka 2.5 but this is (I think) going to be a recurring problem 
down the line with mismatch between the new releases of Akka and Flink.


regards.



--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Best Regards

Jeff Zhang
--

Sent from my iPhone

Re:Re: How to handle JDBC connections in a topology

2019-07-24 Thread Haibo Sun
Hi Stephen,


I don't think it's possible to use the same connection pool for the entire 
topology, because the nodes on the topology may run in different JVMs and on 
different machines.


If you want all operators running in the same JVM to use the same connection 
pool, I think you can implement a static class that contains the connection 
pool, and then have the operators get connections from it.
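
As a rough, untested sketch of that idea (HikariCP is used here purely as an 
example pool implementation, and the JDBC URL and credentials are 
placeholders), each operator could call ConnectionPoolHolder.getConnection() 
from its open() method:

import java.sql.Connection;
import java.sql.SQLException;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public final class ConnectionPoolHolder {

    // One pool per JVM (i.e. per task manager), lazily created and shared by all operators.
    private static volatile HikariDataSource dataSource;

    private ConnectionPoolHolder() {
    }

    public static Connection getConnection() throws SQLException {
        if (dataSource == null) {
            synchronized (ConnectionPoolHolder.class) {
                if (dataSource == null) {
                    HikariConfig config = new HikariConfig();
                    config.setJdbcUrl("jdbc:mysql://db-host:3306/mydb"); // placeholder
                    config.setUsername("user");                          // placeholder
                    config.setPassword("password");                      // placeholder
                    config.setMaximumPoolSize(10);
                    dataSource = new HikariDataSource(config);
                }
            }
        }
        return dataSource.getConnection();
    }
}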


Best,
Haibo

At 2019-07-24 15:20:31, "Stephen Connolly"  
wrote:

Oh and I'd also need some way to clean up the per-node transient state if the 
topology stops running on a specific node.


On Wed, 24 Jul 2019 at 08:18, Stephen Connolly 
 wrote:

Hi,


So we have a number of nodes in our topology that need to do things like 
checking a database, e.g.


* We need a filter step to drop events on the floor from systems we are no 
longer interested in
* We need a step that outputs on a side-channel if the event is for an object 
where the parent is not currently known to the database.


Right now we are grabbing a JDBC connection for each node in the topology that 
needs to talk to the database and storing the connection in a transient field 
(to exclude it from the serialized state)


What I'd really like to do is have a JDBC connection pool shared across the 
entire topology as that way we could have the pool check for stale connections, 
etc.


Does anyone have any tips for doing this kind of thing?


(My current idea is to maintain a `static final 
WeakHashMap` in the main class... but that feels 
very much like a hack)


What I'm really looking for is some form of Node Transient State... are there 
any examples of this type of think.


Flink 1.8.x


Thanks,


-Stephen

Re:Re: Flink and Akka version incompatibility ..

2019-07-24 Thread Haibo Sun
Hi,   Debasish Ghosh


I don't know why Akka isn't shaded; maybe it can be. Chesnay may be able to 
answer that.
I recommend shading the Akka dependency of your application, because it isn't 
known what would go wrong with shading Flink's Akka.


CC  @Chesnay Schepler 


Best,
Haibo


At 2019-07-24 15:48:59, "Debasish Ghosh"  wrote:

The problem that I am facing is with Akka serialization .. Why not shade the 
whole of Akka ?


java.lang.AbstractMethodError: 
akka.remote.RemoteActorRefProvider.serializationInformation()Lakka/serialization/Serialization$Information;
at 
akka.serialization.Serialization.serializationInformation(Serialization.scala:166)


Akka 2.6 is just around the corner and I don't think Flink will upgrade to Akka 
2.6 that soon .. so somehow this problem is bound to recur ..


regards.


On Wed, Jul 24, 2019 at 1:01 PM Zili Chen  wrote:

I can see that we relocate akka's netty, akka uncommon math but also
be curious why Flink doesn't shaded all of akka dependencies...


Best,
tison.




Debasish Ghosh  于2019年7月24日周三 下午3:15写道:

Hello Haibo -


Yes, my application depends on Akka 2.5. 
Just curious, why do you think it's recommended to shade Akka version of my 
application instead of Flink ?


regards.


On Wed, Jul 24, 2019 at 12:42 PM Haibo Sun  wrote:

Hi  Debasish Ghosh,
 

Does your application have to depend on Akka 2.5? If not, it's a good idea to 
always keep the Akka version that the application depend on in line with Flink. 
If you want to try shading Akka dependency, I think that it is more recommended 
to shade Akka dependency of your application.


Best,
Haibo

At 2019-07-24 14:31:29, "Debasish Ghosh"  wrote:

Hello -


An application that uses Akka 2.5 and Flink 1.8.0 gives runtime errors because 
of version mismatch between Akka that we use and the one that Flink uses (which 
is Akka 2.4). Anyone tried shading Akka dependency with Flink ? 


Or is there any other alternative way to handle this issue ? I know Flink 1.9 
has upgraded to Akka 2.5 but this is (I think) going to be a recurring problem 
down the line with mismatch between the new releases of Akka and Flink.


regards.



--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re:Flink and Akka version incompatibility ..

2019-07-24 Thread Haibo Sun
Hi  Debasish Ghosh,
 

Does your application have to depend on Akka 2.5? If not, it's a good idea to 
always keep the Akka version that the application depends on in line with 
Flink's. If you want to try shading the Akka dependency, I think it is more 
advisable to shade the Akka dependency of your application.
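
For reference, a rough sketch of what such an application-side relocation could 
look like with the maven-shade-plugin (the shaded package prefix is a 
placeholder; note that Akka resolves classes and reference.conf settings by 
name, so a plain relocation like this often needs extra care and may still not 
work out of the box):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <relocation>
                        <pattern>akka</pattern>
                        <shadedPattern>myapp.shaded.akka</shadedPattern>
                    </relocation>
                </relocations>
                <transformers>
                    <!-- Merge the reference.conf files of the Akka modules into one. -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>reference.conf</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>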


Best,
Haibo

At 2019-07-24 14:31:29, "Debasish Ghosh"  wrote:

Hello -


An application that uses Akka 2.5 and Flink 1.8.0 gives runtime errors because 
of version mismatch between Akka that we use and the one that Flink uses (which 
is Akka 2.4). Anyone tried shading Akka dependency with Flink ? 


Or is there any other alternative way to handle this issue ? I know Flink 1.9 
has upgraded to Akka 2.5 but this is (I think) going to be a recurring problem 
down the line with mismatch between the new releases of Akka and Flink.


regards.



--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re:Re: MiniClusterResource class not found using AbstractTestBase

2019-07-23 Thread Haibo Sun
Hi,  Juan  


It is dependent on "flink-runtime-*-tests.jar", so build.sbt should be modified 
as follows: 


scalaVersion := "2.11.0"


val flinkVersion = "1.8.1"


libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
  "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier "tests"
)


Best,
Haibo

At 2019-07-23 17:51:23, "Fabian Hueske"  wrote:

Hi Juan,


Which Flink version do you use?


Best, Fabian



Am Di., 23. Juli 2019 um 06:49 Uhr schrieb Juan Rodríguez Hortalá 
:

Hi,



I'm trying to use AbstractTestBase in a test in order to use the mini cluster. 
I'm using specs2 with Scala, so I cannot extend AbstractTestBase because I also 
have to extend org.specs2.Specification, so I'm trying to access the mini 
cluster directly using Specs2 BeforeAll to initialize it as follows



private val miniClusterResource = AbstractTestBase.miniClusterResource
miniClusterResource.before()


The problem is that the code doesn't even compile, because it fails to locate 
`org.apache.flink.runtime.testutils.MiniClusterResource`


```

[warn] Class org.apache.flink.runtime.testutils.MiniClusterResource not found - 
continuing with a stub.
[warn] Class 
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration not found - 
continuing with a stub.
[error] Class org.apache.flink.runtime.testutils.MiniClusterResource not found 
- continuing with a stub.
[warn] two warnings found
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 22, 2019 9:38:49 PM
```



I'm importing the following libraries in build.sbt


"org.apache.flink" %% "flink-test-utils"  % flinkVersion,
"org.apache.flink" %% "flink-runtime"  % flinkVersion


Am I missing some additional library?


Thanks,



Juan


Re:AW: Re:Unable to build Flink1.10 from source

2019-07-22 Thread Haibo Sun


Please check whether the following profile section exists in 
"flink-filesystems/flink-mapr-fs/pom.xml". If not, you should pull the latest 
code and try to compile it again. If yes, please share the latest error 
message, it may be different from before.




<profile>
    <id>unsafe-mapr-repo</id>
    <activation>
        <property>
            <name>unsafe-mapr-repo</name>
        </property>
    </activation>
    <repositories>
        <repository>
            <id>mapr-releases</id>
            <url>http://repository.mapr.com/maven/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>
</profile>


Best,
Haibo

At 2019-07-23 04:54:11, "Yebgenya Lazarkhosrouabadi" 
 wrote:


Hi,

I used the command  mvn clean package -DskipTests -Punsafe-mapr-repo  , but it 
didn’t work. I get the same error.

 

Regards

Yebgenya Lazar

 

Von: Haibo Sun 
Gesendet: Montag, 22. Juli 2019 04:40
An: Yebgenya Lazarkhosrouabadi 
Cc: user@flink.apache.org
Betreff: Re:Unable to build Flink1.10 from source

 

Hi,  Yebgenya  

 

The reason for this problem can be found in this email 
(http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/NOTICE-SSL-issue-when-building-flink-mapr-fs-td30757.html).
 The solution is to add the parameter "-Punsafe-mapr-repo" to the maven 
command, as given in the e-mail.

 

Best,

Haibo


At 2019-07-22 02:52:57, "Yebgenya Lazarkhosrouabadi" 
 wrote:



Hello,

 

I’m trying to build flink1.10 from source , but it fails with this error;

 

[ERROR] Failed to execute goal on project flink-mapr-fs: Could not resolve 
dependencies for project org.apache.flink:flink-mapr-fs:jar:1.10-SNAPSHOT: 
Failed to collect dependencies at com.mapr.hadoop:maprfs:jar:5.2.1-mapr: Failed 
to read artifact descriptor for com.mapr.hadoop:maprfs:jar:5.2.1-mapr: Could 
not transfer artifact com.mapr.hadoop:maprfs:pom:5.2.1-mapr from/to 
mapr-releases (https://repository.mapr.com/maven/): 
sun.security.validator.ValidatorException: PKIX path building failed: 
sun.security.provider.certpath.SunCertPathBuilderException: unable to find 
valid certification path to requested target

 

 

Can you please help me to solve this problem ?

 

Regards

Yebgenya Lazar


Re:Unable to build Flink1.10 from source

2019-07-21 Thread Haibo Sun
Hi,  Yebgenya  


The reason for this problem can be found in this email 
(http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/NOTICE-SSL-issue-when-building-flink-mapr-fs-td30757.html).
 The solution is to add the parameter "-Punsafe-mapr-repo" to the maven 
command, as given in the e-mail.


Best,
Haibo

At 2019-07-22 02:52:57, "Yebgenya Lazarkhosrouabadi" 
 wrote:


Hello,

 

I’m trying to build flink1.10 from source , but it fails with this error;

 

[ERROR] Failed to execute goal on project flink-mapr-fs: Could not resolve 
dependencies for project org.apache.flink:flink-mapr-fs:jar:1.10-SNAPSHOT: 
Failed to collect dependencies at com.mapr.hadoop:maprfs:jar:5.2.1-mapr: Failed 
to read artifact descriptor for com.mapr.hadoop:maprfs:jar:5.2.1-mapr: Could 
not transfer artifact com.mapr.hadoop:maprfs:pom:5.2.1-mapr from/to 
mapr-releases (https://repository.mapr.com/maven/): 
sun.security.validator.ValidatorException: PKIX path building failed: 
sun.security.provider.certpath.SunCertPathBuilderException: unable to find 
valid certification path to requested target

 

 

Can you please help me to solve this problem ?

 

Regards

Yebgenya Lazar


Re:Re: Writing Flink logs into specific file

2019-07-19 Thread Haibo Sun
Hi, Soheil


A log configuration file placed in the resource directory of the job's jar will 
not be used by Flink, because the log configuration is explicitly specified by 
the scripts under Flink's bin directory and by the bootstrap code (for example, 
the BootstrapTools class). If you want to output the logs of Flink components 
(such as the Client, JM and TM) to non-default files, you should modify Flink's 
log configuration files with reference to the document mentioned by Caizhi and 
Biao. Note that the underlying logging framework of Flink defaults to log4j, so 
by default you should modify "log4j*.properties"; the "logback*.xml" files are 
the configuration files for logback.


But I guess you might want to specify the log file for the job instead of for 
the Flink components. If so, one way is to create a custom root logger, as 
shown in the following example code. The code is for log4j; if you use logback 
and are interested, you can adapt the idea yourself.


// Requires log4j 1.x classes (org.apache.log4j.*), java.io.File/IOException,
// java.util.Enumeration and org.apache.flink.core.fs.Path.
public static final class PrintLog implements MapFunction<String, String> {
    private static final Logger LOG = CustomLogger.getLogger(PrintLog.class);

    @Override
    public String map(String value) {
        LOG.info("Custom Logger: " + value);
        return value;
    }
}


public static final class CustomLogger {
    private static final Logger rootLogger = new Hierarchy(new RootLogger(Level.INFO)).getRootLogger();

    static {
        FileAppender customAppender = null;
        try {
            customAppender = new FileAppender(
                new PatternLayout("%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n"),
                new File(getLogPath(), "myjob.log").getAbsolutePath(),
                false);

            customAppender.setName("custom");
            rootLogger.addAppender(customAppender);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static Logger getLogger(Class<?> clazz) {
        return rootLogger.getLoggerRepository().getLogger(clazz.getName());
    }

    private static String getLogPath() {
        String path = null;

        Enumeration enumeration = Logger.getRootLogger().getAllAppenders();
        while (enumeration.hasMoreElements()) {
            Appender appender = (Appender) enumeration.nextElement();
            if (appender instanceof FileAppender) {
                path = new Path(((FileAppender) appender).getFile()).getParent().getPath();
                break;
            }
        }
        if (path == null || path.isEmpty()) {
            path = new File("").getAbsolutePath();
        }

        return path;
    }
}




Best,
Haibo

At 2019-07-19 11:21:45, "Biao Liu"  wrote:

Hi Soheil,


> I was wondering if is it possible to save logs into a specified file?


Yes, of course.


> I put the following file in the resource directory of the project but it has 
> no effect


I guess because the log4j has a higher priority. In the document [1], it says 
"Users willing to use logback instead of log4j can just exclude log4j (or 
delete it from the lib/ folder)."


1. 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html




Soheil Pourbafrani  于2019年7月19日周五 上午2:03写道:

Hi,



When we run the Flink application some logs will be generated about the 
running, in both local and distributed environment. I was wondering if is it 
possible to save logs into a specified file?


I put the following file in the resource directory of the project but it has no 
effect:
logback.xml


flink_logs.txtfalse%d{HH:mm:ss.SSS}
 [%thread] %-5level %logger{60} %X{sourceThread} - 
%msg%n

Re:Re: yarn-session vs cluster per job for streaming jobs

2019-07-18 Thread Haibo Sun
Hi, Maxim


As far as I understand, it's hard to draw a simple conclusion about which is 
faster. If the job is small (for example, the number of vertices and the 
parallelism are very small), session mode is usually faster than per-job mode. 
I think session mode has the advantage of sharing the AM and TMs, which saves 
some time for requesting and starting containers. But because of the sharing, 
there will be some resource competition, such as network bandwidth in the 
job-submission phase. If you are very sensitive to speed, perhaps you can run a 
comparative test for your specific jobs and environment, and then decide which 
mode to use?


Best,
Haibo

At 2019-07-18 14:03:01, "Maxim Parkachov"  wrote:

Hi Haibo,


thanks for tip, I almost forgot about max-attempts. I understood implication of 
running with one AM.


Maybe my question was incorrect, but what would be faster (with regards to 
downtime of each job):


1. In case of yarn-session: Parallel cancel all jobs with savepoints, restart 
yarn-session, parallel start all jobs from savepoints
2. In case of per-job mode Parallel cancel all jobs with savepoints, parallel 
start all jobs from savepoints.


I want to optimise standard situation where I deploy new version of all my 
jobs. My current impression that job starts faster in yarn-session mode.


Thanks,
Maxim.




On Thu, Jul 18, 2019 at 4:57 AM Haibo Sun  wrote:

Hi, Maxim


For the concern talking on the first point: 
If HA and checkpointing are enabled, AM (the application master, that is the 
job manager you said) will be restarted by YARN after it dies, and then the 
dispatcher will try to restore all the previously running jobs correctly. Note 
that the number of attempts be decided by the configurations 
"yarn.resourcemanager.am.max-attempts" and "yarn.application-attempts". The 
obvious difference between the session and per-job modes is that if a fatal 
error occurs on AM, it will affect all jobs running in it, while the per-job 
mode will only affect one job.



You can look at this document to see how to configure HA for the Flink cluster 
on YARN: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html#yarn-cluster-high-availability
 .


Best,
Haibo


At 2019-07-17 23:53:15, "Maxim Parkachov"  wrote:

Hi,


I'm looking for advice on how to run flink streaming jobs on Yarn cluster in 
production environment. I tried in testing environment both approaches with HA 
mode, namely yarn session + multiple jobs vs cluster per job, both seems to 
work for my cases, with slight preference of yarn session mode to centrally 
manage credentials. I'm looking to run about 10 streaming jobs mostly 
reading/writing from kafka + cassandra with following restictions:
1. yarn nodes will be hard rebooted quite often, roughly every 2 weeks. I have 
a concern here what happens when Job manager dies in session mode.

2. there are often network interruptions/slowdowns.
3. I'm trying to minimise time to restart job to have as much as possible 
continious processing.


Thanks in advance,
Maxim.


Re:Re: Job leak in attached mode (batch scenario)

2019-07-17 Thread Haibo Sun


There should be no JIRA for this requirement. If you have a strong need for 
this feature, you can create one. In addition, you can also go to 
issues.apache.org and search with keywords to confirm whether there is already 
a relevant JIRA.


Best,
Haibo

At 2019-07-18 10:31:22, "qi luo"  wrote:
Thanks Haibo for the response!


Is there any community issue or plan to implement heartbeat mechanism between 
Dispatcher and Client? If not, should I create one?


Regards,
Qi



On Jul 17, 2019, at 10:19 AM, Haibo Sun  wrote:


Hi, Qi


As far as I know, there is no such mechanism now. To achieve this, I think it 
may be necessary to add a REST-based heartbeat mechanism between Dispatcher and 
Client. At present, perhaps you can add a monitoring service to deal with these 
residual Flink clusters.


Best,
Haibo

At 2019-07-16 14:42:37, "qi luo"  wrote:
Hi guys,


We runs thousands of Flink batch job everyday. The batch jobs are submitted in 
attached mode, so we can know from the client when the job finished and then 
take further actions. To respond to user abort actions, we submit the jobs with 
"—shutdownOnAttachedExit” so the Flink cluster can be shutdown when the client 
exits.


However, in some cases when the Flink client exists abnormally (such as OOM), 
the shutdown signal will not be sent to Flink cluster, causing the “job leak”. 
The lingering Flink job will continue to run and never ends, consuming large 
amount of resources and even produce unexpected results.


Does Flink has any mechanism to handle such scenario (e.g. Spark has cluster 
mode, where the driver runs in the client side, so the job will exit when 
client exits)? Any idea will be very appreciated!


Thanks,
Qi



Re:yarn-session vs cluster per job for streaming jobs

2019-07-17 Thread Haibo Sun
Hi, Maxim


Regarding the concern in your first point: 
If HA and checkpointing are enabled, the AM (the application master, that is, 
the job manager you mentioned) will be restarted by YARN after it dies, and the 
dispatcher will then try to restore all the previously running jobs correctly. 
Note that the number of attempts is decided by the configurations 
"yarn.resourcemanager.am.max-attempts" and "yarn.application-attempts". The 
obvious difference between the session and per-job modes is that if a fatal 
error occurs on the AM, it will affect all jobs running in it, while in per-job 
mode it will only affect one job.



You can look at this document to see how to configure HA for a Flink cluster on 
YARN: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html#yarn-cluster-high-availability
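
For example, the relevant entries in flink-conf.yaml would look roughly like 
this (the ZooKeeper quorum, storage directory and attempt count are 
placeholders):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
high-availability.storageDir: hdfs:///flink/ha/
yarn.application-attempts: 10

Also make sure that yarn.resourcemanager.am.max-attempts in yarn-site.xml is at 
least as large as yarn.application-attempts.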


Best,
Haibo


At 2019-07-17 23:53:15, "Maxim Parkachov"  wrote:

Hi,


I'm looking for advice on how to run flink streaming jobs on Yarn cluster in 
production environment. I tried in testing environment both approaches with HA 
mode, namely yarn session + multiple jobs vs cluster per job, both seems to 
work for my cases, with slight preference of yarn session mode to centrally 
manage credentials. I'm looking to run about 10 streaming jobs mostly 
reading/writing from kafka + cassandra with following restictions:
1. yarn nodes will be hard rebooted quite often, roughly every 2 weeks. I have 
a concern here what happens when Job manager dies in session mode.

2. there are often network interruptions/slowdowns.
3. I'm trying to minimise time to restart job to have as much as possible 
continious processing.


Thanks in advance,
Maxim.


Re:Job leak in attached mode (batch scenario)

2019-07-16 Thread Haibo Sun
Hi, Qi


As far as I know, there is no such mechanism now. To achieve this, I think it 
may be necessary to add a REST-based heartbeat mechanism between Dispatcher and 
Client. At present, perhaps you can add a monitoring service to deal with these 
residual Flink clusters.


Best,
Haibo

At 2019-07-16 14:42:37, "qi luo"  wrote:
Hi guys,


We runs thousands of Flink batch job everyday. The batch jobs are submitted in 
attached mode, so we can know from the client when the job finished and then 
take further actions. To respond to user abort actions, we submit the jobs with 
"—shutdownOnAttachedExit” so the Flink cluster can be shutdown when the client 
exits.


However, in some cases when the Flink client exists abnormally (such as OOM), 
the shutdown signal will not be sent to Flink cluster, causing the “job leak”. 
The lingering Flink job will continue to run and never ends, consuming large 
amount of resources and even produce unexpected results.


Does Flink has any mechanism to handle such scenario (e.g. Spark has cluster 
mode, where the driver runs in the client side, so the job will exit when 
client exits)? Any idea will be very appreciated!


Thanks,
Qi

Re:Converting Metrics from a Reporter to a Custom Events mapping

2019-07-16 Thread Haibo Sun
Hi,  Vijay


Or can you implement a Reporter that transforms the metrics and sends them 
directly to a Kinesis Stream?
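
As a rough, untested skeleton of that idea (the mapping to your Events API 
format and the actual Kinesis client call are left as placeholders):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;

public class EventsApiReporter implements MetricReporter, Scheduled {

    private final Map<String, Metric> metrics = new ConcurrentHashMap<>();

    @Override
    public void open(MetricConfig config) {
        // Read reporter options (e.g. the target Kinesis stream name) from the config here.
    }

    @Override
    public void close() {
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        metrics.put(group.getMetricIdentifier(metricName), metric);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        metrics.remove(group.getMetricIdentifier(metricName));
    }

    @Override
    public void report() {
        for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
            Metric metric = entry.getValue();
            Object value = null;
            if (metric instanceof Counter) {
                value = ((Counter) metric).getCount();
            } else if (metric instanceof Gauge) {
                value = ((Gauge<?>) metric).getValue();
            }
            // Map (entry.getKey(), value) to your Events API format and send it to Kinesis here.
        }
    }
}

It would then be enabled through the metrics.reporter.* options in 
flink-conf.yaml, just like the built-in reporters.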


Best,
Haibo

At 2019-07-16 00:01:36, "Vijay Balakrishnan"  wrote:

Hi,

I need to capture the Metrics sent from a Flink app to a Reporter and transform 
them to an Events API format I have designed. I have been looking at the 
Reporters(https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#list-of-all-variables)
 and have used them but what would be a best practice to capture this metrics 
data to transform it ?


The folks using the Flink app still want to see their metrics in the Flink 
Dashboard using their chosen(not sure yet what they chose-assuming 
ConsoleReporter) Reporter. I need to capture those metrics, transform them to 
my Events API format and send it to a Kinesis Stream.


We use Prometheus and InfluxDB in our environments for other purposes.


Should I use the SLF4J Reporter to dump the metrics into a log file/folder and 
watch that with a Kinesis Agent and transform it somehow(?) and then send it to 
the Kinesis data stream ?


TIA,


Re:Re: Creating a Source function to read data continuously

2019-07-15 Thread Haibo Sun
Hi, Soheil


As Caizhi said, to create a source that implements `SourceFunction`, you can 
first take a closer look at the example in the javadoc 
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html).
 Although `InputFormat` is not recommended for implementing a streaming source, 
it can achieve continuous data reading, so the fact that the job finishes after 
reading all the data is, I think, an issue with your implementation. In 
addition, a custom source can also implement or extend `RichSourceFunction`, 
`ParallelSourceFunction`, `RichParallelSourceFunction`, etc.


I don't know how you will achieve continuous reading. Maybe you can also look 
at the implementation of `ContinuousFileMonitoringFunction`: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
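
For what it's worth, here is a simplified, untested sketch of a source function 
that keeps polling a MySQL table (the table, columns, connection settings and 
poll interval are placeholder assumptions, and it does not checkpoint the read 
position):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class MySqlPollingSource extends RichSourceFunction<Tuple2<Long, String>> {

    private volatile boolean running = true;
    private long lastSeenId = 0L;

    @Override
    public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://db-host:3306/mydb", "user", "password")) { // placeholders
            while (running) {
                try (PreparedStatement stmt = conn.prepareStatement(
                        "SELECT id, payload FROM my_table WHERE id > ? ORDER BY id")) {
                    stmt.setLong(1, lastSeenId);
                    try (ResultSet rs = stmt.executeQuery()) {
                        while (rs.next()) {
                            long id = rs.getLong("id");
                            // Emit the row and remember the highest id so the next poll only reads new rows.
                            synchronized (ctx.getCheckpointLock()) {
                                ctx.collect(Tuple2.of(id, rs.getString("payload")));
                                lastSeenId = id;
                            }
                        }
                    }
                }
                Thread.sleep(5000L); // assumed poll interval
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}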


I hope this will help you.


Best,
Haibo

At 2019-07-16 10:12:21, "Caizhi Weng"  wrote:

Hi Soheil,


It's not recommended to implement a streaming source using `InputFormat` (it's 
mainly used for batch source). To implement a streaming source, 
`SourceFunction` is recommended.


It's clearly written (with examples) in the java docs in `SourceFucntion` how 
to write a `run` and `cancel` method. You can refer to that to write your own 
MySQL streaming source.


Soheil Pourbafrani  于2019年7月16日周二 上午7:29写道:

Hi,


Extending the "RichInputFormat" class I could create my own MySQL input. I want 
to use it for reading data continuously from a table but I observed that the 
"RichInputFormat" class read all data and finish the job.


I guess for reading data continuously I need to extend the "SourceFunction" but 
I observed that it has only two methods: the run() and the cancel()


So I was wondering is it possible to implement a new class to read data from 
MySQL tables continuously? Like what we can do with Kafka connector


Thanks

Re:State incompatible

2019-07-15 Thread Haibo Sun
Hi,  Avi Levi


I don't think there's any way to solve this problem right now; the Flink 
documentation clearly states that this is not supported. 


“Trying to restore state, which was previously configured without TTL, using 
TTL enabled descriptor or vice versa will lead to compatibility failure and 
StateMigrationException."


Flink Document: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl


Best,
Haibo

At 2019-07-14 16:50:19, "Avi Levi"  wrote:

Hi,

I added a ttl to my state 
old version :
 private lazy val stateDescriptor = new ValueStateDescriptor("foo", 
Types.CASE_CLASS[DomainState])


vs the new version 

@transient
private lazy val storeTtl = StateTtlConfig.newBuilder(90)
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .cleanupInRocksdbCompactFilter()
  .build()

  private lazy val stateDescriptor = {
val des = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])
des.enableTimeToLive(storeTtl)
des
  }


BUT when trying to restore from savepoint I am getting this error:


java.lang.RuntimeException: Error while getting state
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
...

Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer cannot be incompatible.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 11 more


Do you have any idea how can I resolve it ? 


Best wishes 

Re:Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread Haibo Sun
Congrats Rong!

Best,
Haibo

At 2019-07-12 09:40:26, "JingsongLee"  wrote:

Congratulations Rong. 
Rong Rong has done a lot of nice work for the Flink community in the past.


Best, JingsongLee


--
From:Rong Rong 
Send Time:2019年7月12日(星期五) 08:09
To:Hao Sun 
Cc:Xuefu Z ; dev ; Flink ML 

Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer


Thank you all for the warm welcome!


It's my honor to become an Apache Flink committer. 
I will continue to work on this great project and contribute more to the 
community.



Cheers,
Rong


On Thu, Jul 11, 2019 at 1:05 PM Hao Sun  wrote:

Congratulations Rong. 


On Thu, Jul 11, 2019, 11:39 Xuefu Z  wrote:

Congratulations, Rong!



On Thu, Jul 11, 2019 at 10:59 AM Bowen Li  wrote:

Congrats, Rong!


On Thu, Jul 11, 2019 at 10:48 AM Oytun Tez  wrote:

> Congratulations Rong!
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Thu, Jul 11, 2019 at 1:44 PM Peter Huang 
> wrote:
>
>> Congrats Rong!
>>
>> On Thu, Jul 11, 2019 at 10:40 AM Becket Qin  wrote:
>>
>>> Congrats, Rong!
>>>
>>> On Fri, Jul 12, 2019 at 1:13 AM Xingcan Cui  wrote:
>>>
 Congrats Rong!

 Best,
 Xingcan

 On Jul 11, 2019, at 1:08 PM, Shuyi Chen  wrote:

 Congratulations, Rong!

 On Thu, Jul 11, 2019 at 8:26 AM Yu Li  wrote:

> Congratulations Rong!
>
> Best Regards,
> Yu
>
>
> On Thu, 11 Jul 2019 at 22:54, zhijiang 
> wrote:
>
>> Congratulations Rong!
>>
>> Best,
>> Zhijiang
>>
>> --
>> From:Kurt Young 
>> Send Time:2019年7月11日(星期四) 22:54
>> To:Kostas Kloudas 
>> Cc:Jark Wu ; Fabian Hueske ;
>> dev ; user 
>> Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer
>>
>> Congratulations Rong!
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Jul 11, 2019 at 10:53 PM Kostas Kloudas 
>> wrote:
>> Congratulations Rong!
>>
>> On Thu, Jul 11, 2019 at 4:40 PM Jark Wu  wrote:
>> Congratulations Rong Rong!
>> Welcome on board!
>>
>> On Thu, 11 Jul 2019 at 22:25, Fabian Hueske 
>> wrote:
>> Hi everyone,
>>
>> I'm very happy to announce that Rong Rong accepted the offer of the
>> Flink PMC to become a committer of the Flink project.
>>
>> Rong has been contributing to Flink for many years, mainly working on
>> SQL and Yarn security features. He's also frequently helping out on the
>> user@f.a.o mailing lists.
>>
>> Congratulations Rong!
>>
>> Best, Fabian
>> (on behalf of the Flink PMC)
>>
>>
>>




--

Xuefu Zhang

"In Honey We Trust!"


Re:Cannot catch exception throws by kafka consumer with JSONKeyValueDeserializationSchema

2019-07-08 Thread Haibo Sun
Hi,   Zhechao 


Usually, if you can, please share the full exception stack trace and show where 
you are trying to catch the exception in your code (preferably by posting the 
relevant code directly). That will help us understand and locate the issue you 
encounter.
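
As a side note, if your goal is simply to skip messages that are not valid JSON, 
one common approach is a custom deserialization schema that returns null for 
records it cannot parse; the Kafka consumer then skips such records instead of 
failing. A rough, untested sketch (the class name is made up, and the exact 
package of DeserializationSchema may differ between Flink versions):

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class TolerantJsonDeserializationSchema implements DeserializationSchema<ObjectNode> {

    private transient ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            // only the value is parsed here; key handling is omitted for brevity
            return (ObjectNode) mapper.readTree(message);
        } catch (Exception e) {
            // returning null tells the Flink Kafka consumer to skip this record
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeInformation.of(ObjectNode.class);
    }
}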


Best,
Haibo

At 2019-07-08 14:11:22, "Zhechao Ma"  wrote:

Hello,


I'm using FlinkKafkaConsumer to read messages from a Kafka topic with 
JSONKeyValueDeserializationSchema. When the message is JSON formatted, everything 
works fine, but it throws a NullPointerException when processing a message that 
is not JSON formatted. I try to catch the exception but cannot do that.


Can anyone give out some tips?


flink: 1.5
flink-kafka: 1.5
kafka-clients: 0.10.1.2_2.11

flink-json:


--

Thanks
Zhechao Ma


Re:Tracking message processing in my application

2019-07-04 Thread Haibo Sun
Hi,  Roey


> What do you think about that? 


I would have some concerns about throughput and latency, so I think the 
operators should report status data asynchronously and in batches to minimize 
the impact of monitoring on normal business processing. In addition, if the 
volume of business data is too large in a certain period and the operator-side 
status backlog exceeds the configured capacity, you also need to decide how to 
handle it: discard the status updates, block the business data processing, or 
something else.
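
Just to illustrate that idea (this is not an existing Flink feature; all names 
below are made up, and the actual sink to ES/Kibana is stubbed out), an operator 
could buffer status updates in a bounded queue and flush them from a background 
thread, dropping updates when the queue is full rather than blocking the pipeline:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class TrackedMapper extends RichMapFunction<String, String> {

    private transient BlockingQueue<String> statusQueue;
    private transient Thread reporterThread;
    private transient volatile boolean running;

    @Override
    public void open(Configuration parameters) {
        statusQueue = new ArrayBlockingQueue<>(10_000);
        running = true;
        reporterThread = new Thread(() -> {
            List<String> batch = new ArrayList<>();
            while (running) {
                try {
                    String first = statusQueue.poll(1, TimeUnit.SECONDS);
                    if (first == null) {
                        continue;
                    }
                    batch.add(first);
                    statusQueue.drainTo(batch, 999);
                    // send the batch to your tracking store (e.g. Elasticsearch) here
                    batch.clear();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "status-reporter");
        reporterThread.setDaemon(true);
        reporterThread.start();
    }

    @Override
    public String map(String value) {
        // offer() drops the update if the queue is full instead of blocking processing
        statusQueue.offer("processed:" + value);
        return value;
    }

    @Override
    public void close() throws Exception {
        running = false;
        if (reporterThread != null) {
            reporterThread.interrupt();
            reporterThread.join(1000);
        }
    }
}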


Best,
Haibo 

At 2019-07-04 20:29:02, "Halfon, Roey"  wrote:


Hi,

We are looking for a monitoring solution for our dataflow – Track the progress 
of incoming messages while they are processed.
I'll clarify – we want to build some service which will show status for each 
incoming message. And in case of failures to give some detailed information.

I thought about the following:
First, every incoming message will be assigned with some id.
We can create a "Reporter" (A logger with some additional capabilities)  which 
each operator can communicate with, and update a status and more relevant 
information. These details can be stroed in kibana (ES) for example.
Then, we need to create another service which will query kibana and shows the 
results.

What do you think about that? Is there any built-in solution for that? (Flink's 
built-in metrics are not relevant here because they don't help to track a 
single message.)

How are you logging and tracking your processed messages?
Is there any documentation or some use cases that I can learn from?

Thanks,
Roey.

Re:Load database table with many columns

2019-07-03 Thread Haibo Sun
Hi,  Soheil Pourbafrani


The current implementation of JDBCInputFormat cannot automatically infer the 
column types, and as far as I know there is no other way to do this.


If you're going to implement such an input format, the inference work needs to 
be done by yourself, because it relies on the interfaces provided by JDBC to get 
the table schema or the result set metadata, rather than on any Flink interface.
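
If it helps, here is a rough, untested sketch of how such inference could be done 
yourself by reading the JDBC ResultSetMetaData and building a RowTypeInfo before 
constructing the JDBCInputFormat (the type mapping only covers a few JDBC types, 
and some drivers only expose metadata after the query has been executed):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.Types;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class JdbcTypeInference {

    public static RowTypeInfo inferRowType(String url, String query) throws Exception {
        try (Connection conn = DriverManager.getConnection(url);
             PreparedStatement stmt = conn.prepareStatement(query)) {
            ResultSetMetaData meta = stmt.getMetaData();
            if (meta == null) {
                // some drivers only provide metadata after executing the query
                throw new IllegalStateException("Driver did not provide metadata before execution");
            }
            TypeInformation<?>[] types = new TypeInformation<?>[meta.getColumnCount()];
            for (int i = 0; i < types.length; i++) {
                switch (meta.getColumnType(i + 1)) {
                    case Types.INTEGER:
                        types[i] = BasicTypeInfo.INT_TYPE_INFO;
                        break;
                    case Types.BIGINT:
                        types[i] = BasicTypeInfo.LONG_TYPE_INFO;
                        break;
                    case Types.DOUBLE:
                        types[i] = BasicTypeInfo.DOUBLE_TYPE_INFO;
                        break;
                    default:
                        // extend this mapping for the column types in your schema
                        types[i] = BasicTypeInfo.STRING_TYPE_INFO;
                }
            }
            return new RowTypeInfo(types);
        }
    }
}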


Best,
Haibo

At 2019-07-04 03:20:30, "Soheil Pourbafrani"  wrote:

Hi,


I use the following sample code to load data from a database into Flink DataSet:


DataSet<Row> dbData = env.createInput(
    JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
        .setDBUrl("jdbc:derby:memory:persons")
        .setQuery("select name, age from persons")
        .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
        .finish());


But the problem is that sometimes there will be tables with many columns, so we 
need a more straightforward way to introduce the column types to Flink.
So my question is: is there any way for Flink to infer the column types, so 
there is no need to declare each column type one by one?
Does Flink provide some way to implement such an input format?

Re:RE: Re:Re: File Naming Pattern from HadoopOutputFormat

2019-07-03 Thread Haibo Sun
Hi, Andreas  


I'm glad you have found a solution. If you're interested in option 2 that I 
talked about, you can follow the progress of the issue Yitzchak mentioned 
(https://issues.apache.org/jira/browse/FLINK-12573) by watching it.


Best,
Haibo

At 2019-07-03 21:11:44, "Hailu, Andreas"  wrote:


Hi Haibo, Yitzchak, thanks for getting back to me.

 

The pattern that worked for me was to extend the HadoopOutputFormat class, 
override the open() method, and modify the “mapreduce.output.basename” 
configuration property to match my desired file naming structure.
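
For anyone curious, it looks roughly like this (a simplified sketch rather than 
our exact code; it assumes the wrapped format exposes its Hadoop Configuration 
via getConfiguration(), and the UUID-based prefix is just an example):

import java.io.IOException;
import java.util.UUID;

import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;

public class UuidNamedHadoopOutputFormat<K, V> extends HadoopOutputFormat<K, V> {

    public UuidNamedHadoopOutputFormat(OutputFormat<K, V> mapreduceOutputFormat, Job job) {
        super(mapreduceOutputFormat, job);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // Hadoop uses this property as the prefix of the part files, e.g. <basename>-r-00001
        getConfiguration().set("mapreduce.output.basename", "tmp-" + UUID.randomUUID());
        super.open(taskNumber, numTasks);
    }
}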

 

// ah

 

From: Haibo Sun 
Sent: Tuesday, July 2, 2019 5:57 AM
To: Yitzchak Lieberman 
Cc: Hailu, Andreas [Tech] ; user@flink.apache.org
Subject: Re:Re: File Naming Pattern from HadoopOutputFormat

 


Hi, Andreas 

 

You are right. To meet this requirement, Flink would need to expose an 
interface to allow customizing the filename.

 

Best,

Haibo


At 2019-07-02 16:33:44, "Yitzchak Lieberman"  wrote:



regarding option 2 for parquet:

implementing a bucket assigner won't set the file name, as getBucketId() defines 
the directory for the files in case of partitioning the data, for example:

/day=20190101/part-1-1

there is an open issue for that: 
https://issues.apache.org/jira/browse/FLINK-12573

 

On Tue, Jul 2, 2019 at 6:18 AM Haibo Sun  wrote:

Hi, Andreas

 

I think the following things may be what you want.

 

1. For writing Avro, I think you can extend AvroOutputFormat and override the  
getDirectoryFileName() method to customize a file name, as shown below.

The javadoc of AvroOutputFormat: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/formats/avro/AvroOutputFormat.html

 

  public static class CustomAvroOutputFormat<E> extends AvroOutputFormat<E> {

      public CustomAvroOutputFormat(Path filePath, Class<E> type) {
          super(filePath, type);
      }

      public CustomAvroOutputFormat(Class<E> type) {
          super(type);
      }

      @Override
      public void open(int taskNumber, int numTasks) throws IOException {
          this.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
          super.open(taskNumber, numTasks);
      }

      @Override
      protected String getDirectoryFileName(int taskNumber) {
          // returns a custom filename
          return null;
      }
  }

 

2. For writing Parquet, you can refer to ParquetStreamingFileSinkITCase, 
StreamingFileSink#forBulkFormat and DateTimeBucketAssigner. You can create a 
class that implements the BucketAssigner interface and return a custom file 
name in the getBucketId() method (the value returned by getBucketId() will be 
treated as the file name).

 

ParquetStreamingFileSinkITCase:  
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java

 

StreamingFileSink#forBulkFormat: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java

 

DateTimeBucketAssigner: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java

 

 

Best,

Haibo


At 2019-07-02 04:15:07, "Hailu, Andreas"  wrote:



Hello Flink team,

 

I’m writing Avro and Parquet files to HDFS, and I would like to include a 
UUID as a part of the file name.

 

Our files in HDFS currently follow this pattern:

 

tmp-r-1.snappy.parquet

tmp-r-2.snappy.parquet

...

 

I’m using a custom output format which extends a RichOutputFormat - is this 
something which is natively supported? If so, could you please recommend how 
this could be done, or share the relevant document?

 

Best,

Andreas

 


Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices




Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices

Re:Could not load the native RocksDB library

2019-07-02 Thread Haibo Sun
Hi,  Samya.Patro


I guess this may be a setup problem. What OS and what version of JDK do you 
use?  You can try upgrading JDK to see if the issue can be solved.


Best,
Haibo

At 2019-07-02 17:16:59, "Patro, Samya"  wrote:


Hello,
I am using RocksDB for storing state, but when I run the pipeline I get the 
error ”Could not load the native RocksDB library”. Kindly check the configs and 
the error stack trace below and suggest what I am doing wrong.

 

Flink version  - 1.8.0

 


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.8.0</version>
</dependency>


 

This is  the flink checkpointing config I have used

 

executionEnvironment.enableCheckpointing(30);
executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(5);

executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60);
executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

StateBackend rocksDbBackend = new 
RocksDBStateBackend(parameter.get("stateBackendPath"),true);
executionEnvironment.setStateBackend(rocksDbBackend);

 

When I run the pipeline, I get this error

 

java.lang.Exception: Exception while creating StreamOperatorStateContext.

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for StreamFlatMap_9dd63673dd41ea021b896d5203f3ba7c_(1/5) from any of 
the 1 provided restore options.

at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)

... 5 more

Caused by: java.io.IOException: Could not load the native RocksDB library

at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:911)

at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:482)

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)

at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

... 7 more

Caused by: java.lang.UnsatisfiedLinkError: 
/tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so: 
/tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so: 
undefined symbol: malloc_stats_print

at java.lang.ClassLoader$NativeLibrary.load(Native Method)

at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)

at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)

at java.lang.Runtime.load0(Runtime.java:809)

at java.lang.System.load(System.java:1086)

at 
org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)

at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)

at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:888)

... 11 more

 

 

Thanks and  Regards,
Samya Ranjan Patro
Goldman sachs

 




Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices


Re:Re: File Naming Pattern from HadoopOutputFormat

2019-07-02 Thread Haibo Sun

Hi, Andreas 


You are right. To meet this requirement, Flink would need to expose an 
interface to allow customizing the filename.
 

Best,
Haibo

At 2019-07-02 16:33:44, "Yitzchak Lieberman"  wrote:

regarding option 2 for parquet:
implementing a bucket assigner won't set the file name, as getBucketId() defines 
the directory for the files in case of partitioning the data, for example:
/day=20190101/part-1-1
there is an open issue for that: 
https://issues.apache.org/jira/browse/FLINK-12573


On Tue, Jul 2, 2019 at 6:18 AM Haibo Sun  wrote:

Hi, Andreas


I think the following things may be what you want.


1. For writing Avro, I think you can extend AvroOutputFormat and override the  
getDirectoryFileName() method to customize a file name, as shown below.
The javadoc of AvroOutputFormat: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/formats/avro/AvroOutputFormat.html


public static class CustomAvroOutputFormat<E> extends AvroOutputFormat<E> {
    public CustomAvroOutputFormat(Path filePath, Class<E> type) {
        super(filePath, type);
    }

    public CustomAvroOutputFormat(Class<E> type) {
        super(type);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        this.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
        super.open(taskNumber, numTasks);
    }

    @Override
    protected String getDirectoryFileName(int taskNumber) {
        // returns a custom filename
        return null;
    }
}


2. For writing Parquet, you can refer to ParquetStreamingFileSinkITCase, 
StreamingFileSink#forBulkFormat and DateTimeBucketAssigner. You can create a 
class that implements the BucketAssigner interface and return a custom file 
name in the getBucketId() method (the value returned by getBucketId() will be 
treated as the file name).


ParquetStreamingFileSinkITCase:  
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java


StreamingFileSink#forBulkFormat: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java


DateTimeBucketAssigner: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java




Best,
Haibo

At 2019-07-02 04:15:07, "Hailu, Andreas"  wrote:


Hello Flink team,

 

I’m writing Avro and Parquet files to HDFS, and I would like to include a 
UUID as a part of the file name.

 

Our files in HDFS currently follow this pattern:

 

tmp-r-1.snappy.parquet

tmp-r-2.snappy.parquet

...

 

I’m using a custom output format which extends a RichOutputFormat - is this 
something which is natively supported? If so, could you please recommend how 
this could be done, or share the relevant document?

 

Best,

Andreas




Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices


Re:File Naming Pattern from HadoopOutputFormat

2019-07-01 Thread Haibo Sun
Hi, Andreas


I think the following things may be what you want.


1. For writing Avro, I think you can extend AvroOutputFormat and override the  
getDirectoryFileName() method to customize a file name, as shown below.
The javadoc of AvroOutputFormat: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/formats/avro/AvroOutputFormat.html


public static class CustomAvroOutputFormat<E> extends AvroOutputFormat<E> {
    public CustomAvroOutputFormat(Path filePath, Class<E> type) {
        super(filePath, type);
    }

    public CustomAvroOutputFormat(Class<E> type) {
        super(type);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        this.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
        super.open(taskNumber, numTasks);
    }

    @Override
    protected String getDirectoryFileName(int taskNumber) {
        // returns a custom filename
        return null;
    }
}


2. For writing Parquet, you can refer to ParquetStreamingFileSinkITCase, 
StreamingFileSink#forBulkFormat and DateTimeBucketAssigner. You can create a 
class that implements the BucketAssigner interface and return a custom file 
name in the getBucketId() method (the value returned by getBucketId() will be 
treated as the file name).


ParquetStreamingFileSinkITCase:  
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java


StreamingFileSink#forBulkFormat: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java


DateTimeBucketAssigner: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java




Best,
Haibo

At 2019-07-02 04:15:07, "Hailu, Andreas"  wrote:


Hello Flink team,

 

I’m writing Avro and Parquet files to HDFS, and I would like to include a 
UUID as a part of the file name.

 

Our files in HDFS currently follow this pattern:

 

tmp-r-1.snappy.parquet

tmp-r-2.snappy.parquet

...

 

I’m using a custom output format which extends a RichOutputFormat - is this 
something which is natively supported? If so, could you please recommend how 
this could be done, or share the relevant document?

 

Best,

Andreas




Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices


Re:Re: Re:Flink batch job memory/disk leak when invoking set method on a static Configuration object.

2019-07-01 Thread Haibo Sun
Hi, Vadim 
 
I tried many times with the master branch code and failed to reproduce this 
issue. Which version of Flink did you use?


For the Configuration class in your code, I use 
`org.apache.hadoop.conf.Configuration`.


The configurations I enabled in flink-conf.yaml are as follows (except that, no 
other changes have been made):
high-availability: zookeeper
high-availability.storageDir: file:///data/tmp/flink/ha/
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink-test
blob.storage.directory: /data/tmp/flink/blob


Best,
Haibo


At 2019-06-28 15:49:50, "Vadim Vararu"  wrote:

Hi,


I've run it on a standalone Flink cluster. No Yarn involved.
From: Haibo Sun 
Sent: Friday, June 28, 2019 6:13 AM
To: Vadim Vararu
Cc: user@flink.apache.org
Subject: Re:Flink batch job memory/disk leak when invoking set method on a 
static Configuration object.
 
Hi, Vadim


A similar issue has occurred in earlier versions, see 
https://issues.apache.org/jira/browse/FLINK-4485.
Are you running a Flink session cluster on YARN? I think it might be a bug. 
I'll see if I can reproduce it with the master branch code, and if so, I will 
try to investigate it.


If someone already knows the cause of the problem, that would be best; then it 
won't need to be re-investigated.


Best,
Haibo



At 2019-06-28 00:46:43, "Vadim Vararu"  wrote:

Hi guys,


I have a simple batch job with a custom output formatter that writes to a local 
file.


public class JobHadoop {

public static void main(String[] args) throws Exception {
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

env.fromCollection(Sets.newHashSet("line1", "line2", "line3"))
.map(line -> line + "dd")
.write(new HadoopUsageOutputFormat(), "file:///tmp/out");

env.execute(JobHadoop.class.getName());
}

}
public class HadoopUsageOutputFormat extends FileOutputFormat<String> 
implements OutputFormat<String> {

private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new 
Configuration();
public static final String DEFAULT_LINE_DELIMITER = "\n";

private Writer writer;

static {
DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
}

@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
writer = new OutputStreamWriter(new BufferedOutputStream(stream));
}

@Override
public void writeRecord(String record) throws IOException {
writer.write(record);
writer.write(DEFAULT_LINE_DELIMITER);
}

@Override
public void close() throws IOException {
if (writer != null) {
this.writer.flush();
this.writer.close();
}

super.close();
}
}


The problem is that after the job is finished, there is a leak somewhere that 
does not permit the blobStore of the job to be deleted. The number of such 
"deleted" files increases after each job run. Even though they are marked as 
deleted, there is a reference to the file somewhere in the JobManager process 
that keeps it from actual deletion.








Also, the problem reproduces only if I actually invoke the set method of 
Configuration:
static {
DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
}


From my observations, if I change the 
private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new 
Configuration();
to a non-static field, then the problem does not reproduce any more.




However, I'm interested if that's a normal behaviour or a bug/leak somewhere in 
Flink itself.


Thanks, 
Vadim.



Re:Flink batch job memory/disk leak when invoking set method on a static Configuration object.

2019-06-27 Thread Haibo Sun
Hi, Vadim


A similar issue has occurred in earlier versions, see 
https://issues.apache.org/jira/browse/FLINK-4485.
Are you running a Flink session cluster on YARN? I think it might be a bug. 
I'll see if I can reproduce it with the master branch code, and if so, I will 
try to investigate it.


If someone already knows the cause of the problem, that would be best; then it 
won't need to be re-investigated.


Best,
Haibo



At 2019-06-28 00:46:43, "Vadim Vararu"  wrote:

Hi guys,


I have a simple batch job with a custom output formatter that writes to a local 
file.


public class JobHadoop {

public static void main(String[] args) throws Exception {
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

env.fromCollection(Sets.newHashSet("line1", "line2", "line3"))
.map(line -> line + "dd")
.write(new HadoopUsageOutputFormat(), "file:///tmp/out");

env.execute(JobHadoop.class.getName());
}

}
public class HadoopUsageOutputFormat extends FileOutputFormat<String> 
implements OutputFormat<String> {

private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new 
Configuration();
public static final String DEFAULT_LINE_DELIMITER = "\n";

private Writer writer;

static {
DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
}

@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
writer = new OutputStreamWriter(new BufferedOutputStream(stream));
}

@Override
public void writeRecord(String record) throws IOException {
writer.write(record);
writer.write(DEFAULT_LINE_DELIMITER);
}

@Override
public void close() throws IOException {
if (writer != null) {
this.writer.flush();
this.writer.close();
}

super.close();
}
}


The problem is that after the job is finished, there is a leak somewhere that 
does not permit the blobStore of the job to be deleted. The number of such 
"deleted" files increases after each job run. Even though they are marked as 
deleted, there is a reference to the file somewhere in the JobManager process 
that keeps it from actual deletion.








Also, the problem reproduces only if I actually invoke the set method of 
Configuration:
static {
DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
}


From my observations, if I change the 
private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new 
Configuration();
to a non-static field, then the problem does not reproduce any more.




However, I'm interested if that's a normal behaviour or a bug/leak somewhere in 
Flink itself.


Thanks, 
Vadim.



Re:Limit number of jobs or Job Managers

2019-06-27 Thread Haibo Sun
Hi,  Pankaj Chand 


If you're running Flink on YARN, you can do this by limiting the number of 
applications in the cluster or in the queue. As far as I know, Flink itself does 
not provide such a limit.


The following are the relevant configuration items for YARN's capacity scheduler:
yarn.scheduler.capacity.maximum-applications
yarn.scheduler.capacity.<queue-path>.maximum-applications
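
For example, in capacity-scheduler.xml (the values are just for illustration, 
and "root.default" stands for your queue path):

<property>
  <name>yarn.scheduler.capacity.maximum-applications</name>
  <value>20</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.default.maximum-applications</name>
  <value>5</value>
</property>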


Best,
Haibo

At 2019-06-27 20:55:48, "Pankaj Chand"  wrote:

Hi everyone,


Is there any way (parameter or function) I can limit the number of concurrent 
jobs executing in my Flink cluster? Or alternatively, limit the number of 
concurrent Job Managers (since there has to be one Job Manager for every job)?


Thanks!


Pankaj