How to run a job with job cluster mode on top of mesos?

2019-03-21 Thread Jacky Yin 殷传旺
Can anybody help? I found documentation describing job cluster mode for YARN, Docker
and Kubernetes, but the Flink website only covers session cluster mode for Mesos.
It looks like the script “mesos-appmaster-job.sh” should be the right one to run a
job in job cluster mode on top of Mesos. However, I cannot find any help or examples
for this script.
Any help will be greatly appreciated.

Thanks!

Jacky Yin
From: Jacky Yin 殷传旺
Date: Thursday, 21 March 2019, 2:31 PM
To: "user@flink.apache.org"
Subject: Documentation of mesos-appmaster-job.sh

Hello All,

I cannot find any documentation or help on how to use
$flink_home/bin/mesos-appmaster-job.sh. Can anybody help?

Thanks!

Jacky Yin


Re: [DISCUSS] Introduction of a Table API Java Expression DSL

2019-03-21 Thread Jark Wu
Hi Timo,

Sounds good to me.

Do you want to deprecate the string-based API in 1.9, or make the decision
in 1.10 after some feedback?


On Thu, 21 Mar 2019 at 21:32, Timo Walther  wrote:

> Thanks for your feedback Rong and Jark.
>
> @Jark: Yes, you are right that the string-based API is used quite a lot.
> On the other hand, the potential user base in the future is still bigger
> than our current user base. Because the Table API will become equally
> important as the DataStream API, we really need to fix some crucial design
> decisions before it is too late. I would suggest introducing the new DSL
> in 1.9 and removing the Expression parser in either 1.10 or 1.11. From a
> development point of view, I think we can handle the overhead of maintaining
> 3 APIs until then, because 2 APIs will share the same code base + expression
> parser.
>
> Regards,
> Timo
>
> Am 21.03.19 um 05:21 schrieb Jark Wu:
>
> Hi Timo,
>
> I'm +1 on the proposal. I like the idea of providing a Java DSL which is
> friendlier to program against than the string-based approach.
>
> My concern is if/when we can drop the string-based expression parser. If
> it takes a very long time, we have to pay more development
> cost on the three Table APIs. As far as I know, the string-based API is
> used in many companies.
> We should also get some feedback from users, so I'm CCing this email to
> the user mailing list.
>
> Best,
> Jark
>
>
>
> On Wed, 20 Mar 2019 at 08:51, Rong Rong  wrote:
>
>> Thanks for sharing the initiative of improving Java side Table expression
>> DSL.
>>
>> I agree with the doc's statement that the Java DSL was always a "3rd-class
>> citizen",
>> and we've run into many hand-holding scenarios with our Flink developers
>> trying to get the stringified syntax working.
>> Overall I am +1 on this; it also helps reduce the development cost of the
>> Table API, since we no longer need to maintain different DSLs and
>> documentation.
>>
>> I left a few comments in the doc. and also some features that I think will
>> be beneficial to the final outcome. Please kindly take a look @Timo.
>>
>> Many thanks,
>> Rong
>>
>> On Mon, Mar 18, 2019 at 7:15 AM Timo Walther  wrote:
>>
>> > Hi everyone,
>> >
>> > some of you might have already noticed the JIRA issue that I opened
>> > recently [1] about introducing a proper Java expression DSL for the
>> > Table API. Instead of using string-based expressions, we should aim for
>> > a unified, maintainable, programmatic Java DSL.
>> >
>> > Some background: The Blink merging efforts and the big refactorings as
>> > part of FLIP-32 have revealed many shortcomings in the current Table &
>> > SQL API design. Most of these legacy issues cause problems nowadays in
>> > making the Table API a first-class API next to the DataStream API. An
>> > example is the ExpressionParser class [2]. It was implemented in the
>> > early days of the Table API using Scala parser combinators. Over the
>> > last years, this parser has caused many JIRA issues and user confusion on
>> > the mailing list, because the exceptions and syntax might not be
>> > straightforward.
>> >
>> > For FLINK-11908, we added a temporary bridge instead of reimplementing
>> > the parser in Java for FLIP-32. However, this is only an intermediate
>> > solution until we make a final decision.
>> >
>> > I would like to propose a new, parser-free version of the Java Table
>> API:
>> >
>> >
>> >
>> https://docs.google.com/document/d/1r3bfR9R6q5Km0wXKcnhfig2XQ4aMiLG5h2MTx960Fg8/edit?usp=sharing
>> >
>> > I already implemented an early protoype that shows that such a DSL is
>> > not much implementation effort and integrates nicely with all existing
>> > API methods.
>> >
>> > What do you think?
>> >
>> > Thanks for your feedback,
>> >
>> > Timo
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-11890
>> >
>> > [2]
>> >
>> >
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
>> >
>> >
>>
>
>
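For readers skimming the archive, here is a rough sketch of the difference under discussion. The string-based call is the existing API; the DSL variant only illustrates the proposed direction (the $, isGreater and as names, and the orders table, are assumptions, not something agreed in this thread):

// Existing string-based expressions: parsed at runtime by the ExpressionParser.
Table filtered = orders
        .filter("amount > 10")
        .select("id, amount as total");

// Sketch of a parser-free Java expression DSL for the same query.
Table filteredDsl = orders
        .filter($("amount").isGreater(10))
        .select($("id"), $("amount").as("total"));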


Re: Map UDF : The Nothing type cannot have a serializer

2019-03-21 Thread Rong Rong
Based on what I saw in the implementation, I think you meant to implement a
ScalarFunction, right? Since you are only trying to structure varargs
strings into a Map.

If my understanding is correct, the MAP constructor [1] is something you
might be able to leverage, although it doesn't resolve your nullability
issue.
Otherwise, you can use a scalar UDF [2].

--
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/functions.html#value-construction-functions
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#scalar-functions
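For illustration, a minimal sketch of option [2]: a scalar UDF that returns a MAP type. The class name is made up and it assumes the Flink 1.7 Table API; it mirrors the TableFunction from the original post further down the thread:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical scalar UDF: builds a map from varargs key/value pairs,
// dropping entries whose value is null.
public class RemoveNullsScalarFunction extends ScalarFunction {

    public Map<String, String> eval(String... keyValues) {
        Map<String, String> output = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            if (keyValues[i + 1] != null) {
                output.put(keyValues[i], keyValues[i + 1]);
            }
        }
        return output;
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        // Declare the result as MAP<STRING, STRING> so the planner does not
        // treat it as a generic type.
        return Types.MAP(Types.STRING(), Types.STRING());
    }
}

Registered via tableEnv.registerFunction("removeNulls", new RemoveNullsScalarFunction()), it can then be called from SQL exactly as in the SELECT statement of the original post.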



On Thu, Mar 21, 2019 at 5:02 PM shkob1  wrote:

> Looking further into the RowType, it seems like this field is translated as a
> CURSOR rather than a map... not sure why
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Map UDF : The Nothing type cannot have a serializer

2019-03-21 Thread shkob1
Looking further into the RowType, it seems like this field is translated as a
CURSOR rather than a map... not sure why



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Map UDF : The Nothing type cannot have a serializer

2019-03-21 Thread shkob1
Hey,

As I'm building a SQL query, I'm trying to conditionally build a map such that
there won't be any keys with null values in it. AFAIK there's no native way to
do it in Calcite (other than using CASE to build the map in different ways, but
I have a lot of key/value pairs, so that's not reasonable). I tried to
implement it using a Flink UDF but failed - I wonder whether a table function
can return a map, since I'm getting "The Nothing type cannot have a
serializer".

Example SQL:

Select a, b, c, removeNulls('key1', d, 'key2', e, 'key3', f) AS my_map
FROM... 

My Table Function code is:

public class RemoveNullValuesFunction extends
        TableFunction<Map<String, String>> {

    public static final String NAME = "removeNulls";

    public void eval(String... keyValues) {

        if (keyValues.length == 0) {
            collect(Collections.emptyMap());
            return;
        }
        final List<String> keyValuesList = Arrays.asList(keyValues);
        Map<String, String> output = Maps.newHashMap();
        for (int i = 0; i < keyValuesList.size(); i = i + 2) {
            final String key = keyValuesList.get(i);
            final String value = keyValuesList.get(i + 1);
            if (value != null)
                output.put(key, value);
        }
        collect(output);
    }

    @Override
    public TypeInformation<Map<String, String>> getResultType() {
        return Types.MAP(Types.STRING(), Types.STRING());
    }
}


I wonder whether that's not possible, or whether I am missing something in how
the serialization works?

Thanks!
Shahar







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming Protobuf into Parquet file not working with StreamingFileSink

2019-03-21 Thread Rafi Aroch
Hi Kostas,

Yes I have.

Rafi

On Thu, Mar 21, 2019, 20:47 Kostas Kloudas  wrote:

> Hi Rafi,
>
> Have you enabled checkpointing for your job?
>
> Cheers,
> Kostas
>
> On Thu, Mar 21, 2019 at 5:18 PM Rafi Aroch  wrote:
>
>> Hi Piotr and Kostas,
>>
>> Thanks for your reply.
>>
>> The issue is that I don't see any committed files, only in-progress.
>> I tried to debug the code for more details. I see that in
>> *BulkPartWriter* I do reach the *write* methods and see events getting
>> written, but I never reach the *closeForCommit*. I reach straight to the
>> *close* function where all parts are disposed.
>>
>> In my job I have a finite stream (source is reading from parquet file/s).
>> Doing some windowed aggregation and writing back to a parquet file.
>> As far as I know, it should commit files during checkpoints and when the
>> stream has finished. I did enable checkpointing.
>> I did verify that if I connect to other sinks, I see the events.
>>
>> Let me know if I can provide any further information that could be
>> helpful.
>>
>> Would appreciate your help.
>>
>> Thanks,
>> Rafi
>>
>>
>> On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas 
>> wrote:
>>
>>> Hi Rafi,
>>>
>>> Piotr is correct. In-progress files are not necessarily readable.
>>> The valid files are the ones that are "committed" or finalized.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski 
>>> wrote:
>>>
 Hi,

 I’m not sure, but shouldn’t you be just reading committed files and
 ignore in-progress? Maybe Kostas could add more insight to this topic.

 Piotr Nowojski

 On 20 Mar 2019, at 12:23, Rafi Aroch  wrote:

 Hi,

 I'm trying to stream events in Protobuf format into a parquet file.
 I looked into both streaming-file options: BucketingSink &
 StreamingFileSink.
 I first tried using the newer *StreamingFileSink* with the *forBulkFormat
 *API. I noticed there's currently support only for the Avro format
 with the *ParquetAvroWriters*.
 I followed the same convention as Avro and wrote a
 *ParquetProtoWriters* builder class:

 public class ParquetProtoWriters {

 private static final int pageSize = 64 * 1024;

 public static  ParquetWriterFactory 
 forType(final Class protoClass) {
 final ParquetBuilder builder = (out) -> 
 createProtoParquetWriter(protoClass, out);
 return new ParquetWriterFactory<>(builder);
 }

 private static  ParquetWriter 
 createProtoParquetWriter(
 Class type,
 OutputFile out) throws IOException {

 return ProtoParquetWriter.builder(out)
 .withPageSize(pageSize)
 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
 .withCompressionCodec(CompressionCodecName.SNAPPY)
 .withProtoClass(type)
 .build();
 }
 }

 And then I use it as follows:

 StreamingFileSink
 .forBulkFormat(new Path("some-path), 
 ParquetProtoWriters.forType(SomeProtoType.class))
 .build();

 I ran tests on the *ParquetProtoWriters *itself and it writes
 everything properly and i'm able to read the files.

 When I use the sink as part of a job *I see illegal Parquet files
 created*:

 # parquet-tools cat 
 .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
 .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet 
 file (too small length: 4)


 Can anyone suggest what am I missing here?

 When trying to use the *BucketingSink*, I wrote a Writer class for
 Protobuf and everything worked perfectly:

 public class FlinkProtoParquetWriter 
 implements Writer {

 private static final long serialVersionUID = -975302556515811398L;

 private Path path;
 private Class protoClass;
 private transient ParquetWriter writer;

 private int position;
 private final CompressionCodecName compressionCodecName = 
 CompressionCodecName.SNAPPY;
 private final int pageSize = 64 * 1024;

 public FlinkProtoParquetWriter(Class protoClass) {
 this.protoClass = protoClass;
 }

 @Override
 public void open(FileSystem fs, Path path) throws IOException {
 this.position = 0;
 this.path = path;

 if (writer != null) {
 writer.close();
 }

 writer = createWriter();
 }

 @Override
 public long flush() throws IOException {
 Preconditions.checkNotNull(writer);
 position += writer.getDataSize();
 writer.close();
 writer = createWriter();

 return position;
 }

 @Override
 public l

Re: Streaming Protobuf into Parquet file not working with StreamingFileSink

2019-03-21 Thread Kostas Kloudas
Hi Rafi,

Have you enabled checkpointing for your job?

Cheers,
Kostas
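As background, a minimal sketch of enabling checkpointing (the interval is chosen arbitrarily); with bulk formats, the StreamingFileSink only finalizes its part files when a checkpoint completes:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Bulk-format part files are rolled and committed on checkpoints,
// so an interval must be configured for committed files to appear.
env.enableCheckpointing(60_000); // every 60 seconds (illustrative)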

On Thu, Mar 21, 2019 at 5:18 PM Rafi Aroch  wrote:

> Hi Piotr and Kostas,
>
> Thanks for your reply.
>
> The issue is that I don't see any committed files, only in-progress.
> I tried to debug the code for more details. I see that in *BulkPartWriter* I
> do reach the *write* methods and see events getting written, but I never
> reach the *closeForCommit*. I reach straight to the *close* function
> where all parts are disposed.
>
> In my job I have a finite stream (source is reading from parquet file/s).
> Doing some windowed aggregation and writing back to a parquet file.
> As far as I know, it should commit files during checkpoints and when the
> stream has finished. I did enable checkpointing.
> I did verify that if I connect to other sinks, I see the events.
>
> Let me know if I can provide any further information that could be helpful.
>
> Would appreciate your help.
>
> Thanks,
> Rafi
>
>
> On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas  wrote:
>
>> Hi Rafi,
>>
>> Piotr is correct. In-progress files are not necessarily readable.
>> The valid files are the ones that are "committed" or finalized.
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> I’m not sure, but shouldn’t you be just reading committed files and
>>> ignore in-progress? Maybe Kostas could add more insight to this topic.
>>>
>>> Piotr Nowojski
>>>
>>> On 20 Mar 2019, at 12:23, Rafi Aroch  wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to stream events in Protobuf format into a parquet file.
>>> I looked into both streaming-file options: BucketingSink &
>>> StreamingFileSink.
>>> I first tried using the newer *StreamingFileSink* with the *forBulkFormat
>>> *API. I noticed there's currently support only for the Avro format with
>>> the *ParquetAvroWriters*.
>>> I followed the same convention as Avro and wrote a *ParquetProtoWriters* 
>>> builder
>>> class:
>>>
>>> public class ParquetProtoWriters {
>>>
>>> private static final int pageSize = 64 * 1024;
>>>
>>> public static  ParquetWriterFactory forType(final 
>>> Class protoClass) {
>>> final ParquetBuilder builder = (out) -> 
>>> createProtoParquetWriter(protoClass, out);
>>> return new ParquetWriterFactory<>(builder);
>>> }
>>>
>>> private static  ParquetWriter 
>>> createProtoParquetWriter(
>>> Class type,
>>> OutputFile out) throws IOException {
>>>
>>> return ProtoParquetWriter.builder(out)
>>> .withPageSize(pageSize)
>>> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>> .withCompressionCodec(CompressionCodecName.SNAPPY)
>>> .withProtoClass(type)
>>> .build();
>>> }
>>> }
>>>
>>> And then I use it as follows:
>>>
>>> StreamingFileSink
>>> .forBulkFormat(new Path("some-path), 
>>> ParquetProtoWriters.forType(SomeProtoType.class))
>>> .build();
>>>
>>> I ran tests on the *ParquetProtoWriters *itself and it writes
>>> everything properly and i'm able to read the files.
>>>
>>> When I use the sink as part of a job *I see illegal Parquet files
>>> created*:
>>>
>>> # parquet-tools cat 
>>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
>>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet 
>>> file (too small length: 4)
>>>
>>>
>>> Can anyone suggest what am I missing here?
>>>
>>> When trying to use the *BucketingSink*, I wrote a Writer class for
>>> Protobuf and everything worked perfectly:
>>>
>>> public class FlinkProtoParquetWriter implements 
>>> Writer {
>>>
>>> private static final long serialVersionUID = -975302556515811398L;
>>>
>>> private Path path;
>>> private Class protoClass;
>>> private transient ParquetWriter writer;
>>>
>>> private int position;
>>> private final CompressionCodecName compressionCodecName = 
>>> CompressionCodecName.SNAPPY;
>>> private final int pageSize = 64 * 1024;
>>>
>>> public FlinkProtoParquetWriter(Class protoClass) {
>>> this.protoClass = protoClass;
>>> }
>>>
>>> @Override
>>> public void open(FileSystem fs, Path path) throws IOException {
>>> this.position = 0;
>>> this.path = path;
>>>
>>> if (writer != null) {
>>> writer.close();
>>> }
>>>
>>> writer = createWriter();
>>> }
>>>
>>> @Override
>>> public long flush() throws IOException {
>>> Preconditions.checkNotNull(writer);
>>> position += writer.getDataSize();
>>> writer.close();
>>> writer = createWriter();
>>>
>>> return position;
>>> }
>>>
>>> @Override
>>> public long getPos() {
>>> Preconditions.checkNotNull(writer);
>>> return position + writer.getDataSize();
>>> }
>>>
>>> @Override
>>> public void close() throws IOException {
>>> if (writer != null) {
>>> 

Re: [DISCUSS] Create a Flink ecosystem website

2019-03-21 Thread Oytun Tez
Thank you, all! If there are operational tasks about the ecosystem page(s),
let me know (organizing the content etc, whatever).

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Thu, Mar 21, 2019 at 2:14 PM Becket Qin  wrote:

> Thanks for the update Robert! Looking forward to the prototype!
>
> On Thu, Mar 21, 2019 at 10:07 PM Robert Metzger 
> wrote:
>
>> Quick summary of our call:
>> Daryl will soon start with a front end, build against a very simple
>> mock-backend.
>> Congxian will start implementing the Spring-based backend early April.
>>
>> As soon as the first prototype of the UI is ready, we'll share it here for
>> feedback.
>>
>> On Thu, Mar 21, 2019 at 10:08 AM Robert Metzger 
>> wrote:
>>
>> > Okay, great.
>> >
>> > Congxian Qiu, Daryl and I have a kick-off call later today at 2pm CET,
>> 9pm
>> > China time about the design of the ecosystem page (see:
>> > https://github.com/rmetzger/flink-community-tools/issues/4)
>> > Please let me know if others want to join as well, I can add them to the
>> > invite.
>> >
>> > On Wed, Mar 20, 2019 at 4:10 AM Becket Qin 
>> wrote:
>> >
>> >> I agree. We can start with english-only and see how it goes. The
>> comments
>> >> and descriptions can always be multi-lingual but that is up to the
>> package
>> >> owners.
>> >>
>> >> On Tue, Mar 19, 2019 at 6:07 PM Robert Metzger 
>> >> wrote:
>> >>
>> >>> Thanks.
>> >>>
>> >>> Do we actually want this page to be multi-language?
>> >>>
>> >>> I propose to make the website english-only, but maybe consider
>> allowing
>> >>> comments in different languages.
>> >>> If we would make it multi-language, then we might have problems with
>> >>> people submitting packages in non-english languages.
>> >>>
>> >>>
>> >>>
>> >>> On Tue, Mar 19, 2019 at 2:42 AM Becket Qin 
>> wrote:
>> >>>
>>  Done. The writeup looks great!
>> 
>>  On Mon, Mar 18, 2019 at 9:09 PM Robert Metzger 
>>  wrote:
>> 
>> > Nice, really good news on the INFRA front!
>> > I think the hardware specs sound reasonable. And a periodic backup
>> of
>> > the website's database to Infra's backup solution sounds reasonable
>> too.
>> >
>> > Can you accept and review my proposal for the website?
>> >
>> >
>> > On Sat, Mar 16, 2019 at 3:47 PM Becket Qin 
>> > wrote:
>> >
>> >> >
>> >> > I have a very capable and motivated frontend developer who would
>> be
>> >> > willing to implement what I've mocked in my proposal.
>> >>
>> >>
>> >> That is awesome!
>> >>
>> >> I created a Jira ticket[1] to Apache Infra and got the reply. It
>> >> looks that
>> >> Apache infra team could provide a decent VM. The last piece is how
>> to
>> >> ensure the data is persisted so we won't lose the project info /
>> user
>> >> feedbacks when the VM is down. If Apache infra does not provide a
>> >> persistent storage for DB backup, we can always ask for multiple
>> VMs
>> >> and do
>> >> the fault tolerance by ourselves. It seems we can almost say the
>> >> hardware
>> >> side is also ready.
>> >>
>> >> Thanks,
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >> [1] https://issues.apache.org/jira/browse/INFRA-18010
>> >>
>> >> On Fri, Mar 15, 2019 at 5:39 PM Robert Metzger <
>> rmetz...@apache.org>
>> >> wrote:
>> >>
>> >> > Thank you for reaching out to Infra and the ember client.
>> >> > When I first saw the Ember repository, I thought it is the whole
>> >> thing
>> >> > (frontend and backend), but while testing it, I realized it is
>> >> "only" the
>> >> > frontend. I'm not sure if it makes sense to adjust the Ember
>> >> observer
>> >> > client, or just write a simple UI from scratch.
>> >> > I have a very capable and motivated frontend developer who would
>> be
>> >> > willing to implement what I've mocked in my proposal.
>> >> > In addition, I found somebody (Congxian Qiu) who seems to be
>> eager
>> >> to help
>> >> > with this project for the backend:
>> >> > https://github.com/rmetzger/flink-community-tools/issues/4
>> >> >
>> >> > For Infra: I made the same experience when asking for more GitHub
>> >> > permissions for "flinkbot": They didn't respond on their mailing
>> >> list, only
>> >> > on Jira.
>> >> >
>> >> >
>> >> >
>> >> > On Thu, Mar 14, 2019 at 2:45 PM Becket Qin > >
>> >> wrote:
>> >> >
>> >> >> Thanks for writing up the specifications.
>> >> >>
>> >> >> Regarding the website source code, Austin found a website[1]
>> whose
>> >> >> frontend code[2] is available publicly. It lacks some support
>> (e.g
>> >> login),
>> >> >> but it is still a good starting point. One thing is that I did
>> not
>> >> find a
>> >> >> License statement for that source code. I'll reach out to the
>> >> author to see
>> 

Re: [DISCUSS] Create a Flink ecosystem website

2019-03-21 Thread Becket Qin
Thanks for the update Robert! Looking forward to the prototype!

On Thu, Mar 21, 2019 at 10:07 PM Robert Metzger  wrote:

> Quick summary of our call:
> Daryl will soon start with a front end, build against a very simple
> mock-backend.
> Congxian will start implementing the Spring-based backend early April.
>
> As soon as the first prototype of the UI is ready, we'll share it here for
> feedback.
>
> On Thu, Mar 21, 2019 at 10:08 AM Robert Metzger 
> wrote:
>
> > Okay, great.
> >
> > Congxian Qiu, Daryl and I have a kick-off call later today at 2pm CET,
> 9pm
> > China time about the design of the ecosystem page (see:
> > https://github.com/rmetzger/flink-community-tools/issues/4)
> > Please let me know if others want to join as well, I can add them to the
> > invite.
> >
> > On Wed, Mar 20, 2019 at 4:10 AM Becket Qin  wrote:
> >
> >> I agree. We can start with english-only and see how it goes. The
> comments
> >> and descriptions can always be multi-lingual but that is up to the
> package
> >> owners.
> >>
> >> On Tue, Mar 19, 2019 at 6:07 PM Robert Metzger 
> >> wrote:
> >>
> >>> Thanks.
> >>>
> >>> Do we actually want this page to be multi-language?
> >>>
> >>> I propose to make the website english-only, but maybe consider allowing
> >>> comments in different languages.
> >>> If we would make it multi-language, then we might have problems with
> >>> people submitting packages in non-english languages.
> >>>
> >>>
> >>>
> >>> On Tue, Mar 19, 2019 at 2:42 AM Becket Qin 
> wrote:
> >>>
>  Done. The writeup looks great!
> 
>  On Mon, Mar 18, 2019 at 9:09 PM Robert Metzger 
>  wrote:
> 
> > Nice, really good news on the INFRA front!
> > I think the hardware specs sound reasonable. And a periodic backup of
> > the website's database to Infra's backup solution sounds reasonable
> too.
> >
> > Can you accept and review my proposal for the website?
> >
> >
> > On Sat, Mar 16, 2019 at 3:47 PM Becket Qin 
> > wrote:
> >
> >> >
> >> > I have a very capable and motivated frontend developer who would
> be
> >> > willing to implement what I've mocked in my proposal.
> >>
> >>
> >> That is awesome!
> >>
> >> I created a Jira ticket[1] to Apache Infra and got the reply. It
> >> looks that
> >> Apache infra team could provide a decent VM. The last piece is how
> to
> >> ensure the data is persisted so we won't lose the project info /
> user
> >> feedbacks when the VM is down. If Apache infra does not provide a
> >> persistent storage for DB backup, we can always ask for multiple VMs
> >> and do
> >> the fault tolerance by ourselves. It seems we can almost say the
> >> hardware
> >> side is also ready.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> [1] https://issues.apache.org/jira/browse/INFRA-18010
> >>
> >> On Fri, Mar 15, 2019 at 5:39 PM Robert Metzger  >
> >> wrote:
> >>
> >> > Thank you for reaching out to Infra and the ember client.
> >> > When I first saw the Ember repository, I thought it is the whole
> >> thing
> >> > (frontend and backend), but while testing it, I realized it is
> >> "only" the
> >> > frontend. I'm not sure if it makes sense to adjust the Ember
> >> observer
> >> > client, or just write a simple UI from scratch.
> >> > I have a very capable and motivated frontend developer who would
> be
> >> > willing to implement what I've mocked in my proposal.
> >> > In addition, I found somebody (Congxian Qiu) who seems to be eager
> >> to help
> >> > with this project for the backend:
> >> > https://github.com/rmetzger/flink-community-tools/issues/4
> >> >
> >> > For Infra: I made the same experience when asking for more GitHub
> >> > permissions for "flinkbot": They didn't respond on their mailing
> >> list, only
> >> > on Jira.
> >> >
> >> >
> >> >
> >> > On Thu, Mar 14, 2019 at 2:45 PM Becket Qin 
> >> wrote:
> >> >
> >> >> Thanks for writing up the specifications.
> >> >>
> >> >> Regarding the website source code, Austin found a website[1]
> whose
> >> >> frontend code[2] is available publicly. It lacks some support
> (e.g
> >> login),
> >> >> but it is still a good starting point. One thing is that I did
> not
> >> find a
> >> >> License statement for that source code. I'll reach out to the
> >> author to see
> >> >> if they have any concern over our usage.
> >> >>
> >> >> Apache Infra has not replied to my email regarding some details
> >> about the
> >> >> VM. I'll open an infra Jira ticket tomorrow if there is still no
> >> response.
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Jiangjie (Becket) Qin
> >> >>
> >> >> [1] https://emberobserver.com/
> >> >> [2] https://github.com/emberobserver/client
> >> >>
> >> >>
> >> >>
> 

[ANNOUNCE] Release 1.8.0, release candidate #4

2019-03-21 Thread Aljoscha Krettek
Hi All,

Voting on RC 4 for Flink 1.8.0 has started: 
https://lists.apache.org/thread.html/9a2150090430e4c10c466775400c0196fe474055dab6b9ab9226960b@%3Cdev.flink.apache.org%3E.

Please check this out if you want to verify your applications against this new 
Flink release.

Best,
Aljoscha

Flink and sketches

2019-03-21 Thread Flavio Pompermaier
Hi to all,
I was looking for an approx_count and freq_item in Flink and I'm not sure
which road to follow.
At the moment I found 2 valuable options:

   1. Wait for STREAMLINE to unveil their code of HLL_DISTINCT_COUNT[1]
   2. Use the Yahoo Datasketches lib [2], following the example of Tobias
   Lindener [3][4] (and maybe release a better and reusable third party lib
   for Flink)

What do you advise? Is there any other ongoing effort on approximate
statistics?

Best,
Flavio

[1]
https://h2020-streamline-project.eu/wp-content/uploads/2018/10/Streamline-D5.5-Final.pdf
[2] https://datasketches.github.io
[3]https://github.com/tlindener/ApproximateQueries/
[4]
https://www.slideshare.net/SeattleApacheFlinkMeetup/approximate-queries-and-graph-streams-on-apache-flink-theodore-vasiloudis-seattle-apache-flink-meetup
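As a concrete illustration of option 2 (not from the thread): a rough sketch of an approximate distinct count using a DataSketches HLL sketch as the accumulator of a Flink AggregateFunction. The package name reflects the current Apache DataSketches coordinates (it was com.yahoo.sketches at the time of this thread), lgK is an arbitrary precision, and a real job would also need a serializer for the sketch state:

import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;

import org.apache.flink.api.common.functions.AggregateFunction;

// Approximate distinct count of String values, backed by an HLL sketch accumulator.
public class ApproxDistinctCount implements AggregateFunction<String, HllSketch, Double> {

    private static final int LG_K = 12; // sketch precision, illustrative

    @Override
    public HllSketch createAccumulator() {
        return new HllSketch(LG_K);
    }

    @Override
    public HllSketch add(String value, HllSketch accumulator) {
        accumulator.update(value);
        return accumulator;
    }

    @Override
    public Double getResult(HllSketch accumulator) {
        return accumulator.getEstimate();
    }

    @Override
    public HllSketch merge(HllSketch a, HllSketch b) {
        Union union = new Union(LG_K);
        union.update(a);
        union.update(b);
        return union.getResult();
    }
}

A frequent-items variant could follow the same pattern with the library's frequencies sketch.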


Re: Streaming Protobuf into Parquet file not working with StreamingFileSink

2019-03-21 Thread Rafi Aroch
Hi Piotr and Kostas,

Thanks for your reply.

The issue is that I don't see any committed files, only in-progress.
I tried to debug the code for more details. I see that in *BulkPartWriter* I
do reach the *write* methods and see events getting written, but I never
reach the *closeForCommit*. I reach straight to the *close* function where
all parts are disposed.

In my job I have a finite stream (source is reading from parquet file/s).
Doing some windowed aggregation and writing back to a parquet file.
As far as I know, it should commit files during checkpoints and when the
stream has finished. I did enable checkpointing.
I did verify that if I connect to other sinks, I see the events.

Let me know if I can provide any further information that could be helpful.

Would appreciate your help.

Thanks,
Rafi


On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas  wrote:

> Hi Rafi,
>
> Piotr is correct. In-progress files are not necessarily readable.
> The valid files are the ones that are "committed" or finalized.
>
> Cheers,
> Kostas
>
> On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I’m not sure, but shouldn’t you be just reading committed files and
>> ignore in-progress? Maybe Kostas could add more insight to this topic.
>>
>> Piotr Nowojski
>>
>> On 20 Mar 2019, at 12:23, Rafi Aroch  wrote:
>>
>> Hi,
>>
>> I'm trying to stream events in Protobuf format into a parquet file.
>> I looked into both streaming-file options: BucketingSink &
>> StreamingFileSink.
>> I first tried using the newer *StreamingFileSink* with the *forBulkFormat
>> *API. I noticed there's currently support only for the Avro format with
>> the *ParquetAvroWriters*.
>> I followed the same convention as Avro and wrote a *ParquetProtoWriters* 
>> builder
>> class:
>>
>> public class ParquetProtoWriters {
>>
>>     private static final int pageSize = 64 * 1024;
>>
>>     public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
>>         final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
>>         return new ParquetWriterFactory<>(builder);
>>     }
>>
>>     private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
>>             Class<T> type,
>>             OutputFile out) throws IOException {
>>
>>         return ProtoParquetWriter.<T>builder(out)
>>                 .withPageSize(pageSize)
>>                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>>                 .withProtoClass(type)
>>                 .build();
>>     }
>> }
>>
>> And then I use it as follows:
>>
>> StreamingFileSink
>>     .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
>>     .build();
>>
>> I ran tests on the *ParquetProtoWriters *itself and it writes everything
>> properly and i'm able to read the files.
>>
>> When I use the sink as part of a job *I see illegal Parquet files
>> created*:
>>
>> # parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet 
>> file (too small length: 4)
>>
>>
>> Can anyone suggest what am I missing here?
>>
>> When trying to use the *BucketingSink*, I wrote a Writer class for
>> Protobuf and everything worked perfectly:
>>
>> public class FlinkProtoParquetWriter<T extends Message> implements Writer<T> {
>>
>>     private static final long serialVersionUID = -975302556515811398L;
>>
>>     private Path path;
>>     private Class<T> protoClass;
>>     private transient ParquetWriter<T> writer;
>>
>>     private int position;
>>     private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
>>     private final int pageSize = 64 * 1024;
>>
>>     public FlinkProtoParquetWriter(Class<T> protoClass) {
>>         this.protoClass = protoClass;
>>     }
>>
>>     @Override
>>     public void open(FileSystem fs, Path path) throws IOException {
>>         this.position = 0;
>>         this.path = path;
>>
>>         if (writer != null) {
>>             writer.close();
>>         }
>>
>>         writer = createWriter();
>>     }
>>
>>     @Override
>>     public long flush() throws IOException {
>>         Preconditions.checkNotNull(writer);
>>         position += writer.getDataSize();
>>         writer.close();
>>         writer = createWriter();
>>
>>         return position;
>>     }
>>
>>     @Override
>>     public long getPos() {
>>         Preconditions.checkNotNull(writer);
>>         return position + writer.getDataSize();
>>     }
>>
>>     @Override
>>     public void close() throws IOException {
>>         if (writer != null) {
>>             writer.close();
>>             writer = null;
>>         }
>>     }
>>
>>     @Override
>>     public void write(T element) throws IOException {
>>         Preconditions.checkNotNull(writer);
>>         writer.write(element);
>>     }
>>
>>     @Override
>>     public Writer<T> duplicate() {
>>

Re: [REMINDER] Flink Forward San Francisco in a few days

2019-03-21 Thread Robert Metzger
I would like to add that the organizers of the conference have agreed to
offer all Apache committers (of any Apache project) a free ticket.
*To get your free ticket, use the "ASFCommitters19” promo code AND use your
@apache.org  email when registering.*

Feel free to reach out to me directly if you have any questions!

I'm looking forward to seeing as many Apache committers as possible at the
conference, to discuss potential inter-project collaboration, learn from
each other, ...




On Wed, Mar 20, 2019 at 11:03 AM Fabian Hueske  wrote:

> Hi everyone,
>
> *Flink Forward San Francisco 2019 will take place in a few days on April
> 1st and 2nd.*
> If you haven't done so already and are planning to attend, you should
> register soon at:
>
> -> https://sf-2019.flink-forward.org/register
>
> Don't forget to use the 25% discount code *MailingList* for mailing list
> subscribers.
>
> If you are still undecided, check out the conference program [1] of
> exciting talks by speakers from Airbnb, Google, Lyft, Netflix, Splunk,
> Streamlio, Uber, Yelp, and Alibaba.
>
> Hope to see you there,
> Fabian
>
> [1] https://sf-2019.flink-forward.org/conference-program
>


Re: Async Function Not Generating Backpressure

2019-03-21 Thread Ken Krugler

> On Mar 20, 2019, at 6:49 PM, Seed Zeng  wrote:
> 
> Hey Andrey and Ken,
> Sorry about the late reply. I might not have been clear in my question.
> The performance of writing to Cassandra is the same in both cases; only the
> source rate was higher in the case where the async function is present.

OK, I was confused by what you’d originally written...

>>> Job 1 is backpressured because Cassandra cannot handle all the writes and 
>>> eventually slows down the source rate to 6.5k/s. 
>>> Job 2 is slightly backpressured but was able to run at 14k/s.

If the source rate is _temporarily_ higher, then that maybe makes sense, as the 
async function will be able to buffer up to the configured capacity.

E.g. in the documentation example:

AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

The capacity is 100 (which is also the default, if you don’t specify it)

> Something is "buffering" and not propagating backpressure to slow down the 
> source speed from Kafka.
> 
> In our use case, we prefer the backpressure to slow down the source so that 
> the write to Cassandra is not delayed while the source is consuming fast.

You can use a smaller capacity to reduce the impact, but that could obviously
hurt the performance of whatever you're using the async function to parallelize.
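For example (sketch only), the same call as quoted above with a smaller capacity:

// Lower capacity: fewer elements buffered in flight, so backpressure
// reaches the Kafka source sooner (at the cost of async throughput).
AsyncDataStream.unorderedWait(
        stream,
        new AsyncDatabaseRequest(),
        1000, TimeUnit.MILLISECONDS, // timeout per async request
        10);                         // capacity (default is 100)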

Regards,

— Ken

> On Wed, Mar 20, 2019 at 9:38 AM Andrey Zagrebin  > wrote:
> Hi Seed,
> 
> Sorry for confusion, I see now it is separate. Back pressure should still be 
> created because internal async queue has capacity 
> but not sure about reporting problem, Ken and Till probably have better idea.
> 
> As for consumption speed up, async operator creates another thread to collect 
> the result and Cassandra sink probably uses that thread to write data.
> This might parallelize and pipeline previous steps like Kafka fetching and 
> Cassandra IO but I am also not sure about this explanation.
> 
> Best,
> Andrey
> 
> 
> On Tue, Mar 19, 2019 at 8:05 PM Ken Krugler  > wrote:
> Hi Seed,
> 
> I was assuming the Cassandra sink was separate from and after your async 
> function.
> 
> I was trying to come up for an explanation as to why adding the async 
> function would improve your performance.
> 
> The only very unlikely reason I thought of was that the async function 
> somehow caused data arriving at the sink to be more “batchy”, which (if the 
> Cassandra sink had an “every x seconds do a write” batch mode) could improve 
> performance.
> 
> — Ken
> 
>> On Mar 19, 2019, at 11:35 AM, Seed Zeng > > wrote:
>> 
>> Hi Ken and Andrey,
>> 
>> Thanks for the response. I think there is a confusion that the writes to 
>> Cassandra are happening within the Async function. 
>> In my test, the async function is just a pass-through without doing any work.
>> 
>> So any Cassandra related batching or buffering should not be the cause for 
>> this.
>> 
>> Thanks,
>> 
>> Seed
>> 
>> On Tue, Mar 19, 2019 at 12:35 PM Ken Krugler > > wrote:
>> Hi Seed,
>> 
>> It’s a known issue that Flink doesn’t report back pressure properly for 
>> AsyncFunctions, due to how it monitors the output collector to gather back 
>> pressure statistics.
>> 
>> But that wouldn’t explain how you get a faster processing with the 
>> AsyncFunction inserted into your workflow.
>> 
>> I haven’t looked at how the Cassandra sink handles batching, but if the 
>> AsyncFunction somehow caused fewer, bigger Cassandra writes to happen then 
>> that’s one (serious hand waving) explanation.
>> 
>> — Ken
>> 
>>> On Mar 18, 2019, at 7:48 PM, Seed Zeng >> > wrote:
>>> 
>>> Flink Version - 1.6.1
>>> 
>>> In our application, we consume from Kafka and sink to Cassandra in the end. 
>>> We are trying to introduce a custom async function in front of the Sink to 
>>> carry out some customized operations. In our testing, it appears that the 
>>> Async function is not generating backpressure to slow down our Kafka Source 
>>> when Cassandra becomes unhappy. Essentially compared to an almost identical 
>>> job where the only difference is the lack of the Async function, Kafka 
>>> source consumption speed is much higher under the same settings and 
>>> identical Cassandra cluster. The experiment is like this.
>>> 
>>> Job 1 - without async function in front of Cassandra
>>> Job 2 - with async function in front of Cassandra
>>> 
>>> Job 1 is backpressured because Cassandra cannot handle all the writes and 
>>> eventually slows down the source rate to 6.5k/s. 
>>> Job 2 is slightly backpressured but was able to run at 14k/s.
>>> 
>>> Is the AsyncFunction somehow not reporting the backpressure correctly?
>>> 
>>> Thanks,
>>> Seed
>> 
>> --
>> Ken Krugler
>> +1 530-210-6378

Re: Flink 1.7.2 extremely unstable and losing jobs in prod

2019-03-21 Thread Bruno Aranda
Ok, here it goes:

https://transfer.sh/12qMre/jobmanager-debug.log

In an attempt to make it smaller, I removed the noisy "http wire" entries and
masked a couple of things. Not sure this covers everything you would like
to see.

Thanks!

Bruno

On Thu, 21 Mar 2019 at 15:24, Till Rohrmann  wrote:

> Hi Bruno,
>
> could you upload the logs to https://transfer.sh/ or
> https://gist.github.com/ and then post a link. For further debugging this
> will be crucial. It would be really good if you could set the log level to
> DEBUG.
>
> Concerning the number of registered TMs, the new mode (not the legacy
> mode), no longer respects the `-n` setting when you start a yarn session.
> Instead it will dynamically start as many containers as you need to run the
> submitted jobs. That's why you don't see the spare TM and this is the
> expected behaviour.
>
> The community intends to add support for ranges of how many TMs must be
> active at any given time [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-11078
>
> Cheers,
> Till
>
> On Thu, Mar 21, 2019 at 1:50 PM Bruno Aranda  wrote:
>
>> Hi Andrey,
>>
>> Thanks for your response. I was trying to get the logs somewhere but they
>> are biggish (~4Mb). Do you suggest somewhere I could put them?
>>
>> In any case, I can see exceptions like this:
>>
>> 2019/03/18 10:11:50,763 DEBUG
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Releasing
>> slot [SlotRequestId{ab89ff271ebf317a13a9e773aca4e9fb}] because: null
>> 2019/03/18 10:11:50,807 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>> alert-event-beeTrap-notifier (2ff941926e6ad80ba441d9cfcd7d689d) switched
>> from state RUNNING to FAILING.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Could not allocate all requires slots within timeout of 30 ms. Slots
>> required: 2, slots allocated: 0
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:991)
>> at
>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>> ...
>>
>> It looks like a TM may crash, and then the JM. And then the JM is not
>> able to find slots for the tasks in a reasonable time frame? Weirdly, we
>> are running 13 TMs with 6 slots each (we used legacy mode in 1.6), and we
>> always try to keep an extra TM worth of free slots just in case. Looking at
>> the dashboard, I see 12 TMs, 2 free slots, but we tell Flink 13 are
>> available when we start the session in yarn.
>>
>> Any ideas? It is way less stable for us these days without having changed
>> settings much since we started using Flink around 1.2 some time back.
>>
>> Thanks,
>>
>> Bruno
>>
>>
>>
>> On Tue, 19 Mar 2019 at 17:09, Andrey Zagrebin 
>> wrote:
>>
>>> Hi Bruno,
>>>
>>> could you also share the job master logs?
>>>
>>> Thanks,
>>> Andrey
>>>
>>> On Tue, Mar 19, 2019 at 12:03 PM Bruno Aranda 
>>> wrote:
>>>
 Hi,

 This is causing serious instability and data loss in our production
 environment. Any help figuring out what's going on here would be really
 appreciated.

 We recently updated our two EMR clusters from flink 1.6.1 to flink
 1.7.2 (running on AWS EMR). The road to the upgrade was fairly rocky, but
 we felt like it was working sufficiently well in our pre-production
 environments that we rolled it out to prod.

 However we're now seeing the jobmanager crash spontaneously several
 times a day. There doesn't seem to be any pattern to when this happens - it
 doesn't coincide with an increase in the data flowing through the system,
 nor is it at the same time of day.

 The big problem is that when it recovers, sometimes a lot of the jobs
 fail to resume with the following exception:

 org.apache.flink.util.FlinkException: JobManager responsible for
 2401cd85e70698b25ae4fb2955f96fd0 lost the leadership.
 at
 org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1185)
 at
 org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:138)
 at
 org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1625)
 //...
 Caused by: java.util.concurrent.TimeoutException: The heartbeat of
 JobManager with id abb0e96af8966f93d839e4d9395c7697 timed out.
 at
 org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1626)
 ... 16 more

 Starting them manually afterwards doesn't resume from checkpoint, which
 for most jobs means it starts from the end of the source kafka topic. This
 means whenever this surprise jobmanager restart happens, we have a ticking
 clock during which we're losing data.

 We speculate that those jobs die first and while they wait to 

Re: Streaming Protobuf into Parquet file not working with StreamingFileSink

2019-03-21 Thread Kostas Kloudas
Hi Rafi,

Piotr is correct. In-progress files are not necessarily readable.
The valid files are the ones that are "committed" or finalized.

Cheers,
Kostas

On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski  wrote:

> Hi,
>
> I’m not sure, but shouldn’t you be just reading committed files and ignore
> in-progress? Maybe Kostas could add more insight to this topic.
>
> Piotr Nowojski
>
> On 20 Mar 2019, at 12:23, Rafi Aroch  wrote:
>
> Hi,
>
> I'm trying to stream events in Protobuf format into a parquet file.
> I looked into both streaming-file options: BucketingSink &
> StreamingFileSink.
> I first tried using the newer *StreamingFileSink* with the *forBulkFormat
> *API. I noticed there's currently support only for the Avro format with
> the *ParquetAvroWriters*.
> I followed the same convention as Avro and wrote a *ParquetProtoWriters* 
> builder
> class:
>
> public class ParquetProtoWriters {
>
>     private static final int pageSize = 64 * 1024;
>
>     public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
>         final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
>         return new ParquetWriterFactory<>(builder);
>     }
>
>     private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
>             Class<T> type,
>             OutputFile out) throws IOException {
>
>         return ProtoParquetWriter.<T>builder(out)
>                 .withPageSize(pageSize)
>                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>                 .withProtoClass(type)
>                 .build();
>     }
> }
>
> And then I use it as follows:
>
> StreamingFileSink
>     .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
>     .build();
>
> I ran tests on the *ParquetProtoWriters *itself and it writes everything
> properly and i'm able to read the files.
>
> When I use the sink as part of a job *I see illegal Parquet files created*
> :
>
> # parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet 
> file (too small length: 4)
>
>
> Can anyone suggest what am I missing here?
>
> When trying to use the *BucketingSink*, I wrote a Writer class for
> Protobuf and everything worked perfectly:
>
> public class FlinkProtoParquetWriter<T extends Message> implements Writer<T> {
>
>     private static final long serialVersionUID = -975302556515811398L;
>
>     private Path path;
>     private Class<T> protoClass;
>     private transient ParquetWriter<T> writer;
>
>     private int position;
>     private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
>     private final int pageSize = 64 * 1024;
>
>     public FlinkProtoParquetWriter(Class<T> protoClass) {
>         this.protoClass = protoClass;
>     }
>
>     @Override
>     public void open(FileSystem fs, Path path) throws IOException {
>         this.position = 0;
>         this.path = path;
>
>         if (writer != null) {
>             writer.close();
>         }
>
>         writer = createWriter();
>     }
>
>     @Override
>     public long flush() throws IOException {
>         Preconditions.checkNotNull(writer);
>         position += writer.getDataSize();
>         writer.close();
>         writer = createWriter();
>
>         return position;
>     }
>
>     @Override
>     public long getPos() {
>         Preconditions.checkNotNull(writer);
>         return position + writer.getDataSize();
>     }
>
>     @Override
>     public void close() throws IOException {
>         if (writer != null) {
>             writer.close();
>             writer = null;
>         }
>     }
>
>     @Override
>     public void write(T element) throws IOException {
>         Preconditions.checkNotNull(writer);
>         writer.write(element);
>     }
>
>     @Override
>     public Writer<T> duplicate() {
>         return new FlinkProtoParquetWriter<>(protoClass);
>     }
>
>     private ParquetWriter<T> createWriter() throws IOException {
>         return ProtoParquetWriter
>                 .<T>builder(path)
>                 .withPageSize(pageSize)
>                 .withCompressionCodec(compressionCodecName)
>                 .withProtoClass(protoClass)
>                 .build();
>     }
> }
>
>
> Rafi
>
>
>


Avro state migration using Scala in Flink 1.7.2 (and 1.8)

2019-03-21 Thread Marc Rooding
Hi

I’ve been trying to get state migration with Avro working on Flink 1.7.2 using 
Scala case classes but I’m not getting anywhere closer to solving it.

We’re using the most basic streaming WordCount example as a reference to test 
the schema evolution:

val wordCountStream: DataStream[WordWithCount] = text
 .flatMap { w => w.split("\\s") }
 .map { w => WordWithCount(w, 1) }
 .keyBy(_.word)
 .reduce((a, b) => WordWithCount(a.word, a.count + b.count))

In this example, WordWithCount is our data object that we’d like to have 
serialized and deserialized with schema evolution support since keyBy maintains 
state.

I understood from the documentation that it would only work for classes 
generated from Avro schema’s so I’ve tried using sbt-avrohugger to generate our 
case classes. However, for normal case classes generated by Avro we quickly ran 
into the problem that we needed a no-arg constructor.

We looked at the flink-avro module and noticed that the classes generated by 
the avro-maven-plugin were implementing SpecificRecord and seemed to comply 
with the POJO rules as described in the Flink documentation. After switching 
from normal to specific avro generation with sbt-avrohugger, we ended up with 
Scala case classes that should comply with all rules.

An example of such a generated case class is as follows:

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
import scala.annotation.switch

case class WordWithCount(var word: String, var count: Long)
    extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this("", 0L)
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => {
        word
      }.asInstanceOf[AnyRef]
      case 1 => {
        count
      }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case 0 => this.word = {
        value.toString
      }.asInstanceOf[String]
      case 1 => this.count = {
        value
      }.asInstanceOf[Long]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }
  def getSchema: org.apache.avro.Schema = WordWithCount.SCHEMA$
}

object WordWithCount {
val SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"WordWithCount\",\"fields\":[{\"name\":\"word\",\"type\":\"string\"},{\"name\":\"count\",\"type\":\"long\"}]}")
}

This, however, also didn’t work out of the box. We then tried to define our own 
type information using flink-avro’s AvroTypeInfo but this fails because Avro 
looks for a SCHEMA$ property (SpecificData:285) in the class and is unable to 
use Java reflection to identify the SCHEMA$ in the Scala companion object.
implicit val wordWithCountInfo: AvroTypeInfo[WordWithCount] = new 
AvroTypeInfo(classOf[WordWithCount])
We then read in the 1.7 documentation that Flink doesn’t natively support POJO 
types, but only state defined by descriptors, like f.e. the 
ListStateDescriptor, and only if you allow Flink to infer the type information. 
This is definitely what we need for our processors that have map and list 
state. However, for the simple word count example, we should only need native 
POJO (de)serialization with state migration.

We then noticed Github PR #7759 that adds support for POJO state schema 
evolution/migration. We wanted to give this a try and built flink from source 
from the release-1.8 branch. We then included the 1.8-SNAPSHOT jars in our job 
and got a local 1.8 cluster and job running fine.

However, if we do not specify our own type information, and perform the 
following steps:


1. Run the job
2. Create a savepoint and stop the job
3. Update the WordWithCount avro schema to include a third field
4. Update the job according to the generated case class
5. Run the new job from the savepoint


We are then faced with the following error:

Caused by: java.lang.IllegalArgumentException: array is not of length 3 thrown 
from ScalaCaseClassSerializer.scala:50

However, if we again try to define our own type information using the 
AvroTypeInfo class, we are faced with the same issue as in 1.7.

What are we missing? The documentation on how to use this is very limited, and 
we’re getting the idea that it may work with Java types, but maybe not with 
Scala case classes. I’d love to hear some pointers on how to approach this? 
Compared to our solution in 1.4 
(https://medium.com/wbaa/evolve-your-data-model-in-flinks-state-using-avro-f26982afa399),
 we hoped to get rid of all the custom serializers by moving to 1.7

Thanks in advance!

Marc


Re: Flink 1.7.2 extremely unstable and losing jobs in prod

2019-03-21 Thread Till Rohrmann
Hi Bruno,

could you upload the logs to https://transfer.sh/ or
https://gist.github.com/ and then post a link. For further debugging this
will be crucial. It would be really good if you could set the log level to
DEBUG.

Concerning the number of registered TMs, the new mode (not the legacy
mode), no longer respects the `-n` setting when you start a yarn session.
Instead it will dynamically start as many containers as you need to run the
submitted jobs. That's why you don't see the spare TM and this is the
expected behaviour.

The community intends to add support for ranges of how many TMs must be
active at any given time [1].

[1] https://issues.apache.org/jira/browse/FLINK-11078

Cheers,
Till

On Thu, Mar 21, 2019 at 1:50 PM Bruno Aranda  wrote:

> Hi Andrey,
>
> Thanks for your response. I was trying to get the logs somewhere but they
> are biggish (~4Mb). Do you suggest somewhere I could put them?
>
> In any case, I can see exceptions like this:
>
> 2019/03/18 10:11:50,763 DEBUG
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Releasing
> slot [SlotRequestId{ab89ff271ebf317a13a9e773aca4e9fb}] because: null
> 2019/03/18 10:11:50,807 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> alert-event-beeTrap-notifier (2ff941926e6ad80ba441d9cfcd7d689d) switched
> from state RUNNING to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 30 ms. Slots
> required: 2, slots allocated: 0
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:991)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> ...
>
> It looks like a TM may crash, and then the JM. And then the JM is not able
> to find slots for the tasks in a reasonable time frame? Weirdly, we are
> running 13 TMs with 6 slots each (we used legacy mode in 1.6), and we
> always try to keep an extra TM worth of free slots just in case. Looking at
> the dashboard, I see 12 TMs, 2 free slots, but we tell Flink 13 are
> available when we start the session in yarn.
>
> Any ideas? It is way less stable for us these days without having changed
> settings much since we started using Flink around 1.2 some time back.
>
> Thanks,
>
> Bruno
>
>
>
> On Tue, 19 Mar 2019 at 17:09, Andrey Zagrebin 
> wrote:
>
>> Hi Bruno,
>>
>> could you also share the job master logs?
>>
>> Thanks,
>> Andrey
>>
>> On Tue, Mar 19, 2019 at 12:03 PM Bruno Aranda  wrote:
>>
>>> Hi,
>>>
>>> This is causing serious instability and data loss in our production
>>> environment. Any help figuring out what's going on here would be really
>>> appreciated.
>>>
>>> We recently updated our two EMR clusters from flink 1.6.1 to flink 1.7.2
>>> (running on AWS EMR). The road to the upgrade was fairly rocky, but we felt
>>> like it was working sufficiently well in our pre-production environments
>>> that we rolled it out to prod.
>>>
>>> However we're now seeing the jobmanager crash spontaneously several
>>> times a day. There doesn't seem to be any pattern to when this happens - it
>>> doesn't coincide with an increase in the data flowing through the system,
>>> nor is it at the same time of day.
>>>
>>> The big problem is that when it recovers, sometimes a lot of the jobs
>>> fail to resume with the following exception:
>>>
>>> org.apache.flink.util.FlinkException: JobManager responsible for
>>> 2401cd85e70698b25ae4fb2955f96fd0 lost the leadership.
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1185)
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:138)
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1625)
>>> //...
>>> Caused by: java.util.concurrent.TimeoutException: The heartbeat of
>>> JobManager with id abb0e96af8966f93d839e4d9395c7697 timed out.
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1626)
>>> ... 16 more
>>>
>>> Starting them manually afterwards doesn't resume from checkpoint, which
>>> for most jobs means it starts from the end of the source kafka topic. This
>>> means whenever this surprise jobmanager restart happens, we have a ticking
>>> clock during which we're losing data.
>>>
>>> We speculate that those jobs die first and while they wait to be
>>> restarted (they have a 30 second delay strategy), the job manager restarts
>>> and does not recover them? In any case, we have never seen so many job
>>> failures and JM restarts with exactly the same EMR config.
>>>
>>> We've got some functionality we're building that uses the
>>> StreamingFileSink over S3 bugfixes in 1.7.2, so rolling back isn't an ideal
>>> option.
>>>
>>> Looking through the mailing list, we found
>

Re: Ambiguous behavior of Flink on Job cancellation with checkpoint configured

2019-03-21 Thread Aljoscha Krettek
Hi,

That’s a good observation! And it is indeed the expected behaviour. There are 
two parts to understanding this:
 * "retain checkpoints” tells Flink to retain any checkpoints that it stores 
when a job is shut down
 * for recovery purposes (all checkpointing purposes, really) a savepoint 
counts as a checkpoint. Otherwise, you can have strange behaviour when you do a 
successful savepoint, then a failure occurs, and you would then restore from 
the checkpoint before that. See especially [1] about this.

So when you configure Flink to retain only one checkpoint this means that when 
you do a savepoint it will count as that one checkpoint, i.e. the previous 
checkpoint is discarded. Thus, you only have that savepoint after the job is 
canceled.
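
For reference, a minimal Java sketch of the setup this discussion is about; the backend, checkpoint interval and paths are illustrative assumptions, not taken from Parth's actual configuration:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with a checkpoint directory (the URI is a placeholder)
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        env.enableCheckpointing(60_000);

        // "retain checkpoints on job cancel": keep externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // With state.checkpoints.num-retained left at 1 in flink-conf.yaml, a savepoint
        // (which counts as a checkpoint) replaces the single retained checkpoint.

        env.fromElements(1, 2, 3).print();
        env.execute("retained-checkpoint sketch");
    }
}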

I hope this makes sense. Let me know if you have any questions.

Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-10354

> On 21. Mar 2019, at 08:58, Parth Sarathy  wrote:
> 
> Hi All,
>   We are using flink 1.7.2 and have enabled checkpoint with
> RocksDB configured as state backend with retain checkpoints on job cancel.
> In our scenario we are cancelling the job and while resubmitting the job, we
> try to restore the job with latest checkpoint / savepoint available. We are
> observing ambiguous behavior based on the way job is being cancelled, below
> are the captured observations:
> 
> Observations :
> 1. When we cancel the job with a savepoint option, a savepoint is created as
> expected but flink is deleting the latest checkpoint directory available for
> the running job. Is this an expected behavior even when the configuration
> asks to retain checkpoints on job cancellation?
> 2. When we cancel the job without the savepoint option, the same latest
> checkpoint was retained by flink as opposed to before where it was deleted
> as job was cancelled with the savepoint option.
> 
>   As we have configured Flink to retain only a single
> checkpoint at any point in time, could there be an issue where, when we
> cancel the job with a savepoint, the savepoint gets triggered but fails
> midway? We would then end up with an incomplete savepoint and no trace of a
> checkpoint for the job, as it would already have been erased.
> 
> Thanks
> Parth Sarathy
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Best practice to handle update messages in stream

2019-03-21 Thread Piotr Nowojski
Hi,

There is ongoing work [1] to natively support the kind of streams you described 
(we call them upsert streams/changelogs). But it boils down to exactly the 
same thing you have done - aggregating the records per key and adding a `latest` 
aggregation function. Until we support this natively, you can use the query 
that you have written.

Regarding the state size: in most cases there is no workaround for this issue. 
Records overwriting a previous value could arrive at an arbitrary point in time, and 
for most operations (like the SUM aggregation in your case, or filtering) we 
need to keep the previous value for the key in state. Sometimes it might be 
possible to optimise the query and skip the "latest value aggregation", if the 
following SQL operator either does not need to know the previous value (like a sink 
or a projection) or already knows the previous value anyway 
(like a join).
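
To make that `latest` aggregation concrete, here is a minimal sketch of such a UDAF, assuming the update stream carries a monotonically increasing sequence number; field names and types are illustrative, not taken from the actual schema:

import org.apache.flink.table.functions.AggregateFunction;

// Keeps, per group, the value belonging to the highest sequence number seen so far.
public class LatestValue extends AggregateFunction<Long, LatestValue.Accumulator> {

    public static class Accumulator {
        public long seq = Long.MIN_VALUE;
        public long value = 0L;
    }

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    public void accumulate(Accumulator acc, long seq, long value) {
        if (seq > acc.seq) {
            acc.seq = seq;
            acc.value = value;
        }
    }

    @Override
    public Long getValue(Accumulator acc) {
        return acc.value;
    }
}

After registering it with `tableEnv.registerFunction("latest", new LatestValue())`, the inner query becomes `SELECT ID, latest(SEQ, MONEY) AS MONEY FROM T GROUP BY ID`, with the outer SUM kept as in the question below; the per-key entry this keeps is exactly the state-size trade-off discussed above.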

Piotr Nowojski

[1] https://issues.apache.org/jira/browse/FLINK-8545 


> On 21 Mar 2019, at 09:39, 徐涛  wrote:
> 
> Hi Experts,
>   Assuming there is a stream which content is like this:
>   Seq   ID    MONEY
>   1     100   100
>   2     100   200
>   3     101   300
> 
>   The record of Seq#2 is updating the record of Seq#1, changing the money 
> from 100 to 200.
>   If I register the stream as table T and want to sum all the money 
> grouped by each ID, then writing "select sum(MONEY) from T" will return 600 as the 
> result, which is incorrect.
> 
>   I can write a UDAF, for example latest, to compute the latest value of 
> all the ID, then the SQL is like this:
>   select sum(MONEY) from
>   (
>   select ID, latest(MONEY) from T group by ID
>   )
>   But I have to save each ID and its latest value in state, and I am worried 
> that the state grows too large. For now I use this method and set the state 
> retention to several days so that the state does not grow too large. I wonder if there 
> are better ways to do this.
> 
>   So what is the best practice in this scenario? Anyone have a 
> suggestion? Thanks a lot.
> 
> 
> Best
> Henry
>   



Re: [DISCUSS] Create a Flink ecosystem website

2019-03-21 Thread Robert Metzger
Quick summary of our call:
Daryl will soon start with a front end, build against a very simple
mock-backend.
Congxian will start implementing the Spring-based backend early April.

As soon as the first prototype of the UI is ready, we'll share it here for
feedback.

On Thu, Mar 21, 2019 at 10:08 AM Robert Metzger  wrote:

> Okay, great.
>
> Congxian Qiu, Daryl and I have a kick-off call later today at 2pm CET, 9pm
> China time about the design of the ecosystem page (see:
> https://github.com/rmetzger/flink-community-tools/issues/4)
> Please let me know if others want to join as well, I can add them to the
> invite.
>
> On Wed, Mar 20, 2019 at 4:10 AM Becket Qin  wrote:
>
>> I agree. We can start with english-only and see how it goes. The comments
>> and descriptions can always be multi-lingual but that is up to the package
>> owners.
>>
>> On Tue, Mar 19, 2019 at 6:07 PM Robert Metzger 
>> wrote:
>>
>>> Thanks.
>>>
>>> Do we actually want this page to be multi-language?
>>>
>>> I propose to make the website english-only, but maybe consider allowing
>>> comments in different languages.
>>> If we would make it multi-language, then we might have problems with
>>> people submitting packages in non-english languages.
>>>
>>>
>>>
>>> On Tue, Mar 19, 2019 at 2:42 AM Becket Qin  wrote:
>>>
 Done. The writeup looks great!

 On Mon, Mar 18, 2019 at 9:09 PM Robert Metzger 
 wrote:

> Nice, really good news on the INFRA front!
> I think the hardware specs sound reasonable. And a periodic backup of
> the website's database to Infra's backup solution sounds reasonable too.
>
> Can you accept and review my proposal for the website?
>
>
> On Sat, Mar 16, 2019 at 3:47 PM Becket Qin 
> wrote:
>
>> >
>> > I have a very capable and motivated frontend developer who would be
>> > willing to implement what I've mocked in my proposal.
>>
>>
>> That is awesome!
>>
>> I created a Jira ticket[1] for Apache Infra and got a reply. It
>> looks like the
>> Apache Infra team could provide a decent VM. The last piece is how to
>> ensure the data is persisted so we won't lose the project info / user
>> feedback when the VM is down. If Apache Infra does not provide
>> persistent storage for DB backups, we can always ask for multiple VMs
>> and handle
>> the fault tolerance ourselves. It seems we can almost say the
>> hardware
>> side is also ready.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> [1] https://issues.apache.org/jira/browse/INFRA-18010
>>
>> On Fri, Mar 15, 2019 at 5:39 PM Robert Metzger 
>> wrote:
>>
>> > Thank you for reaching out to Infra and the ember client.
>> > When I first saw the Ember repository, I thought it was the whole
>> thing
>> > (frontend and backend), but while testing it, I realized it is
>> "only" the
>> > frontend. I'm not sure if it makes sense to adjust the Ember
>> observer
>> > client, or just write a simple UI from scratch.
>> > I have a very capable and motivated frontend developer who would be
>> > willing to implement what I've mocked in my proposal.
>> > In addition, I found somebody (Congxian Qiu) who seems to be eager
>> to help
>> > with this project for the backend:
>> > https://github.com/rmetzger/flink-community-tools/issues/4
>> >
>> > For Infra: I had the same experience when asking for more GitHub
>> > permissions for "flinkbot": they didn't respond on their mailing
>> list, only
>> > on Jira.
>> >
>> >
>> >
>> > On Thu, Mar 14, 2019 at 2:45 PM Becket Qin 
>> wrote:
>> >
>> >> Thanks for writing up the specifications.
>> >>
>> >> Regarding the website source code, Austin found a website[1] whose
>> >> frontend code[2] is available publicly. It lacks some support (e.g
>> login),
>> >> but it is still a good starting point. One thing is that I did not
>> find a
>> >> License statement for that source code. I'll reach out to the
>> author to see
>> >> if they have any concern over our usage.
>> >>
>> >> Apache Infra has not replied to my email regarding some details
>> about the
>> >> VM. I'll open an infra Jira ticket tomorrow if there is still no
>> response.
>> >>
>> >> Thanks,
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >> [1] https://emberobserver.com/
>> >> [2] https://github.com/emberobserver/client
>> >>
>> >>
>> >>
>> >> On Thu, Mar 14, 2019 at 1:35 AM Robert Metzger <
>> rmetz...@apache.org>
>> >> wrote:
>> >>
>> >>> @Bowen: I agree. Confluent Hub looks nicer, but it is on their
>> company
>> >>> website. I guess the likelihood that they give out code from
>> their company
>> >>> website is fairly low.
>> >>> @Nils: Beam's page is similar to our Ecosystem page, which we'll
>> >>> 

Re: Streaming Protobuf into Parquet file not working with StreamingFileSink

2019-03-21 Thread Piotr Nowojski
Hi,

I’m not sure, but shouldn’t you just be reading the committed files and ignoring the 
in-progress ones? Maybe Kostas can add more insight on this topic.
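
One assumption worth checking on the writer side (not a confirmed diagnosis): with forBulkFormat the sink rolls part files on checkpoints only, so if checkpointing is disabled the files stay ".inprogress" and never get a valid Parquet footer. A hedged fragment reusing the ParquetProtoWriters and SomeProtoType names from the quoted mail below; the source is a placeholder:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Bulk formats use an on-checkpoint rolling policy: part files are only finalized
// (renamed from ".inprogress" and given their footer) when a checkpoint completes.
env.enableCheckpointing(60_000);

env.addSource(new SomeProtoSource())   // placeholder source emitting SomeProtoType records
        .addSink(StreamingFileSink
                .forBulkFormat(new Path("some-path"),
                        ParquetProtoWriters.forType(SomeProtoType.class))
                .build());

env.execute("proto-to-parquet sketch");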

Piotr Nowojski

> On 20 Mar 2019, at 12:23, Rafi Aroch  wrote:
> 
> Hi,
> 
> I'm trying to stream events in Protobuf format into a Parquet file.
> I looked into both streaming-file options: BucketingSink & StreamingFileSink.
> I first tried using the newer StreamingFileSink with the forBulkFormat API. I 
> noticed there's currently support only for the Avro format with the 
> ParquetAvroWriters.
> I followed the same convention as Avro and wrote a ParquetProtoWriters 
> builder class:
> 
> public class ParquetProtoWriters {
> 
> private static final int pageSize = 64 * 1024;
> 
> public static <T extends Message> ParquetWriterFactory<T> forType(final 
> Class<T> protoClass) {
> final ParquetBuilder<T> builder = (out) -> 
> createProtoParquetWriter(protoClass, out);
> return new ParquetWriterFactory<>(builder);
> }
> 
> private static <T extends Message> ParquetWriter<T> 
> createProtoParquetWriter(
> Class<T> type,
> OutputFile out) throws IOException {
> 
> return ProtoParquetWriter.<T>builder(out)
> .withPageSize(pageSize)
> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
> .withCompressionCodec(CompressionCodecName.SNAPPY)
> .withProtoClass(type)
> .build();
> }
> }
> And then I use it as follows:
> StreamingFileSink
> .forBulkFormat(new Path("some-path), 
> ParquetProtoWriters.forType(SomeProtoType.class))
> .build();
> I ran tests on the ParquetProtoWriters itself and it writes everything 
> properly, and I'm able to read the files.
> 
> When I use the sink as part of a job I see illegal Parquet files created:
> # parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet 
> file (too small length: 4)
> 
> Can anyone suggest what I am missing here?
> 
> When trying to use the BucketingSink, I wrote a Writer class for Protobuf and 
> everything worked perfectly:
> public class FlinkProtoParquetWriter<T extends Message> implements 
> Writer<T> {
> 
> private static final long serialVersionUID = -975302556515811398L;
> 
> private Path path;
> private Class<T> protoClass;
> private transient ParquetWriter<T> writer;
> 
> private int position;
> private final CompressionCodecName compressionCodecName = 
> CompressionCodecName.SNAPPY;
> private final int pageSize = 64 * 1024;
> 
> public FlinkProtoParquetWriter(Class<T> protoClass) {
> this.protoClass = protoClass;
> }
> 
> @Override
> public void open(FileSystem fs, Path path) throws IOException {
> this.position = 0;
> this.path = path;
> 
> if (writer != null) {
> writer.close();
> }
> 
> writer = createWriter();
> }
> 
> @Override
> public long flush() throws IOException {
> Preconditions.checkNotNull(writer);
> position += writer.getDataSize();
> writer.close();
> writer = createWriter();
> 
> return position;
> }
> 
> @Override
> public long getPos() {
> Preconditions.checkNotNull(writer);
> return position + writer.getDataSize();
> }
> 
> @Override
> public void close() throws IOException {
> if (writer != null) {
> writer.close();
> writer = null;
> }
> }
> 
> @Override
> public void write(T element) throws IOException {
> Preconditions.checkNotNull(writer);
> writer.write(element);
> }
> 
> @Override
> public Writer<T> duplicate() {
> return new FlinkProtoParquetWriter<>(protoClass);
> }
> 
> private ParquetWriter<T> createWriter() throws IOException {
> return ProtoParquetWriter
> .<T>builder(path)
> .withPageSize(pageSize)
> .withCompressionCodec(compressionCodecName)
> .withProtoClass(protoClass)
> .build();
> }
> }
> 
> Rafi



Re: [VOTE] Release 1.8.0, release candidate #3

2019-03-21 Thread Yu Li
Thanks for the message Aljoscha, let's discuss in JIRA (just replied there).

Best Regards,
Yu


On Thu, 21 Mar 2019 at 21:15, Aljoscha Krettek  wrote:

> Hi Yu,
>
> I commented on the issue. For me both Hadoop 2.8.3 and Hadoop 2.4.1 seem
> to work. Could you have a look at my comment?
>
> I will also cancel this RC because of various issues.
>
> Best,
> Aljoscha
>
> On 21. Mar 2019, at 12:23, Yu Li  wrote:
>
> Thanks @jincheng
>
> @Aljoscha I've just opened FLINK-11990
>  for the HDFS
> BucketingSink issue with hadoop 2.8. IMHO it might be a blocker for 1.8.0
> and need your confirmation. Thanks.
>
> Best Regards,
> Yu
>
>
> On Thu, 21 Mar 2019 at 15:57, jincheng sun 
> wrote:
>
>> Thanks for the quick fix, Yu. the PR of FLINK-11972
>>  has been merged.
>>
>> Cheers,
>> Jincheng
>>
>> Yu Li  于2019年3月21日周四 上午7:23写道:
>>
>>> -1, observed stable failures of the streaming bucketing end-to-end test case
>>> in two different environments (Linux/MacOS) when running with both the shaded
>>> hadoop-2.8.3 jar file and the hadoop-2.8.5 dist, while both
>>> environments pass with hadoop 2.6.5. For more details please refer to this
>>> comment in FLINK-11972.
>>>
>>> Best Regards,
>>> Yu
>>>
>>>
>>> On Thu, 21 Mar 2019 at 04:25, jincheng sun 
>>> wrote:
>>>
 Thanks for the quick fix Aljoscha! The FLINK-11971
  has been merged.

 Cheers,
 Jincheng

 Piotr Nowojski  于2019年3月21日周四 上午12:29写道:

> -1 from my side due to performance regression found in the master
> branch since Jan 29th.
>
> In 10% JVM forks it was causing huge performance drop in some of the
> benchmarks (up to 30-50% reduced throughput), which could mean that one 
> out
> of 10 task managers could be affected by it. Today we have merged a fix 
> for
> it [1]. First benchmark run was promising [2], but we have to wait until
> tomorrow to make sure that the problem was definitely resolved. If that’s
> the case, I would recommend including it in 1.8.0, because we really do 
> not
> know how big of performance regression this issue can be in the real world
> scenarios.
>
> Regarding the second regression from mid February. We have found the
> responsible commit and this one is probably just a false positive. Because
> of the nature some of the benchmarks, they are running with low number of
> records (300k). The apparent performance regression was caused by higher
> initialisation time. When I temporarily increased the number of records to
> 2M, the regression was gone. Together with Till and Stefan Richter we
> discussed the potential impact of this longer initialisation time (in the
> case of said benchmarks initialisation time increased from 70ms to 120ms)
> and we think that it’s not a critical issue, that doesn’t have to block 
> the
> release. Nevertheless there might some follow up work for this.
>
> [1] https://github.com/apache/flink/pull/8020
> [2] http://codespeed.dak8s.net:8000/timeline/?ben=tumblingWindow&env=2
>
> Piotr Nowojski
>
> On 20 Mar 2019, at 10:09, Aljoscha Krettek 
> wrote:
>
> Thanks Jincheng! It would be very good to fix those but as you said, I
> would say they are not blockers.
>
> On 20. Mar 2019, at 09:47, Kurt Young  wrote:
>
> +1 (non-binding)
>
> Checked items:
> - checked checksums and GPG files
> - verified that the source archives do not contains any binaries
> - checked that all POM files point to the same version
> - build from source successfully
>
> Best,
> Kurt
>
>
> On Wed, Mar 20, 2019 at 2:12 PM jincheng sun 
> wrote:
>
>> Hi Aljoscha&All,
>>
>> When I did the `end-to-end` test for RC3 under Mac OS, I found the
>> following two problems:
>>
>> 1. The verification returned for different `minikube status` is is
>> not enough for the robustness. The strings returned by different versions
>> of different platforms are different. the following misjudgment is 
>> caused:
>> When the `Command: start_kubernetes_if_not_ruunning failed` error
>> occurs, the minikube has actually started successfully. The core reason 
>> is
>> that there is a bug in the `test_kubernetes_embedded_job.sh` script. See
>> FLINK-11971  for
>> details.
>

Re: [DISCUSS] Introduction of a Table API Java Expression DSL

2019-03-21 Thread Timo Walther

Thanks for your feedback Rong and Jark.

@Jark: Yes, you are right that the string-based API is used quite a lot. 
On the other side, the potential user base in the future is still bigger 
than our current user base. Because the Table API will become equally 
important as the DataStream API, we really need to fix some crucial 
design decisions before it is too late. I would suggest to introduce the 
new DSL in 1.9 and remove the Expression parser either in 1.10 or 1.11. 
From a developement point of view, I think we can handle the overhead 
to maintain 3 APIs until then because 2 APIs will share the same code 
base + expression parser.


Regards,
Timo

Am 21.03.19 um 05:21 schrieb Jark Wu:

Hi Timo,

I'm +1 on the proposal. I like the idea to provide a Java DSL which is 
more friendly than string-based approach in programming.


My concern is if/when we can drop the string-based expression parser. 
If it takes a very long time, we have to paid more development
cost on the three Table APIs. As far as I know, the string-based API 
is used in many companies.
We should also get some feedbacks from users. So I'm CCing this email 
to user mailing list.


Best,
Jark



On Wed, 20 Mar 2019 at 08:51, Rong Rong > wrote:


Thanks for sharing the initiative of improving Java side Table
expression
DSL.

I agree as in the doc stated that Java DSL was always a "3rd class
citizen"
and we've run into many hand holding scenarios with our Flink
developers
trying to get the Stringify syntax working.
Overall I am a +1 on this, it also help reduce the development
cost of the
Table API so that we no longer need to maintain different DSL and
documentations.

I left a few comments in the doc. and also some features that I
think will
be beneficial to the final outcome. Please kindly take a look @Timo.

Many thanks,
Rong

On Mon, Mar 18, 2019 at 7:15 AM Timo Walther mailto:twal...@apache.org>> wrote:

> Hi everyone,
>
> some of you might have already noticed the JIRA issue that I opened
> recently [1] about introducing a proper Java expression DSL for the
> Table API. Instead of using string-based expressions, we should
aim for
> a unified, maintainable, programmatic Java DSL.
>
> Some background: The Blink merging efforts and the big
refactorings as
> part of FLIP-32 have revealed many shortcomings in the current
Table &
> SQL API design. Most of these legacy issues cause problems
nowadays in
> making the Table API a first-class API next to the DataStream
API. An
> example is the ExpressionParser class[2]. It was implemented in the
> early days of the Table API using Scala parser combinators.
During the
> last years, this parser caused many JIRA issues and user
confusion on
> the mailing list. Because the exceptions and syntax might not be
> straight forward.
>
> For FLINK-11908, we added a temporary bridge instead of
reimplementing
> the parser in Java for FLIP-32. However, this is only a intermediate
> solution until we made a final decision.
>
> I would like to propose a new, parser-free version of the Java
Table API:
>
>
>

https://docs.google.com/document/d/1r3bfR9R6q5Km0wXKcnhfig2XQ4aMiLG5h2MTx960Fg8/edit?usp=sharing
>
> I already implemented an early protoype that shows that such a
DSL is
> not much implementation effort and integrates nicely with all
existing
> API methods.
>
> What do you think?
>
> Thanks for your feedback,
>
> Timo
>
> [1] https://issues.apache.org/jira/browse/FLINK-11890
>
> [2]
>
>

https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
>
>





Re: [VOTE] Release 1.8.0, release candidate #3

2019-03-21 Thread Aljoscha Krettek
Hi Yu,

I commented on the issue. For me both Hadoop 2.8.3 and Hadoop 2.4.1 seem to 
work. Could you have a look at my comment?

I will also cancel this RC because of various issues.

Best,
Aljoscha

> On 21. Mar 2019, at 12:23, Yu Li  wrote:
> 
> Thanks @jincheng
> 
> @Aljoscha I've just opened FLINK-11990 
>  for the HDFS 
> BucketingSink issue with hadoop 2.8. IMHO it might be a blocker for 1.8.0 and 
> need your confirmation. Thanks.
> 
> Best Regards,
> Yu
> 
> 
> On Thu, 21 Mar 2019 at 15:57, jincheng sun  > wrote:
> Thanks for the quick fix, Yu. the PR of FLINK-11972 
>  has been merged.
> 
> Cheers,
> Jincheng
> 
> Yu Li mailto:car...@gmail.com>> 于2019年3月21日周四 上午7:23写道:
> -1, observed stable failures of the streaming bucketing end-to-end test case in 
> two different environments (Linux/MacOS) when running with both the shaded 
> hadoop-2.8.3 jar file and the hadoop-2.8.5 dist, while both 
> environments pass with hadoop 2.6.5. For more details please refer to this comment 
> in FLINK-11972.
> 
> Best Regards,
> Yu
> 
> 
> On Thu, 21 Mar 2019 at 04:25, jincheng sun  > wrote:
> Thanks for the quick fix Aljoscha! The FLINK-11971 
>  has been merged.
> 
> Cheers,
> Jincheng
> 
> Piotr Nowojski mailto:pi...@ververica.com>> 
> 于2019年3月21日周四 上午12:29写道:
> -1 from my side due to performance regression found in the master branch 
> since Jan 29th. 
> 
> In 10% JVM forks it was causing huge performance drop in some of the 
> benchmarks (up to 30-50% reduced throughput), which could mean that one out 
> of 10 task managers could be affected by it. Today we have merged a fix for 
> it [1]. First benchmark run was promising [2], but we have to wait until 
> tomorrow to make sure that the problem was definitely resolved. If that’s the 
> case, I would recommend including it in 1.8.0, because we really do not know 
> how big of performance regression this issue can be in the real world 
> scenarios.
> 
> Regarding the second regression from mid February: we have found the 
> responsible commit and this one is probably just a false positive. Because of 
> the nature of some of the benchmarks, they run with a low number of 
> records (300k). The apparent performance regression was caused by higher 
> initialisation time. When I temporarily increased the number of records to 
> 2M, the regression was gone. Together with Till and Stefan Richter we 
> discussed the potential impact of this longer initialisation time (in the 
> case of said benchmarks the initialisation time increased from 70ms to 120ms) and 
> we think that it’s not a critical issue that has to block the 
> release. Nevertheless there might be some follow-up work for this.
> 
> [1] https://github.com/apache/flink/pull/8020 
> 
> [2] http://codespeed.dak8s.net:8000/timeline/?ben=tumblingWindow&env=2 
> 
> 
> Piotr Nowojski
> 
>> On 20 Mar 2019, at 10:09, Aljoscha Krettek > > wrote:
>> 
>> Thanks Jincheng! It would be very good to fix those but as you said, I would 
>> say they are not blockers.
>> 
>>> On 20. Mar 2019, at 09:47, Kurt Young >> > wrote:
>>> 
>>> +1 (non-binding)
>>> 
>>> Checked items:
>>> - checked checksums and GPG files
>>> - verified that the source archives do not contains any binaries
>>> - checked that all POM files point to the same version
>>> - build from source successfully 
>>> 
>>> Best,
>>> Kurt
>>> 
>>> 
>>> On Wed, Mar 20, 2019 at 2:12 PM jincheng sun >> > wrote:
>>> Hi Aljoscha&All,
>>> 
>>> When I did the `end-to-end` test for RC3 under Mac OS, I found the 
>>> following two problems:
>>> 
>>> 1. The verification of the value returned by `minikube status` is not 
>>> robust enough. The strings returned by different versions on 
>>> different platforms differ, which causes the following misjudgment:
>>> When the `Command: start_kubernetes_if_not_ruunning failed` error occurs, 
>>> the minikube has actually started successfully. The core reason is that 
>>> there is a bug in the `test_kubernetes_embedded_job.sh` script. See 
>>> FLINK-11971  for details.
>>> 
>>> 2. Since the difference between 1.8.0 and 1.7.x is that 1.8.x does not put 
>>> the `hadoop-shaded` J

Re: StochasticOutlierSelection

2019-03-21 Thread Piotr Nowojski
(Adding back user mailing list)

Hi Anissa,

Thank you for coming back with the results. I hope this might be helpful for 
someone else in the future and maybe it will be one more argument for the Flink 
community to address this issue in some other way.

Piotrek

> On 20 Mar 2019, at 17:26, anissa moussaoui  
> wrote:
> 
> Hi Piotr,
> 
> thank you for your reply and sorry for replying so late.
> I took a little time to look into all the possibilities for using FlinkML in 
> Java, but that's not possible, as you told me. After looking at other 
> libraries in Java, I chose the MOA library in combination with the Flink API for 
> streaming anomaly detection, which gives quite satisfactory results.
> 
> Best,
> 
> Anissa 
> 
> 
> 
> Le lun. 4 mars 2019 à 16:08, Piotr Nowojski  > a écrit :
> 
> Hi,
> 
> I have never used this code, but ml library depends heavily on Scala, so I 
> wouldn’t recommend using it with Java. 
> 
> However if you want to go this way (I’m not sure if that’s possible), you 
> would have to pass the implicit parameters manually somehow (I don’t know how 
> to do that from Java). In this case you can take a look at the method’s 
> signature: the parameters have default values, and the implicitly passed 
> transformOperation comes from 
> `StochasticOutlierSelection.transformLabeledVectors` or/and (?) 
> `StochasticOutlierSelection.transformVectors`.
> 
> Piotrek
> 
>> On 2 Mar 2019, at 12:21, anissa moussaoui > > wrote:
>> 
>> Hello,
>> 
>> I would like to use the StochasticOutlierSelection algorithm for anomaly 
>> detection on a DataSet in Java, but the problem is that the doc is only in Scala.
>> 
>> In java doing : 
>> 
>> StochasticOutlierSelection stochasticOutliers= new 
>> StochasticOutlierSelection();
>> 
>> the "tranform" operation of this algorithm take three parameters : training 
>> dataSet, ParameterMap and TransformOperationDataSet on java but in scala doc 
>> take one parameters 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/libs/ml/sos.html#parameters
>>  
>> .
>> It's the same case for other algorithms like KNN un scala take only training 
>> dataset un scala but un java takes three parameters !
>> 
>> My question is that i do not know what represent ParameterMap and 
>> TransformOperationDataSet and how i initialize them and in an optimised way 
>> fr my model?
>> 
>> Do you have dôme exemple for flink ml un java please ?
>> 
>> 
>> Thank you in advance !
>> Best,
>> 
>> Anissa MOUSSAOUI 
>> 
>> 
>> 
> 
> 
> 



Re: Set partition number of Flink DataSet

2019-03-21 Thread qi luo
Thank you Fabian! I will check these issues.

> On Mar 20, 2019, at 4:25 PM, Fabian Hueske  wrote:
> 
> Hi,
> 
> I'm sorry but I'm only familiar with the high-level design but not with the 
> implementation details and concrete roadmap for the involved components.
> I think that FLINK-10288 [1] and FLINK-10429 [2] are related to partition 
> handling.
> 
> Best,
> Fabian
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10288 
> 
> [2] https://issues.apache.org/jira/browse/FLINK-10429 
> 
> 
> 
> Am Fr., 15. März 2019 um 12:13 Uhr schrieb qi luo  >:
> Hi Fabian,
> 
> I understand this is a by-design behavior, since Flink was first built for 
> streaming. Supporting batch shuffles and custom partition numbers in Flink may 
> be compelling for batch processing. 
> 
> Could you help explain a bit more which work needs to be done so that 
> Flink can support custom partition numbers? We would be willing to 
> help improve this area.
> 
> Thanks,
> Qi
> 
>> On Mar 15, 2019, at 4:25 PM, Fabian Hueske > > wrote:
>> 
>> Hi,
>> 
>> Flink works a bit differently than Spark.
>> By default, Flink uses pipelined shuffles which push results of the sender 
>> immediately to the receivers (btw. this is one of the building blocks for 
>> stream processing).
>> However, pipelined shuffles require that all receivers are online. Hence, 
>> the number of partitions determines the number of running tasks.
>> There is also a batch shuffle mode, but it needs to be explicitly enabled 
>> and AFAIK does not resolve the dependency of number of partitions and task 
>> parallelism.
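
(For reference, a minimal sketch of how that batch exchange mode is switched on in the DataSet API; as noted above it only changes the data exchange behaviour and does not decouple the partition count from the parallelism:)

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchShuffleModeSketch {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // explicitly enable blocking/batch data exchanges instead of pipelined ones
        env.getConfig().setExecutionMode(ExecutionMode.BATCH);
    }
}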
>> 
>> However, the community is currently working on many improvements for batch 
>> processing, including scheduling and fault-tolerance. 
>> Batched shuffles are an important building block for this and there might be 
>> better support for your use case in the future.
>> 
>> Best, Fabian
>> 
>> Am Fr., 15. März 2019 um 03:56 Uhr schrieb qi luo > >:
>> Hi Ken,
>> 
>> That looks awesome! I’ve implemented something similar to your bucketing 
>> sink, but using multiple internal writers rather than multiple internal 
>> output.
>> 
>> Besides this, I’m also curious whether Flink can achieve this like Spark: 
>> allow user to specify partition number in partitionBy() method (so no 
>> multiple output formats are needed). But this seems to need non-trivial 
>> changes in Flink core.
>> 
>> Thanks,
>> Qi
>> 
>>> On Mar 15, 2019, at 2:36 AM, Ken Krugler >> > wrote:
>>> 
>>> Hi Qi,
>>> 
>>> See https://github.com/ScaleUnlimited/flink-utils/ 
>>> , for a rough but working 
>>> version of a bucketing sink.
>>> 
>>> — Ken
>>> 
>>> 
 On Mar 13, 2019, at 7:46 PM, qi luo >>> > wrote:
 
 Hi Ken,
 
 Agree. I will try partitionBy() to reduce the number of parallel sinks, 
 and may also try sortPartition() so each sink could write files one by 
 one. Looking forward to your solution. :)
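
A rough sketch of that partitionBy() + sortPartition() combination in the DataSet API; keys, types and the output path are illustrative, and the final sink is a placeholder for the custom bucketing OutputFormat:

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class BucketedBatchOutputSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (bucketKey, payload) records; the inline elements stand in for the real source
        DataSet<Tuple2<Integer, String>> records = env.fromElements(
                Tuple2.of(1, "a"), Tuple2.of(2, "b"), Tuple2.of(1, "c"));

        records
                // co-locate all records of one bucket in the same parallel task
                .partitionByHash(0)
                // sort within each task so that one bucket's records arrive together and
                // the sink can write bucket files one after another instead of keeping
                // many output files open at the same time
                .sortPartition(0, Order.ASCENDING)
                // placeholder sink: the real job would plug in a custom OutputFormat
                // that routes each bucket value to its own file
                .writeAsText("file:///tmp/bucketed-output");

        env.execute("bucketed batch output sketch");
    }
}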
 
 Thanks,
 Qi
 
> On Mar 14, 2019, at 2:54 AM, Ken Krugler  > wrote:
> 
> Hi Qi,
> 
>> On Mar 13, 2019, at 1:26 AM, qi luo > > wrote:
>> 
>> Hi Ken,
>> 
>> Do you mean that I can create a batch sink which writes to N files? 
> 
> Correct.
> 
>> That sounds viable, but since our data size is huge (billions of records 
>> & thousands of files), the performance may be unacceptable. 
> 
> The main issue with performance (actually memory usage) is how many 
> OutputFormats you need to have open at the same time.
> 
> If you partition by the same key that’s used to define buckets, then the 
> max number is less, as each parallel instance of the sink only gets a 
> unique subset of all possible bucket values.
> 
> I’m actually dealing with something similar now, so I might have a 
> solution to share soon.
> 
> — Ken
> 
> 
>> I will check Blink and give it a try anyway.
>> 
>> Thank you,
>> Qi
>> 
>>> On Mar 12, 2019, at 11:58 PM, Ken Krugler >> > wrote:
>>> 
>>> Hi Qi,
>>> 
>>> If I understand what you’re trying to do, then this sounds like a 
>>> variation of a bucketing sink.
>>> 
>>> That typically uses a field value to create a directory path or a file 
>>> name (though the filename case is only viable when the field is also 
>>> what’s used to partition the data)
>>> 
>>> But I don’t believe Flink has built-in support for that, in batch mode 
>>> (see BucketingSink 
>>> 

Re: Flink 1.7.2 extremely unstable and losing jobs in prod

2019-03-21 Thread Bruno Aranda
Hi Andrey,

Thanks for your response. I was trying to get the logs somewhere but they
are biggish (~4Mb). Do you suggest somewhere I could put them?

In any case, I can see exceptions like this:

2019/03/18 10:11:50,763 DEBUG
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Releasing
slot [SlotRequestId{ab89ff271ebf317a13a9e773aca4e9fb}] because: null
2019/03/18 10:11:50,807 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
alert-event-beeTrap-notifier (2ff941926e6ad80ba441d9cfcd7d689d) switched
from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate all requires slots within timeout of 30 ms. Slots
required: 2, slots allocated: 0
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:991)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
...

It looks like a TM may crash, and then the JM. And then the JM is not able
to find slots for the tasks in a reasonable time frame? Weirdly, we are
running 13 TMs with 6 slots each (we used legacy mode in 1.6), and we
always try to keep an extra TM worth of free slots just in case. Looking at
the dashboard, I see 12 TMs, 2 free slots, but we tell Flink 13 are
available when we start the session in yarn.

Any ideas? It is way less stable for us these days without having changed
settings much since we started using Flink around 1.2 some time back.

Thanks,

Bruno



On Tue, 19 Mar 2019 at 17:09, Andrey Zagrebin  wrote:

> Hi Bruno,
>
> could you also share the job master logs?
>
> Thanks,
> Andrey
>
> On Tue, Mar 19, 2019 at 12:03 PM Bruno Aranda  wrote:
>
>> Hi,
>>
>> This is causing serious instability and data loss in our production
>> environment. Any help figuring out what's going on here would be really
>> appreciated.
>>
>> We recently updated our two EMR clusters from flink 1.6.1 to flink 1.7.2
>> (running on AWS EMR). The road to the upgrade was fairly rocky, but we felt
>> like it was working sufficiently well in our pre-production environments
>> that we rolled it out to prod.
>>
>> However we're now seeing the jobmanager crash spontaneously several times
>> a day. There doesn't seem to be any pattern to when this happens - it
>> doesn't coincide with an increase in the data flowing through the system,
>> nor is it at the same time of day.
>>
>> The big problem is that when it recovers, sometimes a lot of the jobs
>> fail to resume with the following exception:
>>
>> org.apache.flink.util.FlinkException: JobManager responsible for
>> 2401cd85e70698b25ae4fb2955f96fd0 lost the leadership.
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1185)
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:138)
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1625)
>> //...
>> Caused by: java.util.concurrent.TimeoutException: The heartbeat of
>> JobManager with id abb0e96af8966f93d839e4d9395c7697 timed out.
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1626)
>> ... 16 more
>>
>> Starting them manually afterwards doesn't resume from checkpoint, which
>> for most jobs means it starts from the end of the source kafka topic. This
>> means whenever this surprise jobmanager restart happens, we have a ticking
>> clock during which we're losing data.
>>
>> We speculate that those jobs die first and while they wait to be
>> restarted (they have a 30 second delay strategy), the job manager restarts
>> and does not recover them? In any case, we have never seen so many job
>> failures and JM restarts with exactly the same EMR config.
>>
>> We've got some functionality we're building that uses the
>> StreamingFileSink over S3 bugfixes in 1.7.2, so rolling back isn't an ideal
>> option.
>>
>> Looking through the mailing list, we found
>> https://issues.apache.org/jira/browse/FLINK-11843 - does it seem
>> possible this might be related?
>>
>> Best regards,
>>
>> Bruno
>>
>


Re: [VOTE] Release 1.8.0, release candidate #3

2019-03-21 Thread Yu Li
Thanks @jincheng

@Aljoscha I've just opened FLINK-11990
 for the HDFS
BucketingSink issue with hadoop 2.8. IMHO it might be a blocker for 1.8.0
and need your confirmation. Thanks.

Best Regards,
Yu


On Thu, 21 Mar 2019 at 15:57, jincheng sun  wrote:

> Thanks for the quick fix, Yu. the PR of FLINK-11972
>  has been merged.
>
> Cheers,
> Jincheng
>
> Yu Li  于2019年3月21日周四 上午7:23写道:
>
>> -1, observed stable failures of the streaming bucketing end-to-end test case
>> in two different environments (Linux/MacOS) when running with both the shaded
>> hadoop-2.8.3 jar file and the hadoop-2.8.5 dist, while both
>> environments pass with hadoop 2.6.5. For more details please refer to this
>> comment in FLINK-11972.
>>
>> Best Regards,
>> Yu
>>
>>
>> On Thu, 21 Mar 2019 at 04:25, jincheng sun 
>> wrote:
>>
>>> Thanks for the quick fix Aljoscha! The FLINK-11971
>>>  has been merged.
>>>
>>> Cheers,
>>> Jincheng
>>>
>>> Piotr Nowojski  于2019年3月21日周四 上午12:29写道:
>>>
 -1 from my side due to performance regression found in the master
 branch since Jan 29th.

 In 10% JVM forks it was causing huge performance drop in some of the
 benchmarks (up to 30-50% reduced throughput), which could mean that one out
 of 10 task managers could be affected by it. Today we have merged a fix for
 it [1]. First benchmark run was promising [2], but we have to wait until
 tomorrow to make sure that the problem was definitely resolved. If that’s
 the case, I would recommend including it in 1.8.0, because we really do not
 know how big of performance regression this issue can be in the real world
 scenarios.

 Regarding the second regression from mid February: we have found the
 responsible commit and this one is probably just a false positive. Because
 of the nature of some of the benchmarks, they run with a low number of
 records (300k). The apparent performance regression was caused by higher
 initialisation time. When I temporarily increased the number of records to
 2M, the regression was gone. Together with Till and Stefan Richter we
 discussed the potential impact of this longer initialisation time (in the
 case of said benchmarks the initialisation time increased from 70ms to 120ms)
 and we think that it’s not a critical issue that has to block the
 release. Nevertheless there might be some follow-up work for this.

 [1] https://github.com/apache/flink/pull/8020
 [2] http://codespeed.dak8s.net:8000/timeline/?ben=tumblingWindow&env=2

 Piotr Nowojski

 On 20 Mar 2019, at 10:09, Aljoscha Krettek  wrote:

 Thanks Jincheng! It would be very good to fix those but as you said, I
 would say they are not blockers.

 On 20. Mar 2019, at 09:47, Kurt Young  wrote:

 +1 (non-binding)

 Checked items:
 - checked checksums and GPG files
 - verified that the source archives do not contains any binaries
 - checked that all POM files point to the same version
 - build from source successfully

 Best,
 Kurt


 On Wed, Mar 20, 2019 at 2:12 PM jincheng sun 
 wrote:

> Hi Aljoscha&All,
>
> When I did the `end-to-end` test for RC3 under Mac OS, I found the
> following two problems:
>
> 1. The verification of the value returned by `minikube status` is not
> robust enough. The strings returned by different versions on
> different platforms differ, which causes the following misjudgment:
> When the `Command: start_kubernetes_if_not_ruunning failed` error
> occurs, the minikube has actually started successfully. The core reason is
> that there is a bug in the `test_kubernetes_embedded_job.sh` script. See
> FLINK-11971  for
> details.
>
> 2. Since the difference between 1.8.0 and 1.7.x is that 1.8.x does not
> put the `hadoop-shaded` JAR integrated into the dist.  It will cause an
> error when the end-to-end test cannot be found with `Hadoop` Related
> classes,  such as: `java.lang.NoClassDefFoundError:
> Lorg/apache/hadoop/fs/FileSystem`. So we need to improve the end-to-end
> test script, or explicitly stated in the README, i.e. end-to-end test need
> to add `flink-shaded-hadoop2-uber-.jar` to the classpath. See
> FLINK-11972 

Re: Async Function Not Generating Backpressure

2019-03-21 Thread Andrey Zagrebin
Hi Seed,

when you create `AsyncDataStream.(un)orderedWait`, which capacity do you
pass in, or do you use the default one (100)?
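
For context, a minimal sketch of where that capacity goes, with a pass-through async function mirroring the one described in this thread; the timeout and capacity values are illustrative:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncCapacitySketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("a", "b", "c"); // stands in for the Kafka source

        // The last argument is the capacity of the operator's internal queue: once that
        // many requests are in flight, the operator stops accepting new records, which
        // is what should surface as backpressure towards the source.
        DataStream<String> result = AsyncDataStream.orderedWait(
                input, new PassThrough(), 30, TimeUnit.SECONDS, 10);

        result.print();
        env.execute("async capacity sketch");
    }

    // Pass-through async function, mirroring the "no work" function described in the thread.
    private static class PassThrough extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            CompletableFuture.runAsync(() -> resultFuture.complete(Collections.singleton(input)));
        }
    }
}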

Best,
Andrey

On Thu, Mar 21, 2019 at 2:49 AM Seed Zeng  wrote:

> Hey Andrey and Ken,
> Sorry about the late reply. I might not have been clear in my question.
> The performance of writing to Cassandra is the same in both cases, only
> that the source rate was higher in the case where the async function is
> present.
> Something is "buffering" and not propagating backpressure to slow down the
> source speed from Kafka.
>
> In our use case, we prefer the backpressure to slow down the source so
> that the write to Cassandra is not delayed while the source is consuming
> fast.
>
> Thanks,
> Seed
>
> On Wed, Mar 20, 2019 at 9:38 AM Andrey Zagrebin 
> wrote:
>
>> Hi Seed,
>>
>> Sorry for the confusion, I see now it is separate. Backpressure should still
>> be created because the internal async queue has a bounded capacity,
>> but I am not sure about the reporting problem; Ken and Till probably have a better
>> idea.
>>
>> As for the consumption speed-up, the async operator creates another thread to
>> collect the results, and the Cassandra sink probably uses that thread to write
>> data.
>> This might parallelize and pipeline previous steps like Kafka fetching
>> and Cassandra IO, but I am also not sure about this explanation.
>>
>> Best,
>> Andrey
>>
>>
>> On Tue, Mar 19, 2019 at 8:05 PM Ken Krugler 
>> wrote:
>>
>>> Hi Seed,
>>>
>>> I was assuming the Cassandra sink was separate from and after your async
>>> function.
>>>
>>> I was trying to come up for an explanation as to why adding the async
>>> function would improve your performance.
>>>
>>> The only very unlikely reason I thought of was that the async function
>>> somehow caused data arriving at the sink to be more “batchy”, which (if the
>>> Cassandra sink had an “every x seconds do a write” batch mode) could
>>> improve performance.
>>>
>>> — Ken
>>>
>>> On Mar 19, 2019, at 11:35 AM, Seed Zeng  wrote:
>>>
>>> Hi Ken and Andrey,
>>>
>>> Thanks for the response. I think there is a confusion that the writes to
>>> Cassandra are happening within the Async function.
>>> In my test, the async function is just a pass-through without doing any
>>> work.
>>>
>>> So any Cassandra related batching or buffering should not be the cause
>>> for this.
>>>
>>> Thanks,
>>>
>>> Seed
>>>
>>> On Tue, Mar 19, 2019 at 12:35 PM Ken Krugler <
>>> kkrugler_li...@transpac.com> wrote:
>>>
 Hi Seed,

 It’s a known issue that Flink doesn’t report back pressure properly for
 AsyncFunctions, due to how it monitors the output collector to gather back
 pressure statistics.

 But that wouldn’t explain how you get a faster processing with the
 AsyncFunction inserted into your workflow.

 I haven’t looked at how the Cassandra sink handles batching, but if the
 AsyncFunction somehow caused fewer, bigger Cassandra writes to happen then
 that’s one (serious hand waving) explanation.

 — Ken

 On Mar 18, 2019, at 7:48 PM, Seed Zeng  wrote:

 Flink Version - 1.6.1

 In our application, we consume from Kafka and sink to Cassandra in the
 end. We are trying to introduce a custom async function in front of the
 Sink to carry out some customized operations. In our testing, it appears
 that the Async function is not generating backpressure to slow down our
 Kafka Source when Cassandra becomes unhappy. Essentially compared to an
 almost identical job where the only difference is the lack of the Async
 function, Kafka source consumption speed is much higher under the same
 settings and identical Cassandra cluster. The experiment is like this.

 Job 1 - without async function in front of Cassandra
 Job 2 - with async function in front of Cassandra

 Job 1 is backpressured because Cassandra cannot handle all the writes
 and eventually slows down the source rate to 6.5k/s.
 Job 2 is slightly backpressured but was able to run at 14k/s.

 Is the AsyncFunction somehow not reporting the backpressure correctly?

 Thanks,
 Seed


 --
 Ken Krugler
 +1 530-210-6378
 http://www.scaleunlimited.com
 Custom big data solutions & training
 Flink, Solr, Hadoop, Cascading & Cassandra


>>> --
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>>
>>>


Re: End-to-end exactly-once semantics in simple streaming app

2019-03-21 Thread Kostas Kloudas
Hi Patrick,

In order for your DB records to be up-to-date and correct, I think that you
would have to implement a 2-phase-commit sink.
Now for querying multiple keys, why not do the following:

Let's assume that for a single result record you want to join data from K1, K2,
K3.
You can have a function that creates a composite key `K_comp =
createCompositeKey(K1, K2, K3)`.
Then you send 3 records out: (K1, K_comp), (K2, K_comp), (K3, K_comp).
You keyBy the first field initially, i.e. K1, K2, K3. This will send the
records to the nodes responsible for each key.
The nodes there will either have the data in state, or they can hit the
Oracle DB to fetch the related data.
So now, Flink will pick the relevant state.
And then you can keyBy the K_comp, which will again send all the records to
the same node, where they can be joined together.

Then you can use your 2-phase JDBC connector to push the result to your
Oracle DB when the checkpoint is acknowledged.
This solution uses Flink's state as a buffer.
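
A rough sketch of the fan-out / fan-in shape of that pattern; all names, the toy input and the pass-through enrichment are illustrative, and the final join plus the 2-phase-commit JDBC sink are left as placeholders:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class CompositeKeyJoinSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // toy input: one record whose three lookup keys are separated by '|'
        DataStream<String> input = env.fromElements("K1|K2|K3");

        // fan out: one record per individual key, each tagged with the composite key
        DataStream<Tuple3<String, String, String>> fannedOut = input
                .flatMap(new FlatMapFunction<String, Tuple3<String, String, String>>() {
                    @Override
                    public void flatMap(String record, Collector<Tuple3<String, String, String>> out) {
                        String[] keys = record.split("\\|");
                        String compositeKey = String.join("+", keys); // stands in for createCompositeKey(K1, K2, K3)
                        for (String key : keys) {
                            out.collect(Tuple3.of(key, compositeKey, record));
                        }
                    }
                });

        fannedOut
                // first keyBy: the task owning each individual key enriches from its state
                // (or from the Oracle DB on a state miss)
                .keyBy(0)
                .map(new MapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> map(Tuple3<String, String, String> part) {
                        return part; // placeholder for the per-key enrichment
                    }
                })
                // second keyBy: all enriched parts of one logical record meet on the same task
                .keyBy(1)
                .print(); // the real job would join the parts and hand the result to a 2-phase-commit JDBC sink

        env.execute("composite key join sketch");
    }
}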

What do you think about this solution?

Cheers,
Kostas



On Wed, Mar 20, 2019 at 9:38 AM Patrick Fial  wrote:

> Hi Andrey,
>
> thanks for your feedback. I am not sure if I understand 100% correctly,
> but using Flink state to store my stuff (in addition to the Oracle
> database) is not an option, because to my knowledge Flink state does not
> allow the arbitrary lookup queries which I need to do. Also, given
> the logic described in my original post, the database access is never going
> to be idempotent, which lies in the nature of the required insert/update
> logic.
>
> regards
> Patrick
>
> --
>
> *Patrick Fial*
>
> Client Platform Entwickler
>
> Information Design One AG
>
>
> Phone +49 69 244 502 38
>
> Web www.id1.de
>
>
>
> Information Design One AG, Baseler Straße 10, 60329 Frankfurt am Main
>
> Registereintrag: Amtsgericht Frankfurt am Main, HRB 52596
>
> Vorstand: Robert Peters, Benjamin Walther, Aufsichtsrat: Christian Hecht
> (Vorsitz)
>
> Am 19. März 2019 um 17:59:22, Andrey Zagrebin (and...@ververica.com)
> schrieb:
>
> Hi Patrick,
>
> One approach I would try is to use Flink state and sync it with the database
> in initializeState and CheckpointListener.notifyCheckpointComplete.
> Basically, issue only idempotent updates to the database, and only once the last
> checkpoint is securely taken and the records before it will not be processed again.
> This has the caveat, though, that the database might have stale data between
> checkpoints.
> Once the current state is synced with the database, depending on your app, it
> might even be cleared from Flink state.
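
To make the buffering idea above concrete, a rough sketch of a sink that keeps records in operator state and only issues the (idempotent) database writes once the checkpoint containing them has completed; the JDBC access is stubbed out and all names are illustrative:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingDbSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private transient ListState<String> checkpointedBuffer;
    private final List<String> pendingWrites = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        // buffer instead of writing directly; nothing reaches the DB between checkpoints
        pendingWrites.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedBuffer.clear();
        checkpointedBuffer.addAll(pendingWrites);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedBuffer = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-db-writes", String.class));
        if (context.isRestored()) {
            for (String pending : checkpointedBuffer.get()) {
                pendingWrites.add(pending);
            }
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // records up to the completed checkpoint will not be reprocessed, and anything
        // newer would be replayed on failure, so with idempotent upserts it is safe
        // to flush the buffer here
        for (String value : pendingWrites) {
            upsertIntoOracle(value);
        }
        pendingWrites.clear();
    }

    private void upsertIntoOracle(String value) {
        // placeholder for the idempotent JDBC upsert against the Oracle DB
    }
}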
>
> I also cc Piotr and Kostas, maybe, they have more ideas.
>
> Best,
> Andrey
>
> On Tue, Mar 19, 2019 at 10:09 AM Patrick Fial  wrote:
>
>> Hello,
>>
>> I am working on a streaming application with apache flink, which shall
>> provide end-to-end exactly-once delivery guarantees. The application is
>> roughly built like this:
>>
>> environment.addSource(consumer)
>>   .map(… idempotent transformations ...)
>>   .map(new DatabaseFunction)
>>   .map(… idempotent transformations ...)
>>   .addSink(producer)
>>
>> Both source and sink are kafka connectors, and thus support exactly-once
>> delivery guarantees.
>>
>> The tricky part comes with the .map() containing the DatabaseFunction.
>> Its job is to:
>> 1) look up the incoming message in some oracle database
>> 2a) insert it if it is not already stored in the database and publish the
>> incoming message
>> 2b) otherwise combine the incoming update with previous contents from the
>> database, and store back the combined update in the database
>> 3) output the result of 2) to the next operator
>>
>> This logic leads to inconsistent data being published to the sink in
>> case of a failure where the DatabaseFunction was already executed, but the
>> message is not yet published to the sink.
>>
>> My understanding is, that in such a scenario all operator states would be
>> reverted to the last checkpoint. Since the .map() operator is stateless,
>> nothing is done here, so only the consumer and producer states are
>> reverted. This leads to the message being reprocessed from the beginning
>> (source), and thus being processed *again* by the DatabaseFunction.
>> However, the DatabaseFunction is not idempotent (because of 1)-3) as
>> explained above), and thus leads to a different output than in the first
>> run.
>>
>> The question is, how I can assure transaction-safety in this application?
>>
>> Basically, I would need to use database transactions within the
>> DatabaseFunction, and commit those only if the messages are also commited
>> to the kafka sink. However, I don’t know how to achieve this.
>>
>> I read about the two phase commit protocol in flink (
>> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html),
>> but I fail to find examples of how to implement this in detail for stream
>> operators (NOT sinks). All documentation I find only refers to using the
>> two phase commit pro

local dev using kafka consumer by docker got wrong cluster node id

2019-03-21 Thread Andy Hoang
So I want to run Flink locally. The Kafka docker container and its ZooKeeper have been 
working great for local dev of other projects, and I want to try this Kafka with a new 
Flink project locally.
My problem is that the connection from my Kafka consumer source is created first, 
but then it tries to connect to a different node while the Flink job is running.
Example:
- First it connects to node 0.0.0.0:9092 (id: -1 rack: null)
- Then it tries to connect to nodes = [4c34977feb35:9092 (id: 1001 rack: null)]

+ the value 4c34977feb35 is the docker container name of my kafka docker
+ the value "0.0.0.0:9092” is provided by flink job code

I already tried to set up another Kafka cluster with a different name, and the Flink 
consumer can still somehow find it, but it ends up unable to connect to it.

My env:
- docker for mac
- flink 1.7.0
- scala 2.12

Log file is include here:
[info] 16:18:53.130 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer 
clientId=consumer-1, groupId=pp_flink_shipment_processor] Kafka consumer 
initialized
[info] 16:18:53.406 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Initiating connection to node 0.0.0.0:9092 
(id: -1 rack: null)
[info] 16:18:53.421 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node--1.bytes-sent
[info] 16:18:53.423 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node--1.bytes-received
[info] 16:18:53.424 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
[info] 16:18:53.430 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Created socket with SO_RCVBUF = 342972, 
SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -1
[info] 16:18:53.431 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Completed connection to node -1. Fetching 
API versions.
[info] 16:18:53.431 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Initiating API versions fetch from node -1.
[info] 16:18:53.461 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Initiating API versions fetch from node -1.
[info] 16:18:53.461 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Using older server API v0 to send 
API_VERSIONS {} with correlation id 2 to node -1
[info] 16:18:53.574 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Recorded API versions for node -1: 
(Produce(0): 0 to 5 [usable: 5], Fetch(1): 0 to 6 [usable: 6], ListOffsets(2): 
0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 5], LeaderAndIsr(4): 0 to 1 
[usable: 1], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 
4], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 
3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 
1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], 
LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], 
DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], 
SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 1 [usable: 1], 
CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], 
DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], 
OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], 
AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], 
WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], 
DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 
0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 
0], AlterReplicaLogDirs(34): 0 [usable: 0], DescribeLogDirs(35): 0 [usable: 0], 
SaslAuthenticate(36): 0 [usable: 0], CreatePartitions(37): 0 [usable: 0], 
CreateDelegationToken(38): UNSUPPORTED, RenewDelegationToken(39): UNSUPPORTED, 
ExpireDelegationToken(40): UNSUPPORTED, DescribeDelegationToken(41): 
UNSUPPORTED, DeleteGroups(42): UNSUPPORTED)
[info] 16:18:53.577 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Sending metadata request 
(type=MetadataRequest, topics=) to node 0.0.0.0:9092 (id: -1 rack: null)
[info] 16:18:53.591 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient 

Job crashed very soon

2019-03-21 Thread yinhua.dai
Hi Community,

I was trying to run a big batch job which uses JDBCInputFormat to retrieve a
large amount of data from a MySQL database and does some joins in Flink; the
environment is AWS EMR. But it always fails very fast.

I'm using flink on yarn, flink 1.6.1
my cluster has 1000GB memory, my job parameter is:
-yD akka.ask.timeout=60s -yD akka.framesize=300m -yn 50 -ys 2 -yjm 8192 -ytm
8192 -p 40

I have 6 data sources with different tables and most of them are set to a
parallelism of 100.
I can only see the WARN logs below in the aggregated YARN logs; the whole
log is too big.

2019-03-21 09:10:16,430 WARN  org.apache.flink.runtime.taskmanager.Task 
   
- Task 'CHAIN DataSource (at createInput(ExecutionEnvironment.java:548)
(org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) -> FlatMap (where:
(AND(=(VALUE, _UTF-16LE'true'), =(ATTRIBUTENAME,
_UTF-16LE'QuoteCurrencyId'))), select: (QUOTEID)) (10/50)' did not react to
cancelling signal for 30 seconds, but is stuck in method:
 java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.read(InputRecord.java:503)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
java.io.FilterInputStream.read(FilterInputStream.java:133)
com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64)
com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:108)
com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:45)
com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:57)
com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:41)
com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:61)
com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:44)
com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:75)
com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:42)
com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1685)
com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:87)
com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:48)
com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1698)
com.mysql.cj.protocol.a.NativeProtocol.readAllResults(NativeProtocol.java:1752)
com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:1041)
com.mysql.cj.NativeSession.execSQL(NativeSession.java:1157)
com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:947)
com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1020)
org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:238)
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
java.lang.Thread.run(Thread.java:748)

2019-03-21 09:10:16,883 WARN  akka.remote.ReliableDeliverySupervisor
   
- Association with remote system
[akka.tcp://fl...@ip-10-97-33-195.tr-fr-nonprod.aws-int.thomsonreuters.com:41133]
has failed, address is now gated for [50] ms. Reason: [Disassociated] 
2019-03-21 09:10:18,447 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
   
- RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2019-03-21 09:10:18,448 INFO 
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
Shutting down TaskExecutorLocalStateStoresManager.
2019-03-21 09:10:18,448 INFO 
org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting
down BLOB cache
2019-03-21 09:10:18,448 INFO 
org.apache.flink.runtime.blob.TransientBlobCache  - Shutting
down BLOB cache


2019-03-21 09:08:36,913 WARN  akka.remote.transport.netty.NettyTransport
   
- Remote connection to [/10.97.33.195:39282] failed with
java.io.IOException: Connection reset by peer
2019-03-21 09:08:36,913 WARN  akka.remote.ReliableDeliverySupervisor
   
- Association with remote system
[akka.tcp://fl...@ip-10-97-33-195.tr-fr-nonprod.aws-int.thomsonreuters.com:45423]
has failed, address is now gated for [50] ms. Reason: [Disassociated] 
2019-03-21 09:08:37,020 WARN  org.apache.flink.yarn.YarnResourceManager 
   
- Discard registration from TaskExecutor
container_1553143971811_0015_01_59 at
(akka.tcp://fl...@ip-10-97-36-43.tr-fr-nonprod.aws-int.thomsonreuters.com:44685/user/taskmanager_0)
because the framework did not recognize it
2019-03-21 09:08:37,020 WARN  org.apache.flink.yarn.

Re: [DISCUSS] Create a Flink ecosystem website

2019-03-21 Thread Robert Metzger
Okay, great.

Congxian Qiu, Daryl and I have a kick-off call later today at 2pm CET, 9pm
China time about the design of the ecosystem page (see:
https://github.com/rmetzger/flink-community-tools/issues/4)
Please let me know if others want to join as well, I can add them to the
invite.

On Wed, Mar 20, 2019 at 4:10 AM Becket Qin  wrote:

> I agree. We can start with English-only and see how it goes. The comments
> and descriptions can always be multilingual, but that is up to the package
> owners.
>
> On Tue, Mar 19, 2019 at 6:07 PM Robert Metzger 
> wrote:
>
>> Thanks.
>>
>> Do we actually want this page to be multi-language?
>>
>> I propose to make the website English-only, but maybe consider allowing
>> comments in different languages.
>> If we made it multi-language, we might have problems with people
>> submitting packages in non-English languages.
>>
>>
>>
>> On Tue, Mar 19, 2019 at 2:42 AM Becket Qin  wrote:
>>
>>> Done. The writeup looks great!
>>>
>>> On Mon, Mar 18, 2019 at 9:09 PM Robert Metzger 
>>> wrote:
>>>
 Nice, really good news on the INFRA front!
 I think the hardware specs sound reasonable. And a periodic backup of
 the website's database to Infra's backup solution sounds reasonable too.

 Can you accept and review my proposal for the website?


 On Sat, Mar 16, 2019 at 3:47 PM Becket Qin 
 wrote:

> >
> > I have a very capable and motivated frontend developer who would be
> > willing to implement what I've mocked in my proposal.
>
>
> That is awesome!
>
> I created a Jira ticket[1] with Apache Infra and got a reply. It looks like
> the Apache Infra team can provide a decent VM. The last piece is how to
> ensure the data is persisted so we won't lose the project info / user
> feedback when the VM goes down. If Apache Infra does not provide persistent
> storage for DB backups, we can always ask for multiple VMs and handle fault
> tolerance ourselves. It seems we can almost say the hardware side is also
> ready.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> [1] https://issues.apache.org/jira/browse/INFRA-18010
>
> On Fri, Mar 15, 2019 at 5:39 PM Robert Metzger 
> wrote:
>
> > Thank you for reaching out to Infra and for the Ember client.
> > When I first saw the Ember repository, I thought it was the whole thing
> > (frontend and backend), but while testing it, I realized it is "only" the
> > frontend. I'm not sure if it makes sense to adjust the Ember Observer
> > client, or just write a simple UI from scratch.
> > I have a very capable and motivated frontend developer who would be
> > willing to implement what I've mocked in my proposal.
> > In addition, I found somebody (Congxian Qiu) who seems to be eager
> to help
> > with this project for the backend:
> > https://github.com/rmetzger/flink-community-tools/issues/4
> >
> > For Infra: I made the same experience when asking for more GitHub
> > permissions for "flinkbot": They didn't respond on their mailing
> list, only
> > on Jira.
> >
> >
> >
> > On Thu, Mar 14, 2019 at 2:45 PM Becket Qin 
> wrote:
> >
> >> Thanks for writing up the specifications.
> >>
> >> Regarding the website source code, Austin found a website[1] whose
> >> frontend code[2] is available publicly. It lacks some features (e.g.
> >> login), but it is still a good starting point. One thing is that I did
> >> not find a license statement for that source code. I'll reach out to the
> >> author to see if they have any concerns about our usage.
> >>
> >> Apache Infra has not replied to my email regarding some details about
> >> the VM. I'll open an Infra Jira ticket tomorrow if there is still no
> >> response.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> [1] https://emberobserver.com/
> >> [2] https://github.com/emberobserver/client
> >>
> >>
> >>
> >> On Thu, Mar 14, 2019 at 1:35 AM Robert Metzger  >
> >> wrote:
> >>
> >>> @Bowen: I agree. Confluent Hub looks nicer, but it is on their company
> >>> website. I guess the likelihood that they give out code from their
> >>> company website is fairly low.
> >>> @Nils: Beam's page is similar to our Ecosystem page, which we'll
> >>> reactivate as part of this PR:
> >>> https://github.com/apache/flink-web/pull/187
> >>>
> >>> Spark-packages.org did not respond to my request.
> >>> I will propose a short specification in Becket's initial document.
> >>>
> >>>
> >>> On Mon, Mar 11, 2019 at 11:38 AM Niels Basjes 
> wrote:
> >>>
>  Hi,
> 
>  The Beam project has something in this area that is simply a page
>  within their documentation website:
> >>>

Best practice to handle update messages in stream

2019-03-21 Thread 徐涛
Hi Experts,
Assuming there is a stream whose content is like this:
Seq  ID   MONEY
1    100  100
2    100  200
3    101  300

The record at Seq#2 updates the record at Seq#1, changing the money
from 100 to 200.
If I register the stream as table T and want to sum the money grouped by
each ID, then "select sum(MONEY) from T" returns 600, which is incorrect.

I can write a UDAF, for example latest, to compute the latest value for
each ID; the SQL then looks like this:
select sum(MONEY) from
(
select ID, latest(MONEY) as MONEY from T group by ID
)
But I have to keep each ID and its latest value in state, and I am worried
that the state will grow too large. Currently I use this approach and set the
state retention to several days so that the state does not grow unbounded.
I wonder whether there are better ways to do this.
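
A minimal sketch of what such a latest UDAF plus the idle-state retention
setting could look like (the explicit SEQ argument, class names, and retention
interval are assumptions for illustration, not the actual implementation):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

public class LatestValueExample {

    /** Accumulator: latest MONEY value and the SEQ it came from. */
    public static class Acc {
        public long seq = Long.MIN_VALUE;
        public long money = 0L;
    }

    /** UDAF keeping the value with the highest sequence number per group. */
    public static class Latest extends AggregateFunction<Long, Acc> {
        @Override
        public Acc createAccumulator() {
            return new Acc();
        }

        public void accumulate(Acc acc, long seq, long money) {
            if (seq > acc.seq) {
                acc.seq = seq;
                acc.money = money;
            }
        }

        @Override
        public Long getValue(Acc acc) {
            return acc.money;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Sample stream: (SEQ, ID, MONEY), where SEQ 2 updates SEQ 1.
        DataStream<Tuple3<Long, Long, Long>> input = env.fromElements(
                Tuple3.of(1L, 100L, 100L),
                Tuple3.of(2L, 100L, 200L),
                Tuple3.of(3L, 101L, 300L));

        tEnv.registerDataStream("T", input, "SEQ, ID, MONEY");
        tEnv.registerFunction("latest", new Latest());

        // Idle per-ID state is dropped after roughly a week to bound state size.
        StreamQueryConfig qConfig = tEnv.queryConfig();
        qConfig.withIdleStateRetentionTime(Time.days(7), Time.days(8));

        Table result = tEnv.sqlQuery(
            "SELECT SUM(MONEY) FROM (SELECT ID, latest(SEQ, MONEY) AS MONEY FROM T GROUP BY ID) t");

        tEnv.toRetractStream(result, Row.class, qConfig).print();
        env.execute();
    }
}

Passing SEQ into the UDAF avoids relying on arrival order, at the cost of
keeping one (SEQ, MONEY) pair per ID in state, which is exactly the state-size
concern described above.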

So what is the best practice in this scenario? Anyone have a 
suggestion? Thanks a lot.


Best
Henry


Facebook: Save Wilpattu One Srilanka's Most Loved Place

2019-03-21 Thread felipe . o . gutierrez
Hello,

I have just signed the petition "Facebook: Save Wilpattu One
Srilanka's Most Loved Place" and wanted to ask whether you could help by
signing it as well.

Our goal is to reach 15,000 signatures and we need more support.
You can read more about this issue and sign the petition here:

http://chng.it/ZsgJMgc6rb

Thank you!
Felipe


Ambiguous behavior of Flink on Job cancellation with checkpoint configured

2019-03-21 Thread Parth Sarathy
Hi All,
   We are using Flink 1.7.2 and have enabled checkpointing with
RocksDB as the state backend and with checkpoints retained on job cancellation.
In our scenario we cancel the job and, when resubmitting it, we try to restore
from the latest checkpoint / savepoint available. We observe ambiguous behavior
depending on how the job is cancelled; below are the captured observations:

Observations :
1. When we cancel the job with the savepoint option, a savepoint is created as
expected, but Flink deletes the latest checkpoint directory available for the
running job. Is this expected behavior even when the configuration asks to
retain checkpoints on job cancellation?
2. When we cancel the job without the savepoint option, the same latest
checkpoint is retained by Flink, as opposed to the previous case, where it was
deleted because the job was cancelled with the savepoint option.

   As we have configured Flink to retain only a single
checkpoint at any point in time, could there be an issue where, when we cancel
the job with a savepoint, the savepoint is triggered but fails midway? We would
then end up with an incomplete savepoint and no trace of a checkpoint for the
job, as it would have been erased.
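
For reference, a minimal sketch of the configuration described above (the
checkpoint interval, checkpoint directory, and dummy source are placeholders;
retaining only one checkpoint would additionally be set via
state.checkpoints.num-retained: 1 in flink-conf.yaml):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds (placeholder interval).
        env.enableCheckpointing(60000L);

        // RocksDB state backend writing to a placeholder checkpoint directory.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // Keep externalized checkpoints when the job is cancelled.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Dummy pipeline so the sketch is runnable; the real job graph goes here.
        env.fromElements(1, 2, 3).print();

        env.execute("retained-checkpoint-job");
    }
}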

Thanks
Parth Sarathy



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [VOTE] Release 1.8.0, release candidate #3

2019-03-21 Thread jincheng sun
Thanks for the quick fix, Yu. The PR for FLINK-11972 has been merged.

Cheers,
Jincheng

Yu Li  wrote on Thursday, March 21, 2019 at 7:23 AM:

> -1, observed a stable failure of the streaming bucketing end-to-end test case
> in two different environments (Linux/macOS) when running with both the shaded
> hadoop-2.8.3 jar file and the hadoop-2.8.5 dist, while both environments pass
> with Hadoop 2.6.5. For more details, please refer to this comment in
> FLINK-11972.
>
> Best Regards,
> Yu
>
>
> On Thu, 21 Mar 2019 at 04:25, jincheng sun 
> wrote:
>
>> Thanks for the quick fix, Aljoscha! The PR for FLINK-11971 has been merged.
>>
>> Cheers,
>> Jincheng
>>
>> Piotr Nowojski  wrote on Thursday, March 21, 2019 at 12:29 AM:
>>
>>> -1 from my side due to a performance regression that has been present in
>>> the master branch since Jan 29th.
>>>
>>> In 10% of JVM forks it was causing a huge performance drop in some of the
>>> benchmarks (up to 30-50% reduced throughput), which could mean that one out
>>> of 10 task managers could be affected by it. Today we merged a fix for it
>>> [1]. The first benchmark run was promising [2], but we have to wait until
>>> tomorrow to make sure that the problem is definitely resolved. If that’s
>>> the case, I would recommend including it in 1.8.0, because we really do not
>>> know how big this performance regression could be in real-world scenarios.
>>>
>>> Regarding the second regression from mid-February: we have found the
>>> responsible commit, and this one is probably just a false positive. Because
>>> of the nature of some of the benchmarks, they run with a low number of
>>> records (300k). The apparent performance regression was caused by a higher
>>> initialisation time. When I temporarily increased the number of records to
>>> 2M, the regression was gone. Together with Till and Stefan Richter we
>>> discussed the potential impact of this longer initialisation time (in the
>>> case of said benchmarks it increased from 70ms to 120ms) and we think that
>>> it’s not a critical issue that has to block the release. Nevertheless,
>>> there might be some follow-up work for this.
>>>
>>> [1] https://github.com/apache/flink/pull/8020
>>> [2] http://codespeed.dak8s.net:8000/timeline/?ben=tumblingWindow&env=2
>>>
>>> Piotr Nowojski
>>>
>>> On 20 Mar 2019, at 10:09, Aljoscha Krettek  wrote:
>>>
>>> Thanks Jincheng! It would be very good to fix those but as you said, I
>>> would say they are not blockers.
>>>
>>> On 20. Mar 2019, at 09:47, Kurt Young  wrote:
>>>
>>> +1 (non-binding)
>>>
>>> Checked items:
>>> - checked checksums and GPG files
>>> - verified that the source archives do not contains any binaries
>>> - checked that all POM files point to the same version
>>> - build from source successfully
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Wed, Mar 20, 2019 at 2:12 PM jincheng sun 
>>> wrote:
>>>
 Hi Aljoscha&All,

 When I did the `end-to-end` test for RC3 under Mac OS, I found the
 following two problems:

 1. The check of the `minikube status` output is not robust enough. The
 strings returned by different versions on different platforms differ, which
 causes the following misjudgment: when the `Command:
 start_kubernetes_if_not_ruunning failed` error occurs, minikube has actually
 started successfully. The root cause is a bug in the
 `test_kubernetes_embedded_job.sh` script. See FLINK-11971 for details.

 2. Unlike 1.7.x, 1.8.0 no longer bundles the `hadoop-shaded` JAR in the dist.
 This causes errors when the end-to-end tests cannot find Hadoop-related
 classes, such as `java.lang.NoClassDefFoundError:
 Lorg/apache/hadoop/fs/FileSystem`. So we need to improve the end-to-end test
 script, or state explicitly in the README that the end-to-end tests need
 `flink-shaded-hadoop2-uber-.jar` added to the classpath. See FLINK-11972 for
 details.

 I think this is not a blocker for release-1.8.0, but it would be better to
 include those commits in release-1.8 if we still have performance-related
 bugs that should be fixed.

 What do you think?

 Best,
 Jincheng


 Aljoscha Krettek  wrote on Tuesday, March 19, 2019 at 7:58 PM:

> Hi All,
>
> The release process for Flink 1.8.0 is curre