Re: Read data from Oracle using Flink SQL API

2020-02-05 Thread Bowen Li
Hi Flavio,

+1 for adding Oracle (and potentially more DBMSs like SQL Server, etc.) to
flink-jdbc. Would you mind opening a parent ticket and some subtasks, one
for each to-be-added DBMS you've thought of?


On Sun, Feb 2, 2020 at 10:11 PM Jingsong Li  wrote:

> Yes. And I think we should add OracleDialect, SqlServerDialect and DB2Dialect
> support too.
>
> Best,
> Jingsong Lee
>
> On Sun, Feb 2, 2020 at 5:53 PM Flavio Pompermaier 
> wrote:
>
>> Ok, thanks for this info! Maybe this could be added to the
>> documentation... what do you think?
>>
>> On Sun, Feb 2, 2020, 08:37 Jingsong Li wrote:
>>
>>> Hi Flavio,
>>>
>>> You can use `JDBCTableSource` and register it via
>>>  TableEnvironment.registerTableSource. You need to provide an
>>>  OracleDialect; maybe just implementing `canHandle` and
>>>  `defaultDriverName` is OK.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Sun, Feb 2, 2020 at 2:42 PM Jark Wu  wrote:
>>>
 Hi Flavio,

 If you want to adjust the writing statement for Oracle, you can
 implement the JDBCDialect for Oracle and pass it to the JDBCUpsertTableSink
 when constructing it via `JDBCOptions.Builder#setDialect`. This way you
 don't need to recompile the source code of flink-jdbc.
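
For reference, a minimal sketch of what such a dialect and its wiring could
look like against the 1.9/1.10 flink-jdbc module (the Oracle URL, driver class
and table/schema names below are only illustrative, and exact builder methods
may differ slightly between versions):

    import java.util.Optional;
    import org.apache.flink.api.java.io.jdbc.JDBCOptions;
    import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
    import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;

    public class OracleDialect implements JDBCDialect {

        @Override
        public boolean canHandle(String url) {
            // Claim any Oracle JDBC URL for this dialect.
            return url.startsWith("jdbc:oracle:");
        }

        @Override
        public Optional<String> defaultDriverName() {
            // Illustrative driver class for the Oracle thin driver.
            return Optional.of("oracle.jdbc.OracleDriver");
        }
    }

    // Wiring it up for reading (source) or upserting (sink) without recompiling flink-jdbc:
    JDBCOptions options = JDBCOptions.builder()
        .setDBUrl("jdbc:oracle:thin:@//host:1521/service")  // illustrative URL
        .setTableName("MY_TABLE")
        .setUsername("user")
        .setPassword("password")
        .setDialect(new OracleDialect())
        .build();

    JDBCTableSource source = JDBCTableSource.builder()
        .setOptions(options)
        .setSchema(schema)  // a TableSchema describing MY_TABLE
        .build();

    tableEnv.registerTableSource("oracle_table", source);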

 Best,
 Jark

 On Fri, 31 Jan 2020 at 19:28, Flavio Pompermaier 
 wrote:

> Hi to all,
> I was looking at the Flink SQL APIs and I discovered that only a few
> drivers are supported [1], i.e. MySQL, Postgres and Derby. You could have
> problems only on the writing side of the connector (TableSink), because you
> need to adjust the override statement, but for the read part you shouldn't
> have problems with dialects... am I wrong?
> And what am I supposed to do right now if I want to connect to Oracle
> using the Table API? Do I have to use the low level JDBCInputFormat? Is
> there an easy way to connect to Oracle using the Table API without the 
> need
> to modify and recompile the source code of Flink (just adding some
> interface implementation in the application JAR)?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbc-connector
>
> Best,
> Flavio
>

>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: batch job OOM

2020-02-05 Thread Jingsong Li
Thanks Fanbin,

I will try to find the bug, and track it.

Best,
Jingsong Lee

On Thu, Feb 6, 2020 at 7:50 AM Fanbin Bu  wrote:

> Jingsong,
>
> I created https://issues.apache.org/jira/browse/FLINK-15928 to track the
> issue. Let me know if you need anything else to debug.
>
> Thanks,
> Fanbin
>
>
> On Tue, Jan 28, 2020 at 12:54 AM Arvid Heise  wrote:
>
>> Hi Fanbin,
>>
>> you could use the RC1 of Flink that was created yesterday and use the
>> apache repo
>> https://repository.apache.org/content/repositories/orgapacheflink-1325/org/apache/flink/flink-json/1.10.0/
>> .
>> Alternatively, if you build Flink locally with `mvn install`, then you
>> could use mavenLocal() in your build.gradle and feed from that.
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Jan 28, 2020 at 1:24 AM Fanbin Bu  wrote:
>>
>>> I can build Flink 1.10 and install it onto EMR
>>> (flink-dist_2.11-1.10.0.jar), but what about the other dependencies in my
>>> project build.gradle, i.e. flink-scala_2.11, flink-json, flink-jdbc... do I
>>> continue to use 1.9.0 since there is no 1.10 available?
>>>
>>> Thanks,
>>> Fanbin
>>>
>>> On Fri, Jan 24, 2020 at 11:39 PM Bowen Li  wrote:
>>>
 Hi Fanbin,

 You can install your own Flink build in AWS EMR, and it frees you from
 EMR's release cycles.

 On Thu, Jan 23, 2020 at 03:36 Jingsong Li 
 wrote:

> Fanbin,
>
> I have no idea now, can you create a JIRA to track it? You can
> describe the complete SQL and some data information.
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu 
> wrote:
>
>> Jingsong,
>>
>> Do you have any suggestions to debug the above mentioned
>> IndexOutOfBoundsException error?
>> Thanks,
>>
>> Fanbin
>>
>> On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu 
>> wrote:
>>
>>> I got the following error when running another job. any suggestions?
>>>
>>> Caused by: java.lang.IndexOutOfBoundsException
>>> at
>>> org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
>>> at
>>> org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
>>> at HashWinAggWithKeys$538.endInput(Unknown Source)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu 
>>> wrote:
>>>
 Jingsong,

 I had set the config value too large. After I changed it to a
 smaller number it works now!
 Thank you for the help, really appreciate it!

 Fanbin

 On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li 
 wrote:

> Fanbin,
>
> Looks like your config is wrong, can you show your config code?
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu 
> wrote:
>
>> Jingsong,
>>
>> Great, now I got a different error:
>>
>> java.lang.NullPointerException: Initial Segment may not be null
>>  at 
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
>>  at 
>> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
>>  at LocalHashWinAggWithKeys$292.open(Unknown Source)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>>  at 
>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>

Re: Re: Flink build-in functions

2020-02-05 Thread Jingsong Li
Hi sunfulin,

When merging Blink, we reviewed the semantics of all functions and
removed a few functions whose semantics are not clearly defined at
present. "date_format" is probably one of the victims.
You can implement your own UDF.
And you can create a JIRA to support "date_format" too.
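
For reference, a minimal sketch of such a UDF (the class and registered name
are only examples, and it uses Java DateTimeFormatter patterns such as
'yyyy-MM-dd HH:mm' rather than MySQL-style '%Y-%m-%d' specifiers):

    import java.sql.Timestamp;
    import java.time.format.DateTimeFormatter;
    import org.apache.flink.table.functions.ScalarFunction;

    public class DateFormatFunction extends ScalarFunction {
        // Formats a SQL TIMESTAMP with a Java DateTimeFormatter pattern.
        public String eval(Timestamp ts, String pattern) {
            if (ts == null || pattern == null) {
                return null;
            }
            return ts.toLocalDateTime().format(DateTimeFormatter.ofPattern(pattern));
        }
    }

    // Registration; afterwards usable in SQL as date_format(ts_col, 'yyyy-MM-dd'):
    tableEnv.registerFunction("date_format", new DateFormatFunction());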

Best,
Jingsong Lee

On Thu, Feb 6, 2020 at 10:32 AM sunfulin  wrote:

> Hi, Jingsong
> Yep, I'm using blink planner as the following approach.
>
> *EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();*
> *initPack.tableEnv =
> org.apache.flink.table.api.java.StreamTableEnvironment.create(initPack.env,
> bsSettings);*
>
> While running the job, I can see one log from console.
>
> [main] INFO  org.apache.flink.table.module.ModuleManager  - Cannot find
> FunctionDefinition date_fromat from any loaded modules
>
> Why can't Flink load the function definition? From what I can see, Flink
> 1.10 with the blink planner should support this kind of function.
>
>
>
>
> At 2020-02-04 12:35:12, "Jingsong Li"  wrote:
>
> Hi Sunfulin,
>
> Did you use blink-planner? What functions are missing?
>
> Best,
> Jingsong Lee
>
> On Tue, Feb 4, 2020 at 12:23 PM Wyatt Chun  wrote:
>
>> They are two different systems for differentiated usage. For your
>> question, why not give Blink a direct try?
>>
>> Regards
>>
>> On Tue, Feb 4, 2020 at 10:02 AM sunfulin  wrote:
>>
>>> As far as I can see, the latest Flink version does not have full
>>> support for Blink's built-in functions. Many date functions and string
>>> functions cannot be used in Flink. I want to know when we will be able to
>>> use Flink in the same way as Blink.
>>>
>>>
>>>
>>>
>>
>
> --
> Best, Jingsong Lee
>
>
>
>
>


-- 
Best, Jingsong Lee


Re:Re: Flink build-in functions

2020-02-05 Thread sunfulin
Hi, Jingsong
Yep, I'm using blink planner as the following approach.


EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
initPack.tableEnv = 
org.apache.flink.table.api.java.StreamTableEnvironment.create(initPack.env, 
bsSettings);


While running the job, I can see one log from console. 


[main] INFO  org.apache.flink.table.module.ModuleManager  - Cannot find 
FunctionDefinition date_fromat from any loaded modules


Why can't Flink load the function definition? From what I can see, Flink 1.10 
with the blink planner should support this kind of function. 









At 2020-02-04 12:35:12, "Jingsong Li"  wrote:

Hi Sunfulin,


Did you use blink-planner? What functions are missing?


Best,
Jingsong Lee


On Tue, Feb 4, 2020 at 12:23 PM Wyatt Chun  wrote:

They are two different systems for differentiated usage. For your question, why 
not give Blink a direct try?


Regards 


On Tue, Feb 4, 2020 at 10:02 AM sunfulin  wrote:


As far as I can see, the latest Flink version does not have full 
support for Blink's built-in functions. Many date functions and string functions 
cannot be used in Flink. I want to know when we will be able to use Flink in 
the same way as Blink.





 





--

Best, Jingsong Lee

Re: batch job OOM

2020-02-05 Thread Fanbin Bu
Jingsong,

I created https://issues.apache.org/jira/browse/FLINK-15928 to track the
issue. Let me know if you need anything else to debug.

Thanks,
Fanbin


On Tue, Jan 28, 2020 at 12:54 AM Arvid Heise  wrote:

> Hi Fanbin,
>
> you could use the RC1 of Flink that was created yesterday and use the
> apache repo
> https://repository.apache.org/content/repositories/orgapacheflink-1325/org/apache/flink/flink-json/1.10.0/
> .
> Alternatively, if you build Flink locally with `mvn install`, then you
> could use mavenLocal() in your build.gradle and feed from that.
>
> Best,
>
> Arvid
>
> On Tue, Jan 28, 2020 at 1:24 AM Fanbin Bu  wrote:
>
>> I can build Flink 1.10 and install it onto EMR
>> (flink-dist_2.11-1.10.0.jar), but what about the other dependencies in my
>> project build.gradle, i.e. flink-scala_2.11, flink-json, flink-jdbc... do I
>> continue to use 1.9.0 since there is no 1.10 available?
>>
>> Thanks,
>> Fanbin
>>
>> On Fri, Jan 24, 2020 at 11:39 PM Bowen Li  wrote:
>>
>>> Hi Fanbin,
>>>
>>> You can install your own Flink build in AWS EMR, and it frees you from
>>> EMR's release cycles.
>>>
>>> On Thu, Jan 23, 2020 at 03:36 Jingsong Li 
>>> wrote:
>>>
 Fanbin,

 I have no idea now, can you create a JIRA to track it? You can
 describe the complete SQL and some data information.

 Best,
 Jingsong Lee

 On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu 
 wrote:

> Jingsong,
>
> Do you have any suggestions to debug the above mentioned
> IndexOutOfBoundsException error?
> Thanks,
>
> Fanbin
>
> On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu 
> wrote:
>
>> I got the following error when running another job. any suggestions?
>>
>> Caused by: java.lang.IndexOutOfBoundsException
>> at
>> org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
>> at
>> org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
>> at HashWinAggWithKeys$538.endInput(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu 
>> wrote:
>>
>>> Jingsong,
>>>
>>> I had set the config value too large. After I changed it to a
>>> smaller number it works now!
>>> Thank you for the help, really appreciate it!
>>>
>>> Fanbin
>>>
>>> On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li 
>>> wrote:
>>>
 Fanbin,

 Looks like your config is wrong, can you show your config code?

 Best,
 Jingsong Lee

 On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu 
 wrote:

> Jingsong,
>
> Great, now I got a different error:
>
> java.lang.NullPointerException: Initial Segment may not be null
>   at 
> org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
>   at 
> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
>   at LocalHashWinAggWithKeys$292.open(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>   at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
>
>
> Is there any other config I should add?
>
> thanks,
>
> Fanbin
>
>
> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu 
> wrote:

Re: Failed to transfer file from TaskExecutor : Vanilla Flink Cluster

2020-02-05 Thread Milind Vaidya
The cluster is set up on AWS with 1 job manager and 2 task managers.
They all belong to the same security group, with ports 6123, 8081 and
50100-50200 having access granted.

Job manager config is as follows :

FLINK_PLUGINS_DIR   :   /usr/local/flink-1.9.1/plugins
io.tmp.dirs :   /tmp/flink
jobmanager.execution.failover-strategy  :   region
jobmanager.heap.size:   1024m
jobmanager.rpc.address  :   10.0.16.10
jobmanager.rpc.port :   6123
jobstore.cache-size :   52428800
jobstore.expiration-time:   3600
parallelism.default :   4
slot.idle.timeout   :   5
slot.request.timeout:   30
task.cancellation.interval  :   3
task.cancellation.timeout   :   18
task.cancellation.timers.timeout:   7500
taskmanager.exit-on-fatal-akka-error:   false
taskmanager.heap.size   :   1024m
taskmanager.network.bind-policy :   "ip"
taskmanager.numberOfTaskSlots   :   2
taskmanager.registration.initial-backoff:   500ms
taskmanager.registration.timeout:   5min
taskmanager.rpc.port:   50100-50200
web.tmpdir  :
/tmp/flink-web-74cce811-17c0-411e-9d11-6d91edd2e9b0



I have summarised more details in a Stack Overflow question, where it is
easier to lay out the specifics:
https://stackoverflow.com/questions/60082479/flink-1-9-standalone-cluster-failed-to-transfer-file-from-taskexecutor-id
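
One thing worth checking in the logs quoted below: the failed associations go
to flink-metrics on random ports (39493, 34094), which fall outside the opened
50100-50200 range. Assuming nothing else is listening there, pinning the
remaining dynamically chosen ports and opening them in the security group might
help, e.g.:

metrics.internal.query-service.port :   50201-50250
blob.server.port                    :   50251
taskmanager.data.port               :   50252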

On Wed, Feb 5, 2020 at 2:25 AM Robert Metzger  wrote:

> Hi,
>
> I don't think this is a bug. It looks like the machines cannot talk to
> each other. Can you validate that all the machines can talk to each other
> on the ports used by Flink (6123, 8081, ...)?
> If that doesn't help:
> - How is the network set up?
> - Are you running physical machines / VMs / containers?
> - Is there a firewall involved?
>
> Best,
> Robert
>
>
> On Fri, Jan 31, 2020 at 7:25 PM Milind Vaidya  wrote:
>
>> Hi
>>
>> I am trying to build a cluster for Flink with 1 master and 2 workers.
>> The program is working fine locally. The messages are read from Kafka and
>> just printed on STDOUT.
>>
>> The cluster is successfully created and the UI also shows all config. But
>> the job fails to execute on the cluster.
>>
>> Here are few exceptions I see in the log files
>>
>> File : flink-root-standalonesession
>>
>> 2020-01-29 19:55:00,348 INFO  akka.remote.transport.ProtocolStateActor
>>- No response from remote for outbound association.
>> Associate timed out after [2 ms].
>> 2020-01-29 19:55:00,350 INFO  akka.remote.transport.ProtocolStateActor
>>- No response from remote for outbound association.
>> Associate timed out after [2 ms].
>> 2020-01-29 19:55:00,350 WARN  akka.remote.ReliableDeliverySupervisor
>>- Association with remote system
>> [akka.tcp://flink-metrics@ip:39493] has failed, address is now gated for
>> [50] ms. Reason: [Association failed with 
>> [akka.tcp://flink-metrics@ip:39493]]
>> Caused by: [No response
>>  from remote for outbound association. Associate timed out after [2
>> ms].]
>> 2020-01-29 19:55:00,350 WARN  akka.remote.ReliableDeliverySupervisor
>>- Association with remote system
>> [akka.tcp://flink-metrics@ip:34094] has failed, address is now gated for
>> [50] ms. Reason: [Association failed with 
>> [akka.tcp://flink-metrics@ip:34094]]
>> Caused by: [No response f
>> rom remote for outbound association. Associate timed out after [2
>> ms].]
>> 2020-01-29 19:55:00,359 WARN  akka.remote.transport.netty.NettyTransport
>>- Remote connection to [null] failed with
>> org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException:
>> connection timed out: /ip:39493
>> 2020-01-29 19:55:00,359 WARN  akka.remote.transport.netty.NettyTransport
>>- Remote connection to [null] failed with
>> org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException:
>> connection timed out: /ip:34094
>> 2020-01-29 19:58:21,880 ERROR
>> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
>>  - Failed to transfer file from TaskExecutor
>> a7abe6e294fa3ae4129fd695f7309a36.
>> java.util.concurrent.CompletionException:
>> akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/resourcemanager#5385019]] after [1 ms].
>> Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage].
>> A typical reason for `AskTimeoutException` is that the recipient actor
>> didn't send a reply.
>>
>>
>> File : flink-root-client-ip
>>
>>
>> 2020-01-29 19:48:10,566 WARN  org.apache.flink.client.cli.CliFrontend
>>   - Could not load CLI class
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli.
>> java.lang.NoClassDefFoundError:
>> 

Flink TaskManager Logs polluted with InfluxDB metrics reporter errors

2020-02-05 Thread Morgan Geldenhuys


I am trying to set up metrics reporting for Flink using InfluxDB; however,
I am receiving tons of exceptions (listed right at the bottom).


Reporting is set up as recommended by the documentation:

metrics.reporter.influxdb.class: 
org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: influxdb
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
metrics.reporter.influxdb.username: ***
metrics.reporter.influxdb.password: ***

Any hints as to what could cause all these exceptions?

2020-02-05 18:15:17,777 WARN org.apache.flink.runtime.metrics.MetricRegistryImpl - 
Error while reporting metrics
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException: 
partial write: unable to parse 
'taskmanager_job_task_operator_partition-revoked-latency-max,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_KafkaConsumer_heartbeat-response-time-max,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_KafkaConsumer_sync-time-max,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_KafkaConsumer_reauthentication-latency-avg,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_rebalance-latency-avg,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_reauthentication-latency-avg,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_KafkaConsumer_reauthentication-latency-max,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 
'taskmanager_job_task_operator_partition-revoked-latency-avg,host=flink-taskmanager-6484bdf6c5-sctng,job_id=6fe7d657f2a2f989115752d7e1a5cf5b,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ 
Custom\ 
Source,subtask_index=0,task_attempt_id=2087c37093ea7c1d6896caf6e3768b88,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ 
Custom\ Source\ ->\ Filter,tm_id=7adf9ad0bf8edc167cfafbe9bc22f2f2 
value=� 158092651772000': invalid boolean
unable to parse 

Re: [BULK]Re: DisableGenericTypes is not compatible with Kafka

2020-02-05 Thread Oleksandr Nitavskyi
Thanks, guys, for the answers.

Aljoscha, I have a question to ensure I get it right.
Do I understand correctly that this newly created TypeSerializer should use
Kryo under the hood, so we keep the backward compatibility of the state and do
not get an exception if generic types are disabled?

Thanks
Kind Regards
Oleksandr

From: Aljoscha Krettek 
Sent: Tuesday, February 4, 2020 2:29 PM
To: user@flink.apache.org 
Subject: [BULK]Re: DisableGenericTypes is not compatible with Kafka

Unfortunately, the fact that the Kafka Sources use Kryo for state
serialization is a very early design misstep that we cannot get rid of
for now. We will get rid of that when the new source interface lands
([1]) and when we have a new Kafka Source based on that.

As a workaround, we should change the Kafka Consumer to go through a
different constructor of ListStateDescriptor which directly takes a
TypeSerializer instead of a TypeInformation here: [2]. This should
sidestep the "no generic types" check.

I created a Jira Issue for this:
https://issues.apache.org/jira/browse/FLINK-15904

Best,
Aljoscha

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
[2]
https://github.com/apache/flink/blob/68cc21e4af71505efa142110e35a1f8b1c25fe6e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L860

On 01.02.20 09:44, Guowei Ma wrote:
> Hi,
> I think there could be two workarounds for 'disableGenericTypes' in the case
> of the KafkaSource:
> 1. adding the TypeInfo annotation [1] to the KafkaTopicPartition.
> 2. using reflection to call the private method. :)
>
> Maybe we could add this TypeInfo annotation to the KafkaConnector.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#defining-type-information-using-a-factory
>
> Best,
> Guowei
>
>
> Oleksandr Nitavskyi wrote on Fri, Jan 31, 2020 at 12:40 AM:
>
>> Hi guys,
>>
>>
>>
>> We have encountered an issue related to the possibility to
>> 'disableGenericTypes' (disabling Kryo for the job). It seems a very nice
>> idea to ensure that nobody introduces a random change which penalizes the
>> performance of the job.
>>
>>
>>
>> The issue we have encountered is that Flink’s KafkaSource is storing
>> KafkaTopicPartition in the state for offset recovery, which is serialized
>> with Kryo.
>>
>> For sure this feature itself is not penalizing performance, but it looks like
>> it reduces the usefulness of the possibility to 'disableGenericTypes'. Also,
>> on the Flink user's side there is no good tool to add
>> KafkaTopicPartition's non-Kryo type information.
>>
>>
>>
>> One of the related tickets I have found:
>> https://issues.apache.org/jira/browse/FLINK-12031
>>
>>
>>
>> Do you know any workaround for 'disableGenericTypes' in the case of KafkaSources,
>> or what do you think about making some development to address this issue?
>>
>>
>>
>> Kind Regards
>>
>> Oleksandr
>>
>>
>>
>


Re: Flink 1.10 on MapR secure cluster with high availability

2020-02-05 Thread Chesnay Schepler
No, since a) HA will never use classes from the user-jar and b) 
zookeeper is relocated to a different package (to avoid conflicts) and 
hence any replacement has to follow the same relocation convention.


On 05/02/2020 15:38, Maxim Parkachov wrote:

Hi Chesnay,

thanks for the advice. Will it work if I include the MapR-specific ZooKeeper 
in the job dependencies and still use the out-of-the-box Flink binary distribution?


Regards,
Maxim.

On Wed, Feb 5, 2020 at 3:25 PM Chesnay Schepler wrote:


You must rebuild Flink while overriding zookeeper.version property
to match your MapR setup.
For example: mvn clean package -Dzookeeper.version=3.4.5-mapr-1604
Note that you will also have to configure the MapR repository in
your local setup as described here
.

On 05/02/2020 15:12, Maxim Parkachov wrote:

Hi everyone,

I have already written about an issue with Flink 1.9 on a secure MapR 
cluster and high availability. The issue was resolved with a custom-compiled 
Flink with the vendor MapR repositories enabled. The history can be found at
https://www.mail-archive.com/user@flink.apache.org/msg28235.html

Unfortunately, in the current 1.10 RC the vendor repositories were
removed and I'm failing to get a working configuration. Current
situation with the 1.10 RC and a secure MapR cluster:

1. Without HA, Flink uses class path provided zookeeper jar (mapr
specific) and everything works fine.

2. With HA enabled, Flink uses shaded zookeeper 
(org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn)
which doesn't have MapR specific changes and fails to authenticate.

I would really appreciate any help in resolving this issue. I'm
ready to provide any required details.

Regards,
Maxim.







Re: Flink 1.10 on MapR secure cluster with high availability

2020-02-05 Thread Maxim Parkachov
Hi Chesnay,

thanks for the advice. Will it work if I include the MapR-specific ZooKeeper in the
job dependencies and still use the out-of-the-box Flink binary distribution?

Regards,
Maxim.

On Wed, Feb 5, 2020 at 3:25 PM Chesnay Schepler  wrote:

> You must rebuild Flink while overriding zookeeper.version property to
> match your MapR setup.
> For example: mvn clean package -Dzookeeper.version=3.4.5-mapr-1604
> Note that you will also have to configure the MapR repository in your
> local setup as described here
> .
>
> On 05/02/2020 15:12, Maxim Parkachov wrote:
>
> Hi everyone,
>
> I have already written about an issue with Flink 1.9 on a secure MapR cluster
> and high availability. The issue was resolved with a custom-compiled Flink
> with the vendor MapR repositories enabled. The history can be found at
> https://www.mail-archive.com/user@flink.apache.org/msg28235.html
>
> Unfortunately, in the current 1.10 RC the vendor repositories were removed and I'm
> failing to get a working configuration. Current situation with the 1.10 RC and
> a secure MapR cluster:
>
> 1. Without HA, Flink uses class path provided zookeeper jar (mapr
> specific) and everything works fine.
>
> 2. With HA enabled, Flink uses shaded zookeeper  
> (org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn)
> which  doesn't have MapR specific changes and fails to authenticate.
>
> I would really appreciate any help in resolving this issue. I'm ready to
> provide any required details.
>
> Regards,
> Maxim.
>
>
>


Re: Flink 1.10 on MapR secure cluster with high availability

2020-02-05 Thread Chesnay Schepler
You must rebuild Flink while overriding zookeeper.version property to 
match your MapR setup.

For example: mvn clean package -Dzookeeper.version=3.4.5-mapr-1604
Note that you will also have to configure the MapR repository in your 
local setup as described here 
.


On 05/02/2020 15:12, Maxim Parkachov wrote:

Hi everyone,

I have already written about an issue with Flink 1.9 on a secure MapR 
cluster and high availability. The issue was resolved with a custom-compiled 
Flink with the vendor MapR repositories enabled. The history can be found at
https://www.mail-archive.com/user@flink.apache.org/msg28235.html


Unfortunately, in the current 1.10 RC the vendor repositories were removed and 
I'm failing to get a working configuration. Current situation with the 1.10 
RC and a secure MapR cluster:


1. Without HA, Flink uses class path provided zookeeper jar (mapr 
specific) and everything works fine.


2. With HA enabled, Flink uses shaded zookeeper  
(org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn) 
which doesn't have MapR specific changes and fails to authenticate.


I would really appreciate any help in resolving this issue. I'm ready 
to provide any required details.


Regards,
Maxim.





NoClassDefFoundError when an exception is throw inside invoke in a Custom Sink

2020-02-05 Thread David Magalhães
I'm implementing an exponential backoff inside a custom sink that uses an
AvroParquetWriter to write to S3. I've changed the number of attempts to 0
inside the core-site.xml, and I'm catching the timeout exception and doing a
Thread.sleep for X seconds. This is working as intended, and when S3 is
offline, it waits until it is online.
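
As a concrete sketch of that retry pattern (the sink and helper names here are
made up for illustration; blocking inside invoke() is also what surfaces as the
backpressure described below):

    import java.io.IOException;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class ParquetS3Sink extends RichSinkFunction<GenericRecord> {

        @Override
        public void invoke(GenericRecord record, Context context) throws Exception {
            long backoffMs = 1_000;
            while (true) {
                try {
                    writeWithAvroParquetWriter(record);  // hypothetical helper around AvroParquetWriter
                    return;
                } catch (IOException e) {  // e.g. S3A connect/read timeouts
                    Thread.sleep(backoffMs);
                    backoffMs = Math.min(backoffMs * 2, 60_000);
                }
            }
        }

        private void writeWithAvroParquetWriter(GenericRecord record) throws IOException {
            // open an AvroParquetWriter against the S3A path and write the record (omitted)
        }
    }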

I also want to test that the back pressure and the checkpoints are working
as intended, and for the first one, I can see the back pressure in Flink UI
going up, and recover as expected and not reading more data from Kafka.

For the checkpoints, I've added a random exception inside the sink's invoke
function (1 in 100, to simulate that a problem has happened and the job needs
to recover from the last good checkpoint), but something strange
happens. Sometimes I can see the job being canceled and created again and
running fine; other times, after X number of times of being created and canceled,
it gives a *NoClassDefFoundError*, and it will keep giving that forever.

Do you guys have any thoughts?

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
exception while processing timer.
at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeParserBucket}
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
... 7 more
Caused by: java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeParserBucket
at
org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
at
com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
at
com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
at
com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
at
com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
at
com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
at
com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
at
com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
at
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
at
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
at
org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
at
org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
at
org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
at
com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.util.Try$.apply(Try.scala:209)
at
com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
at
com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at

Flink 1.10 on MapR secure cluster with high availability

2020-02-05 Thread Maxim Parkachov
Hi everyone,

I have already written about an issue with Flink 1.9 on a secure MapR cluster
and high availability. The issue was resolved with a custom-compiled Flink
with the vendor MapR repositories enabled. The history can be found at
https://www.mail-archive.com/user@flink.apache.org/msg28235.html

Unfortunately, in the current 1.10 RC the vendor repositories were removed and I'm
failing to get a working configuration. Current situation with the 1.10 RC and
a secure MapR cluster:

1. Without HA, Flink uses class path provided zookeeper jar (mapr specific)
and everything works fine.

2. With HA enabled, Flink uses shaded zookeeper
(org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn)
which  doesn't have MapR specific changes and fails to authenticate.

I would really appreciate any help in resolving this issue. I'm ready to
provide any required details.

Regards,
Maxim.


Re: Failed to transfer file from TaskExecutor : Vanilla Flink Cluster

2020-02-05 Thread Robert Metzger
Hi,

I don't think this is a bug. It looks like the machines cannot talk to
each other. Can you validate that all the machines can talk to each other
on the ports used by Flink (6123, 8081, ...)?
If that doesn't help:
- How is the network set up?
- Are you running physical machines / VMs / containers?
- Is there a firewall involved?

Best,
Robert


On Fri, Jan 31, 2020 at 7:25 PM Milind Vaidya  wrote:

> Hi
>
> I am trying to build a cluster for Flink with 1 master and 2 workers.
> The program is working fine locally. The messages are read from Kafka and
> just printed on STDOUT.
>
> The cluster is successfully created and the UI also shows all config. But
> the job fails to execute on the cluster.
>
> Here are few exceptions I see in the log files
>
> File : flink-root-standalonesession
>
> 2020-01-29 19:55:00,348 INFO  akka.remote.transport.ProtocolStateActor
>  - No response from remote for outbound association.
> Associate timed out after [2 ms].
> 2020-01-29 19:55:00,350 INFO  akka.remote.transport.ProtocolStateActor
>  - No response from remote for outbound association.
> Associate timed out after [2 ms].
> 2020-01-29 19:55:00,350 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system
> [akka.tcp://flink-metrics@ip:39493] has failed, address is now gated for
> [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:39493]]
> Caused by: [No response
>  from remote for outbound association. Associate timed out after [2
> ms].]
> 2020-01-29 19:55:00,350 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system
> [akka.tcp://flink-metrics@ip:34094] has failed, address is now gated for
> [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:34094]]
> Caused by: [No response f
> rom remote for outbound association. Associate timed out after [2 ms].]
> 2020-01-29 19:55:00,359 WARN  akka.remote.transport.netty.NettyTransport
>  - Remote connection to [null] failed with
> org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException:
> connection timed out: /ip:39493
> 2020-01-29 19:55:00,359 WARN  akka.remote.transport.netty.NettyTransport
>  - Remote connection to [null] failed with
> org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException:
> connection timed out: /ip:34094
> 2020-01-29 19:58:21,880 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
>  - Failed to transfer file from TaskExecutor
> a7abe6e294fa3ae4129fd695f7309a36.
> java.util.concurrent.CompletionException:
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/resourcemanager#5385019]] after [1 ms].
> Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage].
> A typical reason for `AskTimeoutException` is that the recipient actor
> didn't send a reply.
>
>
> File : flink-root-client-ip
>
>
> 2020-01-29 19:48:10,566 WARN  org.apache.flink.client.cli.CliFrontend
>   - Could not load CLI class
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.
> java.lang.NoClassDefFoundError:
> org/apache/hadoop/yarn/exceptions/YarnException
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1187)
> at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1147)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1072)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.yarn.exceptions.YarnException
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 5 more
> 2020-01-29 19:48:10,663 INFO  org.apache.flink.core.fs.FileSystem
>   - Hadoop is not in the classpath/dependencies. The
> extended set of supported File Systems via Hadoop is not available.
> 2020-01-29 19:48:10,856 INFO
>  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot
> create Hadoop Security Module because Hadoop cannot be found in the
> Classpath.
> 2020-01-29 19:48:10,874 INFO
>  org.apache.flink.runtime.security.SecurityUtils   - Cannot
> install HadoopSecurityContext because Hadoop cannot be found in the
> Classpath.
> 2020-01-29 19:48:10,875 INFO  org.apache.flink.client.cli.CliFrontend
>   - Running 'run' command.
> 2020-01-29 19:48:10,881 INFO  org.apache.flink.client.cli.CliFrontend
>   - Building program from JAR file
> 2020-01-29 19:48:10,965 INFO  org.apache.flink.configuration.Configuration
>  - Config uses 

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-02-05 Thread Kostas Kloudas
Hi Mark,

This feature of customizing the rolling policy even for bulk formats will
be in the upcoming 1.10 release as described in [1],
although the documentation for the feature is still pending [2]. I hope that
it will be merged in time for the release.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13027
[2] https://issues.apache.org/jira/browse/FLINK-15476
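
Once that is available, a rough sketch of a bulk-format sink with an explicit
checkpoint rolling policy could look like this (assuming the 1.10 API from [1];
ParquetAvroWriters and the Avro schema here are just an example bulk writer):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

    StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forGenericRecord(schema))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())  // roll and close part files on every checkpoint
        .build();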

On Mon, Feb 3, 2020 at 8:14 PM Kostas Kloudas  wrote:

> Hi Mark,
>
> Currently no, but if rolling on every checkpoint is OK with you, that works
> today; in future versions it will be easy to allow rolling not only on every
> checkpoint but also on inactivity intervals.
>
> Cheers,
> Kostas
>
> On Mon, Feb 3, 2020 at 5:24 PM Mark Harris 
> wrote:
>
>> Hi Kostas,
>>
>> Thanks for your help here - I think we're OK with the increased heap
>> size, but happy to explore other alternatives.
>>
>> I see the problem - we're currently using a BulkFormat, which doesn't
>> seem to let us override the rolling policy. Is there an equivalent for the
>> BulkFormat?
>>
>> Best regards,
>>
>> Mark
>> --
>> *From:* Kostas Kloudas 
>> *Sent:* 03 February 2020 15:39
>> *To:* Mark Harris 
>> *Cc:* Piotr Nowojski ; Cliff Resnick <
>> cre...@gmail.com>; David Magalhães ; Till
>> Rohrmann ; flink-u...@apache.org <
>> flink-u...@apache.org>
>> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
>> hooks for S3a files
>>
>> Hi Mark,
>>
>> You can use something like the following and change the intervals
>> accordingly:
>>
>> final StreamingFileSink<String> sink = StreamingFileSink
>>     .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
>>     .withRollingPolicy(
>>         DefaultRollingPolicy.builder()
>>             .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
>>             .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
>>             .withMaxPartSize(1024 * 1024 * 1024)
>>             .build())
>>     .build();
>>
>> Let me know if this solves the problem.
>>
>> Cheers,
>> Kostas
>>
>> On Mon, Feb 3, 2020 at 4:11 PM Mark Harris 
>> wrote:
>>
>> Hi Kostas,
>>
>> Sorry, stupid question: How do I set that for a StreamingFileSink?
>>
>> Best regards,
>>
>> Mark
>> --
>> *From:* Kostas Kloudas 
>> *Sent:* 03 February 2020 14:58
>> *To:* Mark Harris 
>> *Cc:* Piotr Nowojski ; Cliff Resnick <
>> cre...@gmail.com>; David Magalhães ; Till
>> Rohrmann ; flink-u...@apache.org <
>> flink-u...@apache.org>
>> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
>> hooks for S3a files
>>
>> Hi Mark,
>>
>> Have you tried to set your rolling policy to close inactive part files
>> after some time [1]?
>> If the part files in the buckets are inactive and there are no new part
>> files, then the state handle for those buckets will also be removed.
>>
>> Cheers,
>> Kostas
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html
>>
>>
>>
>> On Mon, Feb 3, 2020 at 3:54 PM Mark Harris 
>> wrote:
>>
>> Hi all,
>>
>> The out-of-memory heap dump had the answer - the job was failing with an
>> OutOfMemoryError because the activeBuckets members of 3 instances of
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets were
>> filling a significant enough part of the memory of the taskmanager that no
>> progress was being made. Increasing the memory available to the TM seems to
>> have fixed the problem.
>>
>> I think the DeleteOnExit problem will mean it needs to be restarted every
>> few weeks, but that's acceptable for now.
>>
>> Thanks again,
>>
>> Mark
>> --
>> *From:* Mark Harris 
>> *Sent:* 30 January 2020 14:36
>> *To:* Piotr Nowojski 
>> *Cc:* Cliff Resnick ; David Magalhães <
>> speeddra...@gmail.com>; Till Rohrmann ;
>> flink-u...@apache.org ; kkloudas <
>> kklou...@apache.org>
>> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
>> hooks for S3a files
>>
>> Hi,
>>
>> Thanks for your help with this. 
>>
>> The EMR cluster has 3 15GB VMs, and the flink cluster is started with:
>>
>> /usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3
>>
>> Usually the task runs for about 15 minutes before it restarts, usually
>> due to with an "java.lang.OutOfMemoryError: Java heap space" exception.
>>
>> The figures came from a MemoryAnalyzer session on a manual memory dump
>> from one of the taskmanagers. The total size of that heap was only 1.8gb.
>> In that heap, 1.7gb is taken up by the static field "files" in
>> DeleteOnExitHook, which is a linked hash set containing 

Re: Task-manager kubernetes pods take a long time to terminate

2020-02-05 Thread Yang Wang
Maybe you need to check the kubelet logs to see why it gets stuck in the
"Terminating" state for a long time. Even if it needs to clean up the ephemeral
storage, it should not take so long.


Best,
Yang

Li Peng wrote on Wed, Feb 5, 2020 at 10:42 AM:

> My yml files follow most of the instructions here:
>
>
> http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/
>
> What command did you use to delete the deployments? I use: helm
> --tiller-namespace prod delete --purge my-deployment
>
> I noticed that for environments without much data (like staging), this
> works flawlessly, but in production with a high volume of data, it gets stuck
> in a loop. I suspect that the extra time needed to clean up the task
> managers with high traffic delays the shutdown until after the job manager
> terminates, and then the task manager gets stuck in a loop when it detects
> that the job manager is dead.
>
> Thanks,
> Li
>
>>