does mapFunction need to implement CheckpointedFunction?

2019-07-04 Thread liu ze
Hi,

I want to update a third-party system in the mapFunction. Does the mapFunction need
to implement CheckpointedFunction?

For example, in the mapFunction I want to update MySQL. Do I need to implement
CheckpointedFunction and manage the state myself?


stream = env.addSource(...)

stream.map(
    // "insert/update MySQL"
    // "a CheckpointedFunction to be implemented here?"
)

stream.addSink(kafka)
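For illustration only, a minimal sketch of the scenario in the question: a RichMapFunction that upserts each record into MySQL as a side effect and forwards it unchanged to the downstream Kafka sink. The JDBC URL, table and column layout are assumptions, and this sketch does not implement CheckpointedFunction.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class MySqlUpsertMap extends RichMapFunction<String, String> {

    private transient Connection connection;
    private transient PreparedStatement upsert;

    @Override
    public void open(Configuration parameters) throws Exception {
        // one connection per parallel subtask
        connection = DriverManager.getConnection("jdbc:mysql://mysql-host:3306/mydb", "user", "password");
        upsert = connection.prepareStatement(
            "INSERT INTO events (id, payload) VALUES (?, ?) ON DUPLICATE KEY UPDATE payload = VALUES(payload)");
    }

    @Override
    public String map(String value) throws Exception {
        // assumes "id,payload" records; update MySQL, then pass the record on
        String[] fields = value.split(",", 2);
        upsert.setString(1, fields[0]);
        upsert.setString(2, fields[1]);
        upsert.executeUpdate();
        return value;
    }

    @Override
    public void close() throws Exception {
        if (upsert != null) { upsert.close(); }
        if (connection != null) { connection.close(); }
    }
}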


good luck!

Re:Tracking message processing in my application

2019-07-04 Thread Haibo Sun
Hi,  Roey


> What do you think about that? 


I would have some concerns about throughput and latency, so I think that the
operators should report status data asynchronously and in batches to minimize the
impact of monitoring on the normal business processing. In addition, if the volume
of business data is very large during a certain period and the backlog of status
data on the operator side exceeds the configured capacity, you also need to decide
how to handle that backlog: discard it, block the business data processing, or
something else.
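A minimal sketch of such asynchronous, batched reporting (queue capacity, flush interval and the reportBatch() target are illustrative assumptions; when the queue is full the status update is dropped rather than blocking the business processing):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public abstract class ReportingMapFunction<T> extends RichMapFunction<T, T> {

    private transient BlockingQueue<String> pending;
    private transient ScheduledExecutorService flusher;

    @Override
    public void open(Configuration parameters) {
        pending = new ArrayBlockingQueue<>(10_000);
        flusher = Executors.newSingleThreadScheduledExecutor();
        flusher.scheduleAtFixedRate(this::flush, 1, 1, TimeUnit.SECONDS);
    }

    @Override
    public T map(T message) throws Exception {
        T result = process(message);
        // offer() never blocks: if monitoring cannot keep up, this status update is dropped
        pending.offer(messageId(message) + ":PROCESSED");
        return result;
    }

    private void flush() {
        List<String> batch = new ArrayList<>();
        pending.drainTo(batch);
        if (!batch.isEmpty()) {
            reportBatch(batch);   // e.g. one bulk request to Elasticsearch
        }
    }

    @Override
    public void close() {
        flusher.shutdown();
        flush();
    }

    protected abstract T process(T message);
    protected abstract String messageId(T message);
    protected abstract void reportBatch(List<String> batch);
}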


Best,
Haibo 

At 2019-07-04 20:29:02, "Halfon, Roey"  wrote:


Hi,

We are looking for a monitoring solution for our dataflow – tracking the progress
of incoming messages while they are processed.
I'll clarify – we want to build a service that will show the status of each
incoming message and, in case of failures, give some detailed information.

I thought about the following:
First, every incoming message will be assigned some id.
We can create a "Reporter" (a logger with some additional capabilities) which
each operator can communicate with to update a status and other relevant
information. These details can be stored in Kibana (ES), for example.
Then, we need to create another service which will query Kibana and show the
results.

What do you think about that? Is there any built-in solution for that? (Flink's
built-in metrics are not relevant here because they don't help to track a
single message.)

How are you logging and tracking your processed messages?
Is there any documentation or some use cases that I can learn from?

Thanks,
Roey.

Re: [VOTE] How to Deal with Split/Select in DataStream API

2019-07-04 Thread Hao Sun
Personally I prefer 3) to keep split/select and correct the behavior. I
feel side output is kind of overkill for such a primitive function, and I
prefer simple APIs like split/select.

Hao Sun


On Thu, Jul 4, 2019 at 11:20 AM Xingcan Cui  wrote:

> Hi folks,
>
> Two weeks ago, I started a thread [1] discussing whether we should discard
> the split/select methods (which have been marked as deprecated since v1.7)
> in DataStream API.
>
> The fact is, these methods will cause "unexpected" results when used
> consecutively (e.g., ds.split(a).select(b).split(c).select(d)) or
> multiple times on the same target (e.g., ds.split(a).select(b),
> ds.split(c).select(d)). The reason is that, following the initial design,
> a new split/select will always override the existing one on the
> same target operator rather than append to it. Some users may not be
> aware of that, but if you are, a current solution would be to use the more
> powerful side output feature [2].
>
> FLINK-11084  added
> some restrictions to the existing split/select logic and suggested
> replacing it with side output in the future. However, considering that the
> side output is currently only available in the process function layer and
> the split/select could have been widely used in many real-world
> applications, we'd like to start a vote and listen to the community on how
> to deal with them.
>
> In the discussion thread [1], we proposed three solutions as follows. All
> of them are feasible but have different impacts on the public API.
>
> 1) Port the side output feature to DataStream API's flatMap and replace
> split/select with it.
>
> 2) Introduce a dedicated function in DataStream API (with the "correct"
> behavior but a different name) that can be used to replace the existing
> split/select.
>
> 3) Keep split/select but change the behavior/semantic to be "correct".
>
> Note that this is just a vote for gathering information, so feel free to
> participate and share your opinions.
>
> The voting time will end on *July 7th 17:00 EDT*.
>
> Thanks,
> Xingcan
>
> [1]
> https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E
> 
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html
> 
>


[VOTE] How to Deal with Split/Select in DataStream API

2019-07-04 Thread Xingcan Cui
Hi folks,

Two weeks ago, I started a thread [1] discussing whether we should discard
the split/select methods (which have been marked as deprecated since v1.7)
in DataStream API.

The fact is, these methods will cause "unexpected" results when used
consecutively (e.g., ds.split(a).select(b).split(c).select(d)) or
multiple times on the same target (e.g., ds.split(a).select(b),
ds.split(c).select(d)). The reason is that, following the initial design,
a new split/select will always override the existing one on the
same target operator rather than append to it. Some users may not be aware
of that, but if you are, a current solution would be to use the more
powerful side output feature [2].
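For reference, a minimal sketch of the side output alternative mentioned above; the routing condition and the tag name are only illustrative:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSplitExample {

    // routes even numbers to the main output and odd numbers to a side output,
    // which is roughly what split(...).select(...) was used for
    public static void splitWithSideOutput(DataStream<Integer> input) {
        final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};

        SingleOutputStreamOperator<Integer> evens = input.process(
            new ProcessFunction<Integer, Integer>() {
                @Override
                public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                    if (value % 2 == 0) {
                        out.collect(value);          // main ("selected") output
                    } else {
                        ctx.output(oddTag, value);   // side output
                    }
                }
            });

        DataStream<Integer> odds = evens.getSideOutput(oddTag);

        evens.print();
        odds.print();
    }
}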

FLINK-11084  added some
restrictions to the existing split/select logic and suggested replacing it with
side output in the future. However, considering that the side output is
currently only available in the process function layer and the split/select
could have been widely used in many real-world applications, we'd like to start
a vote and listen to the community on how to deal with them.

In the discussion thread [1], we proposed three solutions as follows. All
of them are feasible but have different impacts on the public API.

1) Port the side output feature to DataStream API's flatMap and replace
split/select with it.

2) Introduce a dedicated function in DataStream API (with the "correct"
behavior but a different name) that can be used to replace the existing
split/select.

3) Keep split/select but change the behavior/semantic to be "correct".

Note that this is just a vote for gathering information, so feel free to
participate and share your opinions.

The voting time will end on *July 7th 17:00 EDT*.

Thanks,
Xingcan

[1]
https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html


Re: Apache Flink - How to find the number of window instances in an application

2019-07-04 Thread Chesnay Schepler

This is unfortunately not possible.

On 04/07/2019 19:40, M Singh wrote:

Hi:

I wanted to find out if there is a metric to find out the number
of global or non-global window instances in a Flink application.


Thanks

Mans





Apache Flink - How to find the number of window instances in an application

2019-07-04 Thread M Singh
Hi:
I wanted to find out if there is a metric to find out the number of global
or non-global window instances in a Flink application.
Thanks
Mans


Re: Source Kafka and Sink Hive managed tables via Flink Job

2019-07-04 Thread Bowen Li
Thanks Youssef. The context makes more sense to me now.

Just from your description, I suspect it might be because of the upsert - the
sink's throughput in step 1 is high but may get stuck in step 2. AFAIK, Hive
ACID/UPSERT is not really scalable; it's ok for rare, occasional usage but
cannot scale well to massive volumes.

I'd suggest you do a few tests:
1) find out what percentage of your data is upserts, and check what percentage
fits a Hive ACID/upsert use case
2) try changing step 2 from upsert to plain append and see if the back
pressure goes away
3) make sure it's really the sink causing the backpressure (easily done
from the Flink UI), then debug your sink (via logging, Java remote
debugging, etc.) to see where the bottleneck is

I think you can find the root cause with the above steps; please report back
whether the inference is valid so we can help more users. In case you find
that Hive ACID is not the problem, please share some high-level code of
your job so we can take another look.
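A minimal sketch of suggestion 3): time the two sink steps separately so the logs show which one dominates (the SQL strings and class name are placeholders):

import java.sql.Statement;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SinkTimingProbe {

    private static final Logger LOG = LoggerFactory.getLogger(SinkTimingProbe.class);

    // executes the two sink steps and logs how long each one takes
    public static void writeWindowBatch(Statement stmt, String appendSql, String upsertSql) throws Exception {
        long t0 = System.nanoTime();
        stmt.execute(appendSql);   // step 1: append into the temporary table
        long t1 = System.nanoTime();
        stmt.execute(upsertSql);   // step 2: upsert the historical table from the temporary table
        long t2 = System.nanoTime();
        LOG.info("append took {} ms, upsert took {} ms",
                (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000);
    }
}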

Bowen


On Thu, Jul 4, 2019 at 6:50 AM Youssef Achbany 
wrote:

> Thank you Li for your answer and sorry for the dev mistake :).
>
> *To be more clear:*
>
> We write multiple events, assigned via a Flink tumbling window, to Hive in
> one JDBC INSERT statement. We wrote a Hive sink function for that, using
> only JDBC. We do not use partitions yet, but the table is clustered into
> buckets stored as ORC.
>
> We run the Flink job with parallelism 1 because Hive does not support
> multiple INSERT statements in parallel.
>
> We observe that the first instance of the tumbling window easily inserts
> tens of thousands of records into Hive, but the following windows only
> hundreds, probably because backpressure kicks in then.
>
> In addition, we have answered your questions in our mail in yellow.
>
> Thank you
>
> Kind regards
>
>  -Original Message-
>
> From: Bowen Li [mailto:bowenl...@gmail.com]
>
> Sent: Wednesday, July 03, 2019 9:34 PM
>
> To: dev; youssef.achb...@euranova.eu
>
> Subject: Re: Source Kafka and Sink Hive managed tables via Flink Job
>
>  Hi Youssef,
>
>  You need to provide more background context:
>
> - Which Hive sink are you using? We are working on the official Hive sink
>
> for community and will be released in 1.9. So did you develop yours in
>
> house?
>
> JDBC
>
>  - What do you mean by 1st, 2nd, 3rd window? You mean the parallel
> instances
>
> of the same operator, or do you have you have 3 windowing operations
>
> chained?
>
> No parallel instances, I was referring to the tumbling window
>
>  - What does your Hive table look like? E.g. is it partitioned or
>
> non-partitioned? If partitioned, how many partitions do you have? is it
>
> writing in static partition or dynamic partition mode? what format? how
>
> large?
>
>  No partitioning done because low volumes (<100K records)
>
> Format: ORC
>
> Batches of 20K records are processed in the first windows
>
>  - What does your sink do - is each parallelism writing to multiple
>
> partitions or a single partition/table? Is it only appending data or
>
> upserting?
>
>  Single partition table, in 2 steps: (1) writing to temporary table
> (append), (2) execute SQL to upsert historical table with temporary table
>
> On Wed, 3 Jul 2019 at 21:39, Bowen Li  wrote:
>
>> BTW,  I'm adding user@ mailing list since this is a user question and
>> should be asked there.
>>
>> dev@ mailing list is only for discussions of Flink development. Please
>> see https://flink.apache.org/community.html#mailing-lists
>>
>> On Wed, Jul 3, 2019 at 12:34 PM Bowen Li  wrote:
>>
>>> Hi Youssef,
>>>
>>> You need to provide more background context:
>>>
>>> - Which Hive sink are you using? We are working on the official Hive
>>> sink for community and will be released in 1.9. So did you develop yours in
>>> house?
>>> - What do you mean by 1st, 2nd, 3rd window? You mean the parallel
>>> instances of the same operator, or do you have you have 3 windowing
>>> operations chained?
>>> - What does your Hive table look like? E.g. is it partitioned or
>>> non-partitioned? If partitioned, how many partitions do you have? is it
>>> writing in static partition or dynamic partition mode? what format? how
>>> large?
>>> - What does your sink do - is each parallelism writing to multiple
>>> partitions or a single partition/table? Is it only appending data or
>>> upserting?
>>>
>>> On Wed, Jul 3, 2019 at 1:38 AM Youssef Achbany <
>>> youssef.achb...@euranova.eu> wrote:
>>>
 Dear all,

 I'm working for a big project and one of the challenges is to read Kafka
 topics and copy them via Hive command into Hive managed tables in order
 to
 enable ACID HIVE properties.

 I tried it but I have an issue with back pressure:
 - The first window read 20.000 events and wrote them to Hive tables
 - The second, third, ... send only 100 events because the write to Hive
 takes more time than the read of a Kafka topic. But writing 100 events or
 50.000 events takes +/- the same time for Hive.

>

RE: Re:RE: Re:Re: File Naming Pattern from HadoopOutputFormat

2019-07-04 Thread Hailu, Andreas
Very well - thank you both.

// ah

From: Haibo Sun 
Sent: Wednesday, July 3, 2019 9:37 PM
To: Hailu, Andreas [Tech] 
Cc: Yitzchak Lieberman ; user@flink.apache.org
Subject: Re:RE: Re:Re: File Naming Pattern from HadoopOutputFormat

Hi, Andreas

I'm glad you have found a solution. If you're interested in option 2 that I
talked about, you can follow the progress of the issue Yitzchak mentioned
(https://issues.apache.org/jira/browse/FLINK-12573) by watching it.

Best,
Haibo

At 2019-07-03 21:11:44, "Hailu, Andreas" <andreas.ha...@gs.com> wrote:

Hi Haibo, Yitzchak, thanks for getting back to me.

The pattern I chose, which worked, was to extend the HadoopOutputFormat
class, override the open() method, and modify the "mapreduce.output.basename"
configuration property to match my desired file naming structure.
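A sketch of that pattern, assuming the wrapped Hadoop Configuration is reachable via getConfiguration() on Flink's HadoopOutputFormat; the class and base name below are illustrative:

import java.io.IOException;

import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;

public class RenamingHadoopOutputFormat<K, V> extends HadoopOutputFormat<K, V> {

    private final String baseName;

    public RenamingHadoopOutputFormat(OutputFormat<K, V> mapreduceFormat, Job job, String baseName) {
        super(mapreduceFormat, job);
        this.baseName = baseName;
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // e.g. produces files like "my-dataset-r-00000" instead of "part-r-00000"
        getConfiguration().set("mapreduce.output.basename", baseName);
        super.open(taskNumber, numTasks);
    }
}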

// ah

From: Haibo Sun <sunhaib...@163.com>
Sent: Tuesday, July 2, 2019 5:57 AM
To: Yitzchak Lieberman <yitzch...@sentinelone.com>
Cc: Hailu, Andreas [Tech] <andreas.ha...@ny.email.gs.com>; user@flink.apache.org
Subject: Re:Re: File Naming Pattern from HadoopOutputFormat


Hi, Andreas

You are right. To meet this requirement, Flink would need to expose an
interface that allows customizing the filename.

Best,
Haibo

At 2019-07-02 16:33:44, "Yitzchak Lieberman" <yitzch...@sentinelone.com> wrote:
regarding option 2 for parquet:
implementing a bucket assigner won't set the file name, as getBucketId() defines
the directory for the files in case of partitioning the data, for example:
/day=20190101/part-1-1
there is an open issue for that:
https://issues.apache.org/jira/browse/FLINK-12573

On Tue, Jul 2, 2019 at 6:18 AM Haibo Sun <sunhaib...@163.com> wrote:
Hi, Andreas

I think the following things may be what you want.

1. For writing Avro, I think you can extend AvroOutputFormat and override the
getDirectoryFileName() method to customize the file name, as shown below.
The javadoc of AvroOutputFormat: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/formats/avro/AvroOutputFormat.html


public static class CustomAvroOutputFormat<E> extends AvroOutputFormat<E> {

    public CustomAvroOutputFormat(Path filePath, Class<E> type) {
        super(filePath, type);
    }

    public CustomAvroOutputFormat(Class<E> type) {
        super(type);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        this.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
        super.open(taskNumber, numTasks);
    }

    @Override
    protected String getDirectoryFileName(int taskNumber) {
        // returns a custom filename
        return null;
    }
}

2. For writing Parquet, you can refer to ParquetStreamingFileSinkITCase, 
StreamingFileSink#forBulkFormat and DateTimeBucketAssigner. You can create a 
class that implements the BucketAssigner interface and return a custom file 
name in the getBucketId() method (the value returned by getBucketId() will be 
treated as the file name).

ParquetStreamingFileSinkITCase:  
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
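A minimal sketch of such a BucketAssigner (the returned name is illustrative; note that, per the FLINK-12573 discussion above, the bucket id becomes a directory when partitioning is involved):

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class CustomNameBucketAssigner<IN> implements BucketAssigner<IN, String> {

    @Override
    public String getBucketId(IN element, BucketAssigner.Context context) {
        // derive whatever naming scheme is needed from the element or processing time
        return "my-custom-name";
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    // assumed usage: StreamingFileSink.forBulkFormat(path, writerFactory)
    //     .withBucketAssigner(new CustomNameBucketAssigner<>())
    //     .build()
}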

Re: Source Kafka and Sink Hive managed tables via Flink Job

2019-07-04 Thread Youssef Achbany
Thank you Li for your answer and sorry for the dev mistake :).

*To be more clear:*

We write multiple events, assigned via a Flink tumbling window, to Hive in
one JDBC INSERT statement. We wrote a Hive sink function for that, using
only JDBC. We do not use partitions yet, but the table is clustered into
buckets stored as ORC.
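A rough sketch of the kind of sink described here: each invoke() receives the events collected by one tumbling window and writes them as a single multi-row INSERT. The connection URL, table name and two-field event layout are illustrative assumptions, and the values are not escaped as a real sink would need to do.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class HiveJdbcBatchSink extends RichSinkFunction<List<Tuple2<String, String>>> {

    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:hive2://hive-host:10000/default");
    }

    @Override
    public void invoke(List<Tuple2<String, String>> windowBatch, Context context) throws Exception {
        if (windowBatch.isEmpty()) {
            return;
        }
        // step 1: one multi-row INSERT per window into the temporary table
        StringBuilder sql = new StringBuilder("INSERT INTO tmp_events VALUES ");
        for (int i = 0; i < windowBatch.size(); i++) {
            Tuple2<String, String> event = windowBatch.get(i);
            if (i > 0) {
                sql.append(", ");
            }
            sql.append("('").append(event.f0).append("', '").append(event.f1).append("')");
        }
        try (Statement stmt = connection.createStatement()) {
            stmt.execute(sql.toString());
            // step 2 (SQL upsert of the historical table from the temporary table)
            // would be issued here as a separate statement
        }
    }

    @Override
    public void close() throws Exception {
        if (connection != null) { connection.close(); }
    }
}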

We run the Flink job with parallelism 1 because Hive does not support
multiple INSERT statements in parallel.

We observe that the first instance of the tumbling window easily inserts
tens of thousands of records into Hive, but the following windows only
hundreds, probably because backpressure kicks in then.

In addition, we have answered your questions in our mail in yellow.

Thank you

Kind regards

 -Original Message-

From: Bowen Li [mailto:bowenl...@gmail.com]

Sent: Wednesday, July 03, 2019 9:34 PM

To: dev; youssef.achb...@euranova.eu

Subject: Re: Source Kafka and Sink Hive managed tables via Flink Job

 Hi Youssef,

 You need to provide more background context:

- Which Hive sink are you using? We are working on the official Hive sink

for community and will be released in 1.9. So did you develop yours in

house?

JDBC

 - What do you mean by 1st, 2nd, 3rd window? You mean the parallel instances

of the same operator, or do you have you have 3 windowing operations

chained?

No parallel instances, I was referring to the tumbling window

 - What does your Hive table look like? E.g. is it partitioned or

non-partitioned? If partitioned, how many partitions do you have? is it

writing in static partition or dynamic partition mode? what format? how

large?

 No partitioning done because low volumes (<100K records)

Format: ORC

Batches of 20K records are processed in the first windows

 - What does your sink do - is each parallelism writing to multiple

partitions or a single partition/table? Is it only appending data or

upserting?

 Single partition table, in 2 steps: (1) writing to temporary table
(append), (2) execute SQL to upsert historical table with temporary table

On Wed, 3 Jul 2019 at 21:39, Bowen Li  wrote:

> BTW,  I'm adding user@ mailing list since this is a user question and
> should be asked there.
>
> dev@ mailing list is only for discussions of Flink development. Please
> see https://flink.apache.org/community.html#mailing-lists
>
> On Wed, Jul 3, 2019 at 12:34 PM Bowen Li  wrote:
>
>> Hi Youssef,
>>
>> You need to provide more background context:
>>
>> - Which Hive sink are you using? We are working on the official Hive sink
>> for community and will be released in 1.9. So did you develop yours in
>> house?
>> - What do you mean by 1st, 2nd, 3rd window? You mean the parallel
>> instances of the same operator, or do you have you have 3 windowing
>> operations chained?
>> - What does your Hive table look like? E.g. is it partitioned or
>> non-partitioned? If partitioned, how many partitions do you have? is it
>> writing in static partition or dynamic partition mode? what format? how
>> large?
>> - What does your sink do - is each parallelism writing to multiple
>> partitions or a single partition/table? Is it only appending data or
>> upserting?
>>
>> On Wed, Jul 3, 2019 at 1:38 AM Youssef Achbany <
>> youssef.achb...@euranova.eu> wrote:
>>
>>> Dear all,
>>>
>>> I'm working for a big project and one of the challenges is to read Kafka
>>> topics and copy them via Hive command into Hive managed tables in order
>>> to
>>> enable ACID HIVE properties.
>>>
>>> I tried it but I have an issue with back pressure:
>>> - The first window read 20.000 events and wrote them to Hive tables
>>> - The second, third, ... send only 100 events because the write to Hive
>>> takes more time than the read of a Kafka topic. But writing 100 events or
>>> 50.000 events takes +/- the same time for Hive.
>>>
>>> Has someone already built this source and sink? Could you help with this?
>>> Or do you have some tips?
>>> It seems that defining a window size based on the number of events instead
>>> of time is not possible. Is that true?
>>>
>>> Thank you for your help
>>>
>>> Youssef
>>>
>>> --
>>> ♻ Be green, keep it on the screen
>>>
>>

-- 
♻ Be green, keep it on the screen


Re: UnsupportedOperationException from org.apache.flink.shaded.asm6.org.objectweb.asm.ClassVisitor.visitNestHostExperimental using Java 11

2019-07-04 Thread Chesnay Schepler

Flink only supports Java 8.

On 04/07/2019 15:34, Rauch, Jochen wrote:


Hi all,

I have implemented following code snippet with Apache Flink 1.8:

flinkConfiguration.getEnvironment().readTextFile(outputFile.getAbsolutePath(), "ISO-8859-1")
    .flatMap(new FlatMapFunction<String, Tuple2<Map<String, Object>, Integer>>() {
        ….
    }) ….

It works fine with Java 8, but using Java 11 I get this error stacktrace:

    at org.apache.flink.shaded.asm6.org.objectweb.asm.ClassVisitor.visitNestHostExperimental(ClassVisitor.java:158)
    at org.apache.flink.shaded.asm6.org.objectweb.asm.ClassReader.accept(ClassReader.java:541)
    at org.apache.flink.shaded.asm6.org.objectweb.asm.ClassReader.accept(ClassReader.java:391)
    at org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:75)
    at org.apache.flink.api.java.DataSet.clean(DataSet.java:186)
    at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:267)
    at de.idsem.xploit.ief.control.FlinkETLOutputPipeline.execute(FlinkETLOutputPipeline.java:66)


Could you please help with this issue?

Many thanks in advance and best regards

Jochen

-

Dipl.-Inform. (FH) Jochen Rauch

Health Information Systems

Fraunhofer Institute for Biomedical Engineering

Joseph-von-Fraunhofer-Weg 1

66280 Sulzbach

Germany

Visit: Ensheimer Str. 48, 66386 St. Ingbert, Germany

Phone: +49 (0)6897/9071-417

email: jochen.ra...@ibmt.fraunhofer.de 



http://www.ibmt.fraunhofer.de





UnsupportedOperationException from org.apache.flink.shaded.asm6.org.objectweb.asm.ClassVisitor.visitNestHostExperimental using Java 11

2019-07-04 Thread Rauch, Jochen
Hi all,

I have implemented following code snippet with Apache Flink 1.8:
flinkConfiguration.getEnvironment().readTextFile(outputFile.getAbsolutePath(), "ISO-8859-1")
    .flatMap(new FlatMapFunction<String, Tuple2<Map<String, Object>, Integer>>() {
        ...
    })

It works fine with Java 8, but using Java 11 I get this error stacktrace:
at 
org.apache.flink.shaded.asm6.org.objectweb.asm.ClassVisitor.visitNestHostExperimental(ClassVisitor.java:158)
at 
org.apache.flink.shaded.asm6.org.objectweb.asm.ClassReader.accept(ClassReader.java:541)
at 
org.apache.flink.shaded.asm6.org.objectweb.asm.ClassReader.accept(ClassReader.java:391)
at 
org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:115)
at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:75)
at org.apache.flink.api.java.DataSet.clean(DataSet.java:186)
at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:267)
at 
de.idsem.xploit.ief.control.FlinkETLOutputPipeline.execute(FlinkETLOutputPipeline.java:66)

Could you please help with this issue?

Many thanks in advance and best regards
Jochen


-
Dipl.-Inform. (FH) Jochen Rauch
Health Information Systems

Fraunhofer Institute for Biomedical Engineering
Joseph-von-Fraunhofer-Weg 1
66280 Sulzbach
Germany

Visit: Ensheimer Str. 48, 66386 St. Ingbert, Germany
Phone: +49 (0)6897/9071-417
email: jochen.ra...@ibmt.fraunhofer.de
http://www.ibmt.fraunhofer.de



Tracking message processing in my application

2019-07-04 Thread Halfon, Roey
Hi,
We are looking for a monitoring solution for our dataflow - tracking the progress
of incoming messages while they are processed.
I'll clarify - we want to build a service that will show the status of each
incoming message and, in case of failures, give some detailed information.

I thought about the following:
First, every incoming message will be assigned some id.
We can create a "Reporter" (a logger with some additional capabilities) which
each operator can communicate with to update a status and other relevant
information. These details can be stored in Kibana (ES), for example.
Then, we need to create another service which will query Kibana and show the
results.

What do you think about that? Is there any built-in solution for that? (Flink's
built-in metrics are not relevant here because they don't help to track a
single message.)

How are you logging and tracking your processed messages?
Is there any documentation or some use cases that I can learn from?

Thanks,
Roey.


Re: Can Flink infers the table columns type

2019-07-04 Thread Dawid Wysakowicz
Hi,

Unfortunately, automatic schema inference for the JDBC source is not
supported yet. There is also no JDBC TableSource yet, but you should be
able to write one yourself that reuses the JDBCInputFormat. You may take
a look at the BatchTableSource/StreamTableSource interfaces and the
corresponding methods to create a DataSet/DataStream:
StreamExecutionEnvironment.createInput/ExecutionEnvironment.createInput.
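A rough sketch of that approach, assuming Flink 1.8-style interfaces (driver, URL, query and the two-column schema are illustrative; column names and types still have to be declared by hand):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.types.Row;

public class JdbcBatchTableSource implements BatchTableSource<Row> {

    private static final String[] FIELD_NAMES = { "id", "name" };
    private static final TypeInformation<?>[] FIELD_TYPES = { Types.INT, Types.STRING };

    private final RowTypeInfo rowTypeInfo = new RowTypeInfo(FIELD_TYPES, FIELD_NAMES);

    @Override
    public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
        // reuse JDBCInputFormat to read the table as a DataSet<Row>
        JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername("com.mysql.jdbc.Driver")
            .setDBUrl("jdbc:mysql://mysql-host:3306/mydb")
            .setQuery("SELECT id, name FROM my_table")
            .setRowTypeInfo(rowTypeInfo)
            .finish();
        return execEnv.createInput(inputFormat);
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return rowTypeInfo;
    }

    @Override
    public TableSchema getTableSchema() {
        return new TableSchema(FIELD_NAMES, FIELD_TYPES);
    }

    @Override
    public String explainSource() {
        return "JdbcBatchTableSource(my_table)";
    }
}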

You may expect more (table) connectors in Flink versions 1.10+. They
should have automatic schema inference when possible.

Best,

Dawid

On 02/07/2019 20:36, Soheil Pourbafrani wrote:
> Hi 
>
> I want to load MySQL tables in Flink without needing to specify column
> names and types (like what we can do in Apache Spark DataFrames).
>
> Using the JDBCInputFormat we have to pass the table field types in the
> setRowTypeInfo method. I couldn't find any way to force Flink to infer
> the column types.
>
> In addition, I was wondering if it's possible to do the same
> using TableSource?
> thanks




Re: Debug Kryo.Serialization Exception

2019-07-04 Thread Fabian Wollert
*@Fabian do you register any types / serializers via
ExecutionConfig.registerKryoType(...) /
ExecutionConfig.registerTypeWithKryoSerializer(...)?*

Nope, not at all. Our Flink job code does not contain the word "Kryo" anywhere.

thx for looking into it ...

--


*Fabian Wollert*
*Zalando SE*

E-Mail: fab...@zalando.de

Am Do., 4. Juli 2019 um 11:51 Uhr schrieb Tzu-Li (Gordon) Tai <
tzuli...@apache.org>:

> I quickly checked the implementation of duplicate() for both the
> KryoSerializer and StreamElementSerializer (which are the only serializers
> involved here).
> They seem to be correct; especially for the KryoSerializer, since
> FLINK-8836 [1] we now always perform a deep copy of the KryoSerializer when
> duplicating it, and therefore Kryo instances should not be shared at all
> across duplicates.
> This seems to rule out any duplication issues with the serializers.
>
> As a maybe relevant question, @Fabian do you register any types /
> serializers via ExecutionConfig.registerKryoType(...) /
> ExecutionConfig.registerTypeWithKryoSerializer(...)?
>
> Best,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-8836
>
> On Thu, Jul 4, 2019 at 5:29 PM Fabian Wollert  wrote:
>
>> No, not yet. We lack some knowledge in understanding this. The only thing
>> we found out is that it most probably happens in the Elasticsearch sink,
>> because:
>> - some error messages have the sink in their stack trace.
>> - when bumping the ES node specs on AWS, the error happens less often
>> (we haven't bumped it to super large instances yet, nor got to a state
>> where they go away completely; also this would not be the ideal fix)
>>
>> so my current assumption is that some backpressuring is not happening
>> correctly. But this is super vague; any other hints or support on this are
>> highly appreciated.
>>
>> --
>>
>>
>> *Fabian Wollert*
>> *Zalando SE*
>>
>> E-Mail: fab...@zalando.de
>>
>>
>> Am Do., 4. Juli 2019 um 11:26 Uhr schrieb Flavio Pompermaier <
>> pomperma...@okkam.it>:
>>
>>> Any news on this? Have you found the cause of the error?
>>>
>>> On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 Indeed looking at StreamElementSerializer the duplicate() method could
 be bugged:

 @Override
 public StreamElementSerializer<T> duplicate() {
     TypeSerializer<T> copy = typeSerializer.duplicate();
     return (copy == typeSerializer) ? this : new StreamElementSerializer<>(copy);
 }

 Is it safe to return this when copy == typeSerializer ...?

 On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> Hi Fabian,
> we had similar errors with Flink 1.3 [1][2] and the error was caused
> by the fact that a serializer was sharing the same object with multiple
> threads.
> The error was not deterministic and was changing from time to time.
> So maybe it could be something similar (IMHO).
>
> [1] http://codeha.us/apache-flink-users/msg02010.html
> [2]
> http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3ccaeluf_aic_izyw5f27knter_y6h4+nzg2cpniozqdgm+wk7...@mail.gmail.com%3e
>
> Best,
> Flavio
>
> On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert 
> wrote:
>
>> additionally we have these coming with this as well all the time:
>>
>> com.esotericsoftware.kryo.KryoException: 
>> java.lang.ArrayIndexOutOfBoundsException
>> Serialization trace:
>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>  at 
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>  at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>  at 
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>  at 
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>  at 
>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>  at 
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>  at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>  at java.lang.Thread.run(Thread.java:748)
>>>

Re: Debug Kryo.Serialization Exception

2019-07-04 Thread Tzu-Li (Gordon) Tai
I quickly checked the implementation of duplicate() for both the
KryoSerializer and StreamElementSerializer (which are the only serializers
involved here).
They seem to be correct; especially for the KryoSerializer, since
FLINK-8836 [1] we now always perform a deep copy of the KryoSerializer when
duplicating it, and therefore Kryo instances should not be shared at all
across duplicates.
This seems to rule out any duplication issues with the serializers.

As a maybe relevant question, @Fabian do you register any types /
serializers via ExecutionConfig.registerKryoType(...) /
ExecutionConfig.registerTypeWithKryoSerializer(...)?

Best,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-8836
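For reference only, this is roughly what the registration being asked about looks like; the JSON-string-based serializer is purely illustrative and not a suggested fix:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoRegistrationExample {

    // purely illustrative Kryo serializer that round-trips ObjectNode as a JSON string
    public static class JsonNodeKryoSerializer extends Serializer<ObjectNode> {
        private final ObjectMapper mapper = new ObjectMapper();

        @Override
        public void write(Kryo kryo, Output output, ObjectNode node) {
            output.writeString(node.toString());
        }

        @Override
        public ObjectNode read(Kryo kryo, Input input, Class<ObjectNode> type) {
            try {
                return (ObjectNode) mapper.readTree(input.readString());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().registerKryoType(ObjectNode.class);
        env.getConfig().registerTypeWithKryoSerializer(ObjectNode.class, JsonNodeKryoSerializer.class);
    }
}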

On Thu, Jul 4, 2019 at 5:29 PM Fabian Wollert  wrote:

> No, not yet. We lack some knowledge in understanding this. The only thing
> we found out is that it most probably happens in the Elasticsearch sink,
> because:
> - some error messages have the sink in their stack trace.
> - when bumping the ES node specs on AWS, the error happens less often (we
> haven't bumped it to super large instances yet, nor got to a state where
> they go away completely; also this would not be the ideal fix)
>
> so my current assumption is that some backpressuring is not happening
> correctly. But this is super vague; any other hints or support on this are
> highly appreciated.
>
> --
>
>
> *Fabian Wollert*
> *Zalando SE*
>
> E-Mail: fab...@zalando.de
>
>
> Am Do., 4. Juli 2019 um 11:26 Uhr schrieb Flavio Pompermaier <
> pomperma...@okkam.it>:
>
>> Any news on this? Have you found the cause of the error?
>>
>> On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier 
>> wrote:
>>
>>> Indeed looking at StreamElementSerializer the duplicate() method could
>>> be bugged:
>>>
>>> @Override
>>> public StreamElementSerializer<T> duplicate() {
>>>     TypeSerializer<T> copy = typeSerializer.duplicate();
>>>     return (copy == typeSerializer) ? this : new StreamElementSerializer<>(copy);
>>> }
>>>
>>> Is it safe to return this when copy == typeSerializer ...?
>>>
>>> On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier 
>>> wrote:
>>>
 Hi Fabian,
 we had similar errors with Flink 1.3 [1][2] and the error was caused by
 the fact that a serializer was sharing the same object with multiple
 threads.
 The error was not deterministic and was changing from time to time.
 So maybe it could be something similar (IMHO).

 [1] http://codeha.us/apache-flink-users/msg02010.html
 [2]
 http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3ccaeluf_aic_izyw5f27knter_y6h4+nzg2cpniozqdgm+wk7...@mail.gmail.com%3e

 Best,
 Flavio

 On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert 
 wrote:

> additionally we have these coming with this as well all the time:
>
> com.esotericsoftware.kryo.KryoException: 
> java.lang.ArrayIndexOutOfBoundsException
> Serialization trace:
> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>   at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>
>
> or
>
>
> com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
> Serialization trace:
> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.

Re: Debug Kryo.Serialization Exception

2019-07-04 Thread Fabian Wollert
No, not yet. We lack some knowledge in understanding this. The only thing
we found out is that it most probably happens in the Elasticsearch sink,
because:
- some error messages have the sink in their stack trace.
- when bumping the ES node specs on AWS, the error happens less often (we
haven't bumped it to super large instances yet, nor got to a state where
they go away completely; also this would not be the ideal fix)

so my current assumption is that some backpressuring is not happening
correctly. But this is super vague; any other hints or support on this are
highly appreciated.

--


*Fabian Wollert*
*Zalando SE*

E-Mail: fab...@zalando.de


Am Do., 4. Juli 2019 um 11:26 Uhr schrieb Flavio Pompermaier <
pomperma...@okkam.it>:

> Any news on this? Have you found the cause of the error?
>
> On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier 
> wrote:
>
>> Indeed looking at StreamElementSerializer the duplicate() method could be
>> bugged:
>>
>> @Override
>> public StreamElementSerializer<T> duplicate() {
>>     TypeSerializer<T> copy = typeSerializer.duplicate();
>>     return (copy == typeSerializer) ? this : new StreamElementSerializer<>(copy);
>> }
>>
>> Is it safe to return this when copy == typeSerializer ...?
>>
>> On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier 
>> wrote:
>>
>>> Hi Fabian,
>>> we had similar errors with Flink 1.3 [1][2] and the error was caused by
>>> the fact that a serializer was sharing the same object with multiple
>>> threads.
>>> The error was not deterministic and was changing from time to time.
>>> So maybe it could be something similar (IMHO).
>>>
>>> [1] http://codeha.us/apache-flink-users/msg02010.html
>>> [2]
>>> http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3ccaeluf_aic_izyw5f27knter_y6h4+nzg2cpniozqdgm+wk7...@mail.gmail.com%3e
>>>
>>> Best,
>>> Flavio
>>>
>>> On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert 
>>> wrote:
>>>
 additionally we have these coming with this as well all the time:

 com.esotericsoftware.kryo.KryoException: 
 java.lang.ArrayIndexOutOfBoundsException
 Serialization trace:
 _children (com.fasterxml.jackson.databind.node.ObjectNode)
at 
 com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
 org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at 
 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
at 
 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
at 
 org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
 org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
at 
 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at 
 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
 Caused by: java.lang.ArrayIndexOutOfBoundsException


 or


 com.esotericsoftware.kryo.KryoException: 
 java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
 Serialization trace:
 _children (com.fasterxml.jackson.databind.node.ObjectNode)
at 
 com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
 org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at 
 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
at 
 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
at 
 org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
 org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
at 
 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at 
 org.apache.fl

Re: Debug Kryo.Serialization Exception

2019-07-04 Thread Flavio Pompermaier
Any news on this? Have you found the cause of the error?

On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier 
wrote:

> Indeed looking at StreamElementSerializer the duplicate() method could be
> bugged:
>
> @Override
> public StreamElementSerializer<T> duplicate() {
>     TypeSerializer<T> copy = typeSerializer.duplicate();
>     return (copy == typeSerializer) ? this : new StreamElementSerializer<>(copy);
> }
>
> Is it safe to return this when copy == typeSerializer ...?
>
> On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier 
> wrote:
>
>> Hi Fabian,
>> we had similar errors with Flink 1.3 [1][2] and the error was caused by
>> the fact that a serializer was sharing the same object with multiple
>> threads.
>> The error was not deterministic and was changing from time to time.
>> So maybe it could be something similar (IMHO).
>>
>> [1] http://codeha.us/apache-flink-users/msg02010.html
>> [2]
>> http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3ccaeluf_aic_izyw5f27knter_y6h4+nzg2cpniozqdgm+wk7...@mail.gmail.com%3e
>>
>> Best,
>> Flavio
>>
>> On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert  wrote:
>>
>>> additionally we have these coming with this as well all the time:
>>>
>>> com.esotericsoftware.kryo.KryoException: 
>>> java.lang.ArrayIndexOutOfBoundsException
>>> Serialization trace:
>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>> at 
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>> at 
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>> at 
>>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>> at 
>>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>> at 
>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>> at 
>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>> at 
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>
>>>
>>> or
>>>
>>>
>>> com.esotericsoftware.kryo.KryoException: 
>>> java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>> Serialization trace:
>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>> at 
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>> at 
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>> at 
>>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>> at 
>>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>> at 
>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>> at 
>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>> at 
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>> at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>> at java.util.ArrayList.get(ArrayList.java:433)
>>> at 
>>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>>> at 
>>> com.esotericsoftware.kryo.serializers.MapSerializer

Re: Providing Custom Serializer for Generic Type

2019-07-04 Thread Tzu-Li (Gordon) Tai
Hi Andrea,

Is there a specific reason you want to use a custom TypeInformation /
TypeSerializer for your type?
From the description in the original post, this part wasn't clear to me.

If the only reason is because it is generally suggested to avoid generic
type serialization via Kryo, both for performance reasons as well as
evolvability in the future, then updating your type to be recognized by
Flink as one of the supported types [1] would be enough.
Otherwise, implementing your own type information and serializer is usually
only something users with very specific use cases might be required to do.
Since you are also using that type as managed state, for a safer schema
evolvability story in the future, I would recommend either Avro or Pojo as
Jingsong Lee had already mentioned.

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#flinks-typeinformation-class
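A minimal sketch of what the POJO route in [1] means in practice (public class, public no-argument constructor, and fields that are either public or have getters/setters); the fields below are illustrative, not the real MyClass:

public class MyClass {

    // a public field needs no getter/setter
    public String schema;

    // a private field needs a getter and a setter
    private long timestamp;

    // a public no-argument constructor is required
    public MyClass() {}

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}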

On Thu, Jul 4, 2019 at 5:08 PM Andrea Spina 
wrote:

> Hi JingsongLee, thank you for your answer.
> I wanted to explore it as the last chance honestly. Anyway if defining
> custom serializers and types information involves quite a big effort, I
> would reconsider my guess.
>
> Cheers,
>
> Il giorno gio 4 lug 2019 alle ore 08:46 JingsongLee <
> lzljs3620...@aliyun.com> ha scritto:
>
>> Hi Andrea:
>> Why not make your *MyClass* POJO? [1] If it is a POJO, then flink
>> will use PojoTypeInfo and PojoSerializer that have a good
>> implementation already.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#rules-for-pojo-types
>>
>> Best, JingsongLee
>>
>> --
>> From:Andrea Spina 
>> Send Time:2019年7月4日(星期四) 14:37
>> To:user 
>> Subject:Providing Custom Serializer for Generic Type
>>
>> Dear community,
>> in my job, I run with a custom event type *MyClass* which is a sort of
>> "generic event" that I handle all along my streaming flow both as an event
>> (DataStream[MyClass]) and as a managed state.
>>
>> I see that Flink warns me about generic serialization of
>> *MyClass*
>>  INFO [run-main-0] (TypeExtractor.java:1818) - class
>> io.radicalbit.MyClass does not contain a setter for field
>> io$radicalbit$MyClass$$schema
>>  INFO [run-main-0] (TypeExtractor.java:1857) - Class class
>> io.radicalbit.MyClass cannot be used as a POJO type because not all fields
>> are valid POJO fields, and must be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance.
>>  INFO [run-main-0] (TypeExtractor.java:1818) - class
>> io.radicalbit.MyClass does not contain a setter for field
>> io$radicalbit$MyClass$schema
>>
>> So I wanted to provide my own serializer for MyClass, trying
>> first to register the Java one to check whether the system recognizes it, so I
>> followed [1], but it seems that it is not considered.
>>
>> I read then about [2] (the case is way akin to mine) and AFAIU I need to
>> implement a custom TypeInformation and TypeSerializer for my class as
>> suggested in [3] because Flink will ignore my registered serializer as long
>> as it considers my type as *generic*.
>>
>> config.registerTypeWithKryoSerializer(classOf[MyClass], 
>> classOf[RadicalSerde])
>>
>>
>> My question, finally, is: do I need to provide these custom classes? Is
>> there any practical example of creating custom type information like the
>> above? I have had a quick preliminary look at it, but it seems that I need
>> to provide a non-trivial amount of information to the TypeInformation and
>> TypeSerializer interfaces.
>>
>> Thank you for your excellent work and help.
>>
>> Cheers.
>>
>> [1] -
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
>> [2] -
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html
>> [3] -
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory
>> --
>> Andrea Spina
>> Head of R&D @ Radicalbit Srl
>> Via Giovanni Battista Pirelli 11, 20124, Milano - IT
>>
>>
>>
>
> --
> *Andrea Spina*
> Head of R&D @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT
>


Re: Providing Custom Serializer for Generic Type

2019-07-04 Thread Andrea Spina
Hi JingsongLee, thank you for your answer.
Honestly, I wanted to explore that only as a last resort. Anyway, if defining
custom serializers and type information involves quite a big effort, I
would reconsider.

Cheers,

Il giorno gio 4 lug 2019 alle ore 08:46 JingsongLee 
ha scritto:

> Hi Andrea:
> Why not make your *MyClass* POJO? [1] If it is a POJO, then flink
> will use PojoTypeInfo and PojoSerializer that have a good
> implementation already.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#rules-for-pojo-types
>
> Best, JingsongLee
>
> --
> From:Andrea Spina 
> Send Time:2019年7月4日(星期四) 14:37
> To:user 
> Subject:Providing Custom Serializer for Generic Type
>
> Dear community,
> in my job, I run with a custom event type *MyClass* which is a sort of
> "generic event" that I handle all along my streaming flow both as an event
> (DataStream[MyClass]) and as a managed state.
>
> I see that Flink warns me about generic serialization of
> *MyClass*
>  INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass
> does not contain a setter for field io$radicalbit$MyClass$$schema
>  INFO [run-main-0] (TypeExtractor.java:1857) - Class class
> io.radicalbit.MyClass cannot be used as a POJO type because not all fields
> are valid POJO fields, and must be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance.
>  INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass
> does not contain a setter for field io$radicalbit$MyClass$schema
>
> So I wanted to provide my own serializer for MyClass, trying first
> to register the Java one to check whether the system recognizes it, so I followed
> [1], but it seems that it is not considered.
>
> I read then about [2] (the case is way akin to mine) and AFAIU I need to
> implement a custom TypeInformation and TypeSerializer for my class as
> suggested in [3] because Flink will ignore my registered serializer as long
> as it considers my type as *generic*.
>
> config.registerTypeWithKryoSerializer(classOf[MyClass], classOf[RadicalSerde])
>
>
> My question, finally, is: do I need to provide these custom classes? Is there
> any practical example of creating custom type information like the above? I
> have had a quick preliminary look at it, but it seems that I need to provide a
> non-trivial amount of information to the TypeInformation and
> TypeSerializer interfaces.
>
> Thank you for your excellent work and help.
>
> Cheers.
>
> [1] -
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
> [2] -
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html
> [3] -
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory
> --
> Andrea Spina
> Head of R&D @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT
>
>
>

-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Re: [FLINK-10868] job cannot be exited immediately if job manager is timed out for some reason

2019-07-04 Thread Anyang Hu
Thanks for your replies.

To Peter:
The heartbeat.timeout had already been increased to 3 minutes, but the job
manager timeout still occurs. At present, the following logic is added:
when the JM times out, onFatalError is called, which ensures that the failed
job exits quickly. Does this method have any side effects?


@Override
public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
    validateRunsInMainThread();
    log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);

    JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
    if (jobManagerRegistration != null) {
        jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
    } else {
        ResourceManagerException exception = new ResourceManagerException(
            "Job Manager is lost, can not notify allocation failure.");
        onFatalError(exception);
    }
}



Yours,
Anyang


Peter Huang  于2019年7月2日周二 上午2:43写道:

> Hi Anyang,
>
> Thanks for raising the question. I didn't test the PR in batch mode; the
> observation helps me come up with a better implementation. From my understanding,
> if the RM-to-job-manager heartbeat times out, the job manager connection will
> be closed, so it will not be reconnected. Are you running the batch job in a
> per-job cluster or a session cluster? To temporarily mitigate the issue you are
> facing, you can probably tune heartbeat.timeout (default 50s) to a
> larger value.
>
>
> Best Regards
> Peter Huang
>
> On Mon, Jul 1, 2019 at 7:50 AM Till Rohrmann  wrote:
>
>> Hi Anyang,
>>
>> as far as I can tell, FLINK-10868 has not been merged into Flink yet.
>> Thus, I cannot tell much about how well it works. The case you are
>> describing should be properly handled in a version which gets merged
>> though. I guess what needs to happen is that once the JM reconnects to the
>> RM it should synchronize the pending slot requests with the registered slot
>> requests on the RM. But this should be a follow-up issue to FLINK-10868,
>> because it would widen the scope too much.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 26, 2019 at 10:52 AM Anyang Hu 
>> wrote:
>>
>>> Hi ZhenQiu && Rohrmann:
>>>
>>> Currently I have backported FLINK-10868 to Flink 1.5. Most of my jobs (all
>>> batch jobs) can exit immediately after the number of failed container
>>> requests reaches the upper limit, but there are still some jobs that cannot
>>> exit immediately. From the logs, these jobs first have their job manager
>>> time out for unknown reasons. Code segment 1 is executed after the job
>>> manager has timed out but before the job manager has reconnected, so it is
>>> suspected that the job manager is out of sync and the
>>> notifyAllocationFailure() method in code segment 2 is not executed.
>>>
>>>
>>> I'm wondering whether you have encountered similar problems and whether there
>>> is a solution. To solve the problem of the job not exiting immediately, we are
>>> currently considering calling the onFatalError() method to immediately exit the
>>> process when jobManagerRegistration == null; it is temporarily unclear whether
>>> this brute-force approach has any side effects.
>>>
>>>
>>> Thanks,
>>> Anyang
>>>
>>>
>>> code segment 1 in ResourceManager.java:
>>>
>>> private void cancelAllPendingSlotRequests(Exception cause) {
>>>slotManager.cancelAllPendingSlotRequests(cause);
>>> }
>>>
>>>
>>> code segment 2 in ResourceManager.java:
>>>
>>> @Override
>>> public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, 
>>> Exception cause) {
>>>validateRunsInMainThread();
>>>log.info("Slot request with allocation id {} for job {} failed.", 
>>> allocationId, jobId, cause);
>>>
>>>JobManagerRegistration jobManagerRegistration = 
>>> jobManagerRegistrations.get(jobId);
>>>if (jobManagerRegistration != null) {
>>>   
>>> jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId,
>>>  cause);
>>>}
>>> }
>>>
>>>