Re: Providing external files to flink classpath

2019-06-28 Thread Yun Tang
Hi Vishwas


  1.  You could use '-yt' to ship the specified files to the classpath; please
refer to [1] for more details.
  2.  If the properties are only loaded on the client side before executing the
application, you could let your application just read the local property
file. Flink supports loading properties via the ParameterTool [2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html#usage
[2] 
https://github.com/apache/flink/blob/f1721293b0701d584d42bd68671181e332d2ad04/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java#L120
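
As a small, self-contained illustration of option 2 (a sketch only; the property
file path and the keys below are made up for the example):

import java.io.IOException;
import org.apache.flink.api.java.utils.ParameterTool;

public class LoadPropertiesExample {
    public static void main(String[] args) throws IOException {
        // Load a property file that is available on the client machine at submission time.
        ParameterTool params = ParameterTool.fromPropertiesFile("/path/to/application.properties");

        // Read individual properties, optionally with a default value.
        String brokers = params.get("kafka.brokers", "localhost:9092");
        int parallelism = params.getInt("job.parallelism", 1);
        System.out.println(brokers + " / " + parallelism);
    }
}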

Best
Yun Tang


From: Vishwas Siravara 
Sent: Saturday, June 29, 2019 0:43
To: user
Subject: Providing external files to flink classpath

Hi ,
I am trying to add external property files to the Flink classpath for
my application. These files are not part of the fat jar. I put them
under the lib folder but Flink can't find them. How can I manage
external property files that need to be read by Flink?

Thanks,
Vishwas


Re: LookupableTableSource question

2019-06-28 Thread JingsongLee
The keys parameter means the joint primary key; it is not a list of independent
keys. In your case, maybe there is just a single key?

Best, Jingsong Lee


Sent from Alibaba Mail for iPhone
 --Original Mail --
From:Flavio Pompermaier 
Date:2019-06-28 22:53:31
Recipient:JingsongLee 
CC:user 
Subject:Re: LookupableTableSource question
Sorry, I copied and pasted the current eval method twice... I'd do this:

public void eval(Object... keys) {
    for (Object key : keys) {
        Row keyRow = Row.of(key);
        if (cache != null) {
            List<Row> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (Row cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
    }
 ...
On Fri, Jun 28, 2019 at 4:51 PM Flavio Pompermaier  wrote:

This could be a good fit, I'll try to dig into it and see if it can be adapted 
to a REST service.
The only strange thing I see is that the key of the local cache is per block of
keys... am I wrong?
Shouldn't it cycle over the list of passed keys?

Right now it's the following:

Cache<Row, List<Row>> cache;

public void eval(Object... keys) {
    Row keyRow = Row.of(keys);
    if (cache != null) {
        List<Row> cachedRows = cache.getIfPresent(keyRow);
        if (cachedRows != null) {
            for (Row cachedRow : cachedRows) {
                collect(cachedRow);
            }
            return;
        }
    }
 ...

while I'd use the following (also for JDBC):

Cache<Row, List<Row>> cache;

public void eval(Object... keys) {
    Row keyRow = Row.of(keys);
    if (cache != null) {
        List<Row> cachedRows = cache.getIfPresent(keyRow);
        if (cachedRows != null) {
            for (Row cachedRow : cachedRows) {
                collect(cachedRow);
            }
            return;
        }
    }
 ...

public void eval(Object... keys) {
    for (Object key : keys) {
        Row keyRow = Row.of(key);
        if (cache != null) {
            List<Row> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (Row cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
    }
 ...

Am I missing something?


On Fri, Jun 28, 2019 at 4:18 PM JingsongLee  wrote:

Hi Flavio:

I just implemented a JDBCLookupFunction [1]. You can use it as a table function [2],
or use the Blink temporal table join [3] (this needs Blink planner support).
I added a Google Guava cache to JDBCLookupFunction with a configurable cacheMaxSize
(to avoid OOM) and cacheExpireMs (to keep the lookup table fresh).
Is that what you want?

[1] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
[2] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143
[3] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75

 Best, JingsongLee
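
For reference, a minimal sketch of the kind of bounded, expiring Guava cache
described above (plain Guava here; the class and field names are illustrative,
not the exact ones used in JDBCLookupFunction):

import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.types.Row;

public class LookupCacheSketch {

    // Bounded, expiring cache: entries are evicted once cacheMaxSize is reached
    // and dropped after cacheExpireMs, so the lookup data is eventually re-read.
    private final Cache<Row, List<Row>> cache;

    public LookupCacheSketch(long cacheMaxSize, long cacheExpireMs) {
        this.cache = CacheBuilder.newBuilder()
                .maximumSize(cacheMaxSize)                              // avoid OOM
                .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS) // keep lookups fresh
                .build();
    }

    public List<Row> getIfPresent(Row key) {
        return cache.getIfPresent(key);
    }

    public void put(Row key, List<Row> rows) {
        cache.put(key, rows);
    }
}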


--
From:Flavio Pompermaier 
Send Time: Friday, June 28, 2019, 21:04
To:user 
Subject:LookupableTableSource question

Hi to all,
I have a use case where I'd like to enrich a stream using a rarely updated 
lookup table.
Basically, I'd like to be able to set a refresh policy that is triggered either
when a key is not found (a new key has probably been added in the meantime)
or when a configurable refresh period has elapsed.

Is there any suggested solution for this? The LookupableTableSource looks very
similar to what I'd like to achieve, but I can't find a real-world example using
it, and it lacks these 2 requirements (key-values are not refreshed after a
configurable timeout and a KeyNotFound callback cannot be handled).

Any help is appreciated,
Flavio





Providing external files to flink classpath

2019-06-28 Thread Vishwas Siravara
Hi ,
I am trying to add external property files to the Flink classpath for
my application. These files are not part of the fat jar. I put them
under the lib folder but Flink can't find them. How can I manage
external property files that need to be read by Flink?

Thanks,
Vishwas


Re: LookupableTableSource question

2019-06-28 Thread Flavio Pompermaier
Sorry, I copied and pasted the current eval method twice... I'd do this:

public void eval(Object... keys) {
    for (Object key : keys) {
        Row keyRow = Row.of(key);
        if (cache != null) {
            List<Row> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (Row cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
    }
 ...

On Fri, Jun 28, 2019 at 4:51 PM Flavio Pompermaier 
wrote:

> This could be a good fit, I'll try to dig into it and see if it can be
> adapted to a REST service.
> The only strange thing I see is that the key of the local cache is per
> block of keys..am I wrong?
> Shouldn't it cycle over the list of passed keys?
>
> Right now it's the following:
>
> Cache<Row, List<Row>> cache;
>
> public void eval(Object... keys) {
>     Row keyRow = Row.of(keys);
>     if (cache != null) {
>         List<Row> cachedRows = cache.getIfPresent(keyRow);
>         if (cachedRows != null) {
>             for (Row cachedRow : cachedRows) {
>                 collect(cachedRow);
>             }
>             return;
>         }
>     }
>  ...
>
> while I'd use the following (also for JDBC):
>
> Cache<Row, List<Row>> cache;
>
> public void eval(Object... keys) {
>     Row keyRow = Row.of(keys);
>     if (cache != null) {
>         List<Row> cachedRows = cache.getIfPresent(keyRow);
>         if (cachedRows != null) {
>             for (Row cachedRow : cachedRows) {
>                 collect(cachedRow);
>             }
>             return;
>         }
>     }
>  ...
>
> public void eval(Object... keys) {
>     for (Object key : keys) {
>         Row keyRow = Row.of(key);
>         if (cache != null) {
>             List<Row> cachedRows = cache.getIfPresent(keyRow);
>             if (cachedRows != null) {
>                 for (Row cachedRow : cachedRows) {
>                     collect(cachedRow);
>                 }
>                 return;
>             }
>         }
>     }
>  ...
>
> Am I missing something?
>
>
> On Fri, Jun 28, 2019 at 4:18 PM JingsongLee 
> wrote:
>
>> Hi Flavio:
>>
>> I just implement a JDBCLookupFunction[1]. You can use it as table
>> function[2]. Or use
>> blink temporal table join[3] (Need blink planner support).
>> I add a google guava cache in JDBCLookupFunction with configurable
>> cacheMaxSize
>> (avoid memory OOM) and cacheExpireMs(For the fresh of lookup table).
>> Is that you want?
>>
>> [1]
>> https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
>> [2]
>> https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143
>> [3]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75
>>
>>  Best, JingsongLee
>>
>> --
>> From:Flavio Pompermaier 
>> Send Time: Friday, June 28, 2019, 21:04
>> To:user 
>> Subject:LookupableTableSource question
>>
>> Hi to all,
>> I have a use case where I'd like to enrich a stream using a rarely
>> updated lookup table.
>> Basically, I'd like to be able to set a refresh policy that is triggered
>> either when a key was not found (a new key has probably been added in the
>> mean time) or a configurable refresh-period has elapsed.
>>
>> Is there any suggested solution to this? The LookupableTableSource looks
>> very similar to what I'd like to achieve but I can't find a real-world
>> example using it and it lacks of such 2 requirements (key-values are not
>> refreshed after a configurable timeout and a KeyNotFound callback cannot be
>> handled).
>>
>> Any help is appreciated,
>> Flavio
>>
>>
>>
>


Re: LookupableTableSource question

2019-06-28 Thread Flavio Pompermaier
This could be a good fit, I'll try to dig into it and see if it can be
adapted to a REST service.
The only strange thing I see is that the key of the local cache is per
block of keys... am I wrong?
Shouldn't it cycle over the list of passed keys?

Right now it's the following:

Cache<Row, List<Row>> cache;

public void eval(Object... keys) {
    Row keyRow = Row.of(keys);
    if (cache != null) {
        List<Row> cachedRows = cache.getIfPresent(keyRow);
        if (cachedRows != null) {
            for (Row cachedRow : cachedRows) {
                collect(cachedRow);
            }
            return;
        }
    }
 ...

while I'd use the following (also for JDBC):

Cache<Row, List<Row>> cache;

public void eval(Object... keys) {
    Row keyRow = Row.of(keys);
    if (cache != null) {
        List<Row> cachedRows = cache.getIfPresent(keyRow);
        if (cachedRows != null) {
            for (Row cachedRow : cachedRows) {
                collect(cachedRow);
            }
            return;
        }
    }
 ...

public void eval(Object... keys) {
    for (Object key : keys) {
        Row keyRow = Row.of(key);
        if (cache != null) {
            List<Row> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (Row cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
    }
 ...

Am I missing something?


On Fri, Jun 28, 2019 at 4:18 PM JingsongLee  wrote:

> Hi Flavio:
>
> I just implement a JDBCLookupFunction[1]. You can use it as table
> function[2]. Or use
> blink temporal table join[3] (Need blink planner support).
> I add a google guava cache in JDBCLookupFunction with configurable
> cacheMaxSize
> (avoid memory OOM) and cacheExpireMs(For the fresh of lookup table).
> Is that you want?
>
> [1]
> https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
> [2]
> https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143
> [3]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75
>
>  Best, JingsongLee
>
> --
> From:Flavio Pompermaier 
> Send Time: Friday, June 28, 2019, 21:04
> To:user 
> Subject:LookupableTableSource question
>
> Hi to all,
> I have a use case where I'd like to enrich a stream using a rarely updated
> lookup table.
> Basically, I'd like to be able to set a refresh policy that is triggered
> either when a key was not found (a new key has probably been added in the
> mean time) or a configurable refresh-period has elapsed.
>
> Is there any suggested solution to this? The LookupableTableSource looks
> very similar to what I'd like to achieve but I can't find a real-world
> example using it and it lacks of such 2 requirements (key-values are not
> refreshed after a configurable timeout and a KeyNotFound callback cannot be
> handled).
>
> Any help is appreciated,
> Flavio
>
>
>


Re: Apache Flink - Running application with different flink configurations (flink-conf.yml) on EMR

2019-06-28 Thread Jeff Zhang
This is because Flink doesn't unify execution across different environments.
The community has discussed before how to enhance the Flink client
API. The initial proposal is to introduce a FlinkConf which contains all the
configuration, so that executions can be unified in all environments (IDE,
CLI, SQL Client, Scala Shell, downstream projects).

Here's the sample code:

val conf = new FlinkConf().setProperty("key_1", "value_1")  // create FlinkConf

val env = new ExecutionEnvironment(conf)  // create ExecutionEnvironment

val jobId = env.submit(...)  // non-blocking job submission (detached mode)

val jobStatus = env.getClusterClient().queryJobStatus(jobId)  // approach 1: query job status via ClusterClient

val jobStatus = env.queryJobStatus(jobId)  // approach 2: query job status via ExecutionEnvironment


And you can refer to this document for more details:

https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing




Xintong Song wrote on Friday, June 28, 2019 at 10:28 PM:

> Hi, Singh,
>
> I don't think that should work. The -D or -yD parameters need to be
> passed to the Flink start-up scripts or the "flink run" command. I don't
> think the IntelliJ VM arguments are equivalent to that. In fact, I'm not
> aware of any method to set "-D" parameters when running jobs from the IDE.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jun 28, 2019 at 8:45 PM M Singh  wrote:
>
>> Hi Xintong:
>>
>> I passed the -Dparallelism.default=2 in the  run configuration VM
>> arguments for IntelliJ.
>>
>> So what I am looking for is a way to overwrite the config parameters
>> which are defined in the flink-config.yaml file (parallelism.default is
>> just an example) which would be picked up regardless of the env (eg:
>> locally, on yarn or IDE).  When I run the application in IDE (locally) with
>> the above mentioned VM parameter, the StreamExecutionEnvironment.config
>> does not show this value and the Flink UI shows configuration parameter
>> parallelism as 8.  Is there any other place where I can see the parameter
>> settings ?
>>
>> Thanks.
>>
>> On Thursday, June 27, 2019, 11:58:28 PM EDT, Xintong Song <
>> tonysong...@gmail.com> wrote:
>>
>>
>> Could you provide some more details on how you run your job with -D
>> options in IDE?
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Fri, Jun 28, 2019 at 5:03 AM M Singh  wrote:
>>
>> Hi Xintong:  Thanks for your pointers.
>>
>> I tried -Dparallelism.default=2 locally (in IDE) and it did not work.  Do
>> you know if there is a common way that would work both for emr, locally and
>> ide ?
>>
>> Thanks again.
>>
>> On Thursday, June 27, 2019, 12:03:06 AM EDT, Xintong Song <
>> tonysong...@gmail.com> wrote:
>>
>>
>> Hi Singh,
>>
>> You can use the environment variable "FLINK_CONF_DIR" to specify path to
>> the directory of config files. You can also override config options with
>> command line arguments prefixed -D (for yarn-session.sh) or -yD (for 'flink
>> run' command).
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Jun 26, 2019 at 9:13 PM M Singh  wrote:
>>
>> Hi:
>>
>> I have a single EMR cluster with Flink and want to run multiple
>> applications on it with different flink configurations.  Is there a way to
>>
>> 1. Pass the config file name for each application, or
>> 2. Overwrite the config parameters via command line arguments for the
>> application.  This is similar to how we can overwrite the default
>> parameters in spark
>>
>> I searched the documents and have tried using ParameterTool with the
>> config parameter names, but it has not worked as yet.
>>
>> Thanks for your help.
>>
>> Mans
>>
>>

-- 
Best Regards

Jeff Zhang


Re: Apache Flink - Running application with different flink configurations (flink-conf.yml) on EMR

2019-06-28 Thread Xintong Song
Hi, Singh,

I don't think that should work. The -D or -yD parameters need to be passed
to the Flink start-up scripts or the "flink run" command. I don't think the
IntelliJ VM arguments are equivalent to that. In fact, I'm not aware of any
method to set "-D" parameters when running jobs from the IDE.

Thank you~

Xintong Song
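
(For completeness: a job started directly from the IDE can still be given custom
configuration programmatically, by creating the local environment with an explicit
Configuration object. This is only a sketch assuming the Flink 1.8 APIs, it is not
the -D mechanism discussed above, and it only affects local execution:)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalConfigExample {
    public static void main(String[] args) throws Exception {
        // Build the configuration programmatically instead of relying on flink-conf.yaml.
        Configuration conf = new Configuration();
        conf.setInteger("parallelism.default", 2);  // example key only

        // createLocalEnvironment accepts the desired parallelism plus a custom configuration.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, conf);

        env.fromElements("a", "b", "c").print();
        env.execute("local-config-example");
    }
}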



On Fri, Jun 28, 2019 at 8:45 PM M Singh  wrote:

> Hi Xintong:
>
> I passed the -Dparallelism.default=2 in the  run configuration VM
> arguments for IntelliJ.
>
> So what I am looking for is a way to overwrite the config parameters which
> are defined in the flink-config.yaml file (parallelism.default is just an
> example) which would be picked up regardless of the env (eg: locally, on
> yarn or IDE).  When I run the application in IDE (locally) with the above
> mentioned VM parameter, the StreamExecutionEnvironment.config does not show
> this value and the Flink UI shows configuration parameter parallelism as
> 8.  Is there any other place where I can see the parameter settings ?
>
> Thanks.
>
> On Thursday, June 27, 2019, 11:58:28 PM EDT, Xintong Song <
> tonysong...@gmail.com> wrote:
>
>
> Could you provide some more details on how you run your job with -D
> options in IDE?
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jun 28, 2019 at 5:03 AM M Singh  wrote:
>
> Hi Xintong:  Thanks for your pointers.
>
> I tried -Dparallelism.default=2 locally (in IDE) and it did not work.  Do
> you know if there is a common way that would work both for emr, locally and
> ide ?
>
> Thanks again.
>
> On Thursday, June 27, 2019, 12:03:06 AM EDT, Xintong Song <
> tonysong...@gmail.com> wrote:
>
>
> Hi Singh,
>
> You can use the environment variable "FLINK_CONF_DIR" to specify path to
> the directory of config files. You can also override config options with
> command line arguments prefixed -D (for yarn-session.sh) or -yD (for 'flink
> run' command).
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jun 26, 2019 at 9:13 PM M Singh  wrote:
>
> Hi:
>
> I have a single EMR cluster with Flink and want to run multiple
> applications on it with different flink configurations.  Is there a way to
>
> 1. Pass the config file name for each application, or
> 2. Overwrite the config parameters via command line arguments for the
> application.  This is similar to how we can overwrite the default
> parameters in spark
>
> I searched the documents and have tried using ParameterTool with the
> config parameter names, but it has not worked as yet.
>
> Thanks for your help.
>
> Mans
>
>


Re: LookupableTableSource question

2019-06-28 Thread JingsongLee
Hi Flavio:

I just implemented a JDBCLookupFunction [1]. You can use it as a table function [2],
or use the Blink temporal table join [3] (this needs Blink planner support).
I added a Google Guava cache to JDBCLookupFunction with a configurable cacheMaxSize
(to avoid OOM) and cacheExpireMs (to keep the lookup table fresh).
Is that what you want?

[1] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
[2] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143
[3] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75

 Best, JingsongLee


--
From:Flavio Pompermaier 
Send Time: Friday, June 28, 2019, 21:04
To:user 
Subject:LookupableTableSource question

Hi to all,
I have a use case where I'd like to enrich a stream using a rarely updated 
lookup table.
Basically, I'd like to be able to set a refresh policy that is triggered either
when a key is not found (a new key has probably been added in the meantime)
or when a configurable refresh period has elapsed.

Is there any suggested solution for this? The LookupableTableSource looks very
similar to what I'd like to achieve, but I can't find a real-world example using
it, and it lacks these 2 requirements (key-values are not refreshed after a
configurable timeout and a KeyNotFound callback cannot be handled).

Any help is appreciated,
Flavio



LookupableTableSource question

2019-06-28 Thread Flavio Pompermaier
Hi to all,
I have a use case where I'd like to enrich a stream using a rarely updated
lookup table.
Basically, I'd like to be able to set a refresh policy that is triggered
either when a key is not found (a new key has probably been added in the
meantime) or when a configurable refresh period has elapsed.

Is there any suggested solution for this? The LookupableTableSource looks
very similar to what I'd like to achieve, but I can't find a real-world
example using it, and it lacks these 2 requirements (key-values are not
refreshed after a configurable timeout and a KeyNotFound callback cannot be
handled).

Any help is appreciated,
Flavio
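
One way to approximate both requirements with a plain Guava LoadingCache is
sketched below: a cache miss (the "key not found" case) triggers a synchronous
load from the external lookup table, and entries expire after the configured
refresh period. The fetchFromBackend method is a made-up placeholder for the
real JDBC/REST lookup.

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class RefreshingLookup {

    private final LoadingCache<String, List<String>> cache;

    public RefreshingLookup(long refreshPeriodMs) {
        this.cache = CacheBuilder.newBuilder()
                // entries older than the refresh period are reloaded on the next access
                .expireAfterWrite(refreshPeriodMs, TimeUnit.MILLISECONDS)
                .build(new CacheLoader<String, List<String>>() {
                    @Override
                    public List<String> load(String key) {
                        // cache miss ("key not found") -> query the external lookup table
                        return fetchFromBackend(key);
                    }
                });
    }

    public List<String> lookup(String key) {
        return cache.getUnchecked(key);
    }

    // Placeholder for the real lookup (JDBC query, REST call, ...).
    private List<String> fetchFromBackend(String key) {
        return Collections.singletonList(key + "-enriched");
    }
}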


Re: Re: Re: Flink 1.8 + Hadoop 3.1.2 build issue

2019-06-28 Thread USERNAME
Hello,


Many thanks for your reply, Mr. Tang; the problem has been solved. Thank you!



On 2019-06-28 15:59:48, "Yun Tang" wrote:
>Hi,
>
>Starting from Flink 1.8, the default build no longer bundles the Hadoop dependencies.
>See [1] for more information. The official download page [2] also notes that from
>Flink 1.8 on, the shaded-hadoop jars have to be downloaded separately and placed in
>the lib directory.
>
>If you need the shaded-hadoop jar, you can find it in the built flink-shaded-hadoop
>sub-project directory.
>
>[1] https://issues.apache.org/jira/browse/FLINK-11266
>[2] https://flink.apache.org/downloads.html
>
>Best
>Yun Tang
>
>
>
>From: USERNAME 
>Sent: Friday, June 28, 2019 15:41
>To: user-zh@flink.apache.org
>Subject: Re: Flink 1.8 + Hadoop 3.1.2 build issue
>
>Corrected the image content.
>
>
>
>On 2019-06-28 15:26:57, "USERNAME" wrote:
>
>1. Software versions
>Flink 1.8
>Hadoop 3.1.2
>Apache Maven 3.0.5
>
>
>2. Build steps
>>git clone -b release-1.8.0 https://github.com/apache/flink
>>cd flink
>>mvn clean install -DskipTests -Dhadoop.version=3.1.2
>
>
>3. Problem
>After the build succeeds, the flink/build-target/lib directory contains only three files (↓)
>-rw-r--r-- 1 flink flink 96049496 Jun 28 15:17 flink-dist_2.11-1.8.0.jar
>-rw-rw-r-- 1 flink flink   489884 Jun 19 13:35 log4j-1.2.17.jar
>-rw-rw-r-- 1 flink flink 9931 Jun 19 13:35 slf4j-log4j12-1.7.15.jar
>
>
>For comparison, a normal Flink 1.7.2 build produces (↓)
>-rw-r--r-- 1 flink flink 93445603 Mar 27 22:46 flink-dist_2.11-1.7.2.jar
>-rw-r--r-- 1 flink flink   141881 Mar 27 22:44 flink-python_2.11-1.7.2.jar
>-rw-r--r-- 1 flink flink 53380671 Mar 27 22:19 
>flink-shaded-hadoop2-uber-1.7.2.jar
>-rw-rw-r-- 1 flink flink   489884 Mar 27 22:16 log4j-1.2.17.jar
>-rw-rw-r-- 1 flink flink 9931 Mar 27 22:16 slf4j-log4j12-1.7.15.jar
>
>
>Has anyone else run into this issue?
>
>
>
>
>
>
>
>
>


Re: Debug Kryo.Serialization Exception

2019-06-28 Thread Flavio Pompermaier
Indeed, looking at StreamElementSerializer, the duplicate() method could be
bugged:

@Override
public StreamElementSerializer<T> duplicate() {
  TypeSerializer<T> copy = typeSerializer.duplicate();
  return (copy == typeSerializer) ? this : new StreamElementSerializer<>(copy);
}

Is it safe to return this when copy == typeSerializer ...?
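
The alternative implied by the question would be to always wrap the duplicated
inner serializer, roughly like the sketch below (not the actual Flink code), so
that two threads never end up sharing the same StreamElementSerializer instance:

@Override
public StreamElementSerializer<T> duplicate() {
    // Always create a new wrapper, even if the inner serializer is stateless
    // and returns itself from duplicate().
    return new StreamElementSerializer<>(typeSerializer.duplicate());
}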

On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier 
wrote:

> Hi Fabian,
> we had similar errors with Flink 1.3 [1][2], and the error was caused by
> the fact that a serializer was sharing the same object with multiple
> threads.
> The error was not deterministic and changed from time to time.
> So maybe it could be something similar (IMHO).
>
> [1] http://codeha.us/apache-flink-users/msg02010.html
> [2]
> http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3ccaeluf_aic_izyw5f27knter_y6h4+nzg2cpniozqdgm+wk7...@mail.gmail.com%3e
>
> Best,
> Flavio
>
> On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert  wrote:
>
>> additionally we have these coming with this as well all the time:
>>
>> com.esotericsoftware.kryo.KryoException: 
>> java.lang.ArrayIndexOutOfBoundsException
>> Serialization trace:
>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>  at 
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>  at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>  at 
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>  at 
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>  at 
>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>  at 
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>  at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>
>>
>> or
>>
>>
>> com.esotericsoftware.kryo.KryoException: 
>> java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>> Serialization trace:
>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>  at 
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>  at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>  at 
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>  at 
>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>  at 
>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>  at 
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>  at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>  at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>  at java.util.ArrayList.get(ArrayList.java:433)
>>  at 
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>  at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>>  at 
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:131)
>>  at 
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>> 

Re: Re: Flink 1.8 + Hadoop 3.1.2 build issue

2019-06-28 Thread Yun Tang
Hi,

Starting from Flink 1.8, the default build no longer bundles the Hadoop dependencies.
See [1] for more information. The official download page [2] also notes that from
Flink 1.8 on, the shaded-hadoop jars have to be downloaded separately and placed in
the lib directory.

If you need the shaded-hadoop jar, you can find it in the built flink-shaded-hadoop
sub-project directory.

[1] https://issues.apache.org/jira/browse/FLINK-11266
[2] https://flink.apache.org/downloads.html

Best
Yun Tang



From: USERNAME 
Sent: Friday, June 28, 2019 15:41
To: user-zh@flink.apache.org
Subject: Re: Flink 1.8 + Hadoop 3.1.2 build issue

Corrected the image content.



On 2019-06-28 15:26:57, "USERNAME" wrote:

1. Software versions
Flink 1.8
Hadoop 3.1.2
Apache Maven 3.0.5


2. Build steps
>git clone -b release-1.8.0 https://github.com/apache/flink
>cd flink
>mvn clean install -DskipTests -Dhadoop.version=3.1.2


3. Problem
After the build succeeds, the flink/build-target/lib directory contains only three files (↓)
-rw-r--r-- 1 flink flink 96049496 Jun 28 15:17 flink-dist_2.11-1.8.0.jar
-rw-rw-r-- 1 flink flink   489884 Jun 19 13:35 log4j-1.2.17.jar
-rw-rw-r-- 1 flink flink 9931 Jun 19 13:35 slf4j-log4j12-1.7.15.jar


For comparison, a normal Flink 1.7.2 build produces (↓)
-rw-r--r-- 1 flink flink 93445603 Mar 27 22:46 flink-dist_2.11-1.7.2.jar
-rw-r--r-- 1 flink flink   141881 Mar 27 22:44 flink-python_2.11-1.7.2.jar
-rw-r--r-- 1 flink flink 53380671 Mar 27 22:19 
flink-shaded-hadoop2-uber-1.7.2.jar
-rw-rw-r-- 1 flink flink   489884 Mar 27 22:16 log4j-1.2.17.jar
-rw-rw-r-- 1 flink flink 9931 Mar 27 22:16 slf4j-log4j12-1.7.15.jar


Has anyone else run into this issue?











Re: How to run Graph algorithms

2019-06-28 Thread Robert Metzger
Hey,
this page explains how to run a Flink job:
https://ci.apache.org/projects/flink/flink-docs-master/getting-started/tutorials/local_setup.html

On Sat, May 25, 2019 at 1:28 PM RAMALINGESWARA RAO THOTTEMPUDI <
tr...@iitkgp.ac.in> wrote:

>
> Respected All,
> I am new to Apache Flink. I want to run the existing graph algorithm
> examples that ship with the Flink distribution on my own data,
> but I cannot figure out how to run those example algorithms on my input
> data. Kindly suggest a solution.
> --
> *From: *"bastien dine" 
> *To: *"Boris Lublinsky" 
> *Cc: *"user" 
> *Sent: *Saturday, May 25, 2019 1:15:32 PM
> *Subject: *Re: Job cluster and HA
>
> Hello Boris,
> I think you are confused by the name of the shell script
> "standalone-job.sh",
> which basically means that we start a "standalone job manager", as stated
> in the first comment of
>
> https://github.com/apache/flink/blob/release-1.8/flink-dist/src/main/flink-bin/bin/standalone-job.sh
>
> This is another version of
> flink-dist/src/main/flink-bin/bin/jobmanager.sh.
>
> It's not related to a specific job.
>
> When you configure HA on a Flink cluster and you submit a job, Flink
> (i.e. the jobmanager) stores the state of the job in ZooKeeper / HDFS,
> so when it crashes and comes back (with this entrypoint) it will read from
> ZK / HDFS and restore the previous execution.
>
> Regards,
> Bastien
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>
>
> Le ven. 24 mai 2019 à 23:22, Boris Lublinsky <
> boris.lublin...@lightbend.com> a écrit :
>
>> Hi,
>> I was experimenting with HA lately and see that it successfully recovers the
>> job in the case of jobmanager restarts.
>> Now my question is whether it will work for the job cluster.
>> Based on the instructions
>> https://github.com/apache/flink/blob/release-1.8/flink-container/docker/README.md
>> I can see
>> https://github.com/apache/flink/blob/release-1.8/flink-container/docker/docker-entrypoint.sh
>> that in this case the following command is invoked:
>>
>> exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
>>
>> Which means that if the jobmanager restarts, the following is going to
>> happen:
>>
>> 1. It will use HA to restore the job that was running.
>> 2. A new job will be submitted, overwriting the restored job and bypassing
>> the checkpoint restore.
>>
>> Am I missing something here?
>>
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>
>


Re: Debug Kryo.Serialization Exception

2019-06-28 Thread Flavio Pompermaier
Hi Fabian,
we had similar errors with Flink 1.3 [1][2], and the error was caused by the
fact that a serializer was sharing the same object with multiple threads.
The error was not deterministic and changed from time to time.
So maybe it could be something similar (IMHO).

[1] http://codeha.us/apache-flink-users/msg02010.html
[2]
http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3ccaeluf_aic_izyw5f27knter_y6h4+nzg2cpniozqdgm+wk7...@mail.gmail.com%3e

Best,
Flavio

On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert  wrote:

> additionally we have these coming with this as well all the time:
>
> com.esotericsoftware.kryo.KryoException: 
> java.lang.ArrayIndexOutOfBoundsException
> Serialization trace:
> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>   at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>
>
> or
>
>
> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 97, Size: 29
> Serialization trace:
> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>   at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>   at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>   at java.util.ArrayList.get(ArrayList.java:433)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>   at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:131)
>   at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>   ... 12 more
>
>
> --
>
>
> *Fabian WollertZalando SE*
>
> E-Mail: fab...@zalando.de
> Phone: +49 152 03479412
>
>
>
> On Thu, Jun 27, 2019 at 18:29, Fabian Wollert <
> fab...@zalando.de> wrote:
>
>> Hi, we have some Flink Jobs (Flink Version 1.7.1) consuming from a Custom
>> Source and Ingesting into an Elasticsearch Cluster (V.5.6). In recent
>> times, we see more 

Re: Re:Flink batch job memory/disk leak when invoking set method on a static Configuration object.

2019-06-28 Thread Vadim Vararu
Hi,

I've run it on a standalone Flink cluster. No Yarn involved.

From: Haibo Sun 
Sent: Friday, June 28, 2019 6:13 AM
To: Vadim Vararu
Cc: user@flink.apache.org
Subject: Re:Flink batch job memory/disk leak when invoking set method on a 
static Configuration object.

Hi, Vadim

A similar issue has occurred in earlier versions, see
https://issues.apache.org/jira/browse/FLINK-4485.
Are you running a Flink session cluster on YARN? I think it might be a bug.
I'll see if I can reproduce it with the master branch code, and if yes, I will
try to investigate it.

If someone already knows the cause of the problem, that's even better; it won't
need to be re-investigated.

Best,
Haibo


At 2019-06-28 00:46:43, "Vadim Vararu"  wrote:
Hi guys,

I have a simple batch job with a custom output formatter that writes to a local 
file.


public class JobHadoop {

public static void main(String[] args) throws Exception {
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

env.fromCollection(Sets.newHashSet("line1", "line2", "line3"))
.map(line -> line + "dd")
.write(new HadoopUsageOutputFormat(), "file:///tmp/out");

env.execute(JobHadoop.class.getName());
}

}

public class HadoopUsageOutputFormat extends FileOutputFormat<String>
implements OutputFormat<String> {

private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new 
Configuration();
public static final String DEFAULT_LINE_DELIMITER = "\n";

private Writer writer;

static {
DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
}

@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
writer = new OutputStreamWriter(new BufferedOutputStream(stream));
}

@Override
public void writeRecord(String record) throws IOException {
writer.write(record);
writer.write(DEFAULT_LINE_DELIMITER);
}

@Override
public void close() throws IOException {
if (writer != null) {
this.writer.flush();
this.writer.close();
}

super.close();
}
}

The problem is that after the job finishes, there is a memory leak somewhere
that does not permit the blobStore of the job to be deleted. The number of such
"deleted" files increases after each job run. Even though they are marked as
deleted, there is a reference to the file somewhere in the JobManager process
that keeps it from actually being deleted.



Also, the problem reproduces only if I actually invoke the set method of 
Configuration:

static {
DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
}

From my observations, if I change the

private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new 
Configuration();

to a non-static field, then the problem does not reproduce any more.


However, I'm interested if that's a normal behaviour or a bug/leak somewhere in 
Flink itself.

Thanks,
Vadim.
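
For reference, a sketch of one way to avoid the static reference described above:
the Hadoop Configuration is created lazily in open() and kept in a transient
instance field, so it is neither shared statically nor serialized with the job.
This is an assumption-based variant of the non-static workaround, not tested code:

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;

public class HadoopUsageOutputFormat extends FileOutputFormat<String> {

    public static final String DEFAULT_LINE_DELIMITER = "\n";

    // Created per subtask in open(); no static field keeps the Configuration alive.
    private transient Configuration hadoopConfiguration;
    private transient Writer writer;

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        hadoopConfiguration = new Configuration();
        hadoopConfiguration.set("just.a.prop", "/var/test1");
        writer = new OutputStreamWriter(new BufferedOutputStream(stream));
    }

    @Override
    public void writeRecord(String record) throws IOException {
        writer.write(record);
        writer.write(DEFAULT_LINE_DELIMITER);
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.flush();
            writer.close();
        }
        super.close();
    }
}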



Flink 1.8 + Hadoop 3.1.2 build issue

2019-06-28 Thread USERNAME
1. Software versions
Flink 1.8
Hadoop 3.1.2
Apache Maven 3.0.5


2. Build steps
>git clone -b release-1.8.0 https://github.com/apache/flink
>cd flink
>mvn clean install -DskipTests -Dhadoop.version=3.1.2


3. Problem
After the build succeeds, the flink/build-target/lib directory contains only three files (↓)
For comparison, a normal Flink 1.7.2 build produces (↓)
Has anyone else run into this issue?





Re: Re: Question about checkpoint state size

2019-06-28 Thread CHENJIE
Hi,
In our case session windows may need to stay open for a very long time and the data
volume is also large, so such windows cause the checkpoint state size to become very big.
Is there a mechanism to expire and discard state that is older than a certain time?








On 2019-06-26 16:23:13, "Yun Tang" wrote:
>Hi,
>
>This question is a bit broad, because you did not describe over what period you
>observe the checkpoint state size growing. There are several reasons why the
>checkpoint state size can become large:
>
>  1.  The upstream data volume has increased.
>  2.  The window duration is long and the window has not fired yet, so a lot of
>      data has accumulated inside it.
>  3.  The window type itself requires a relatively large state to be stored.
>
>You can refer to the community documentation [1] on window state size
>considerations. Also, when the upstream data volume does not change significantly,
>the checkpoint state size should be fairly stable after a few window periods.
>Since your observation period is not clear, I can only give rather general advice.
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#useful-state-size-considerations
>
>Best
>Yun Tang
>
>From: ReignsDYL <1945627...@qq.com>
>Sent: Wednesday, June 26, 2019 14:22
>To: user-zh@flink.apache.org
>Subject: Question about checkpoint state size
>
>Hi all, my project's streaming topology is source(kafka)->filter->keyby->window->aggregate->sink(hbase).
>I now see that the checkpoint state size of the window subtasks keeps growing. What could be the reason?
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Debug Kryo.Serialization Exception

2019-06-28 Thread Fabian Wollert
additionally we have these coming with this as well all the time:

com.esotericsoftware.kryo.KryoException:
java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
_children (com.fasterxml.jackson.databind.node.ObjectNode)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException


or


com.esotericsoftware.kryo.KryoException:
java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
Serialization trace:
_children (com.fasterxml.jackson.databind.node.ObjectNode)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:131)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 12 more


--


*Fabian WollertZalando SE*

E-Mail: fab...@zalando.de
Phone: +49 152 03479412



On Thu, Jun 27, 2019 at 18:29, Fabian Wollert wrote:

> Hi, we have some Flink Jobs (Flink Version 1.7.1) consuming from a Custom
> Source and Ingesting into an Elasticsearch Cluster (V.5.6). In recent
> times, we see more and more Exceptions happening like this:
>
> com.esotericsoftware.kryo.KryoException: Unable to find class: com. ^
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>   at 
>