Re: How do I start a Flink application on my Flink+Mesos cluster?

2019-09-12 Thread Felipe Gutierrez
Thanks Gary,

I am compiling a new version of Mesos, and when I test it again I will reply
here if I find an error.
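
For the record, the relevant configuration has roughly this shape in
flink-conf.yaml (the values are only placeholders; as Gary notes below,
mesos.resourcemanager.tasks.cpus defaults to taskmanager.numberOfTaskSlots):

# number of slots each TaskManager offers
taskmanager.numberOfTaskSlots: 4
# CPUs requested per TaskManager; keep this <= the cores of a single Mesos agent
mesos.resourcemanager.tasks.cpus: 4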


On Wed, 11 Sep 2019, 09:22 Gary Yao,  wrote:

> Hi Felipe,
>
> I am glad that you were able to fix the problem yourself.
>
> > But I suppose that Mesos will allocate Slots and Task Managers
> dynamically.
> > Is that right?
>
> Yes, that is the case since Flink 1.5 [1].
>
> > Then I set "mesos.resourcemanager.tasks.cpus" to be equal to or less than
> > the available cores on a single node of the cluster. I am not sure about
> > this parameter, but only after this configuration did it work.
>
> I would need to see the JobManager and Mesos logs to understand why this
> resolved your issue. If you do not set mesos.resourcemanager.tasks.cpus explicitly,
> Flink will request CPU resources equal to the number of TaskManager slots
> (taskmanager.numberOfTaskSlots) [2]. Maybe this value was too high in your
> configuration?
>
> Best,
> Gary
>
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> [2]
> https://github.com/apache/flink/blob/0a405251b297109fde1f9a155eff14be4d943887/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java#L344
>
> On Tue, Sep 10, 2019 at 10:41 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> I managed to find what was going wrong. I will write here just for the
>> record.
>>
>> First, the master machine was not logging in to itself automatically over SSH,
>> so I had to fix the permissions for it:
>>
>> chmod og-wx ~/.ssh/authorized_keys
>> chmod 750 $HOME
>>
>> Then I set "mesos.resourcemanager.tasks.cpus" to be equal to or less than the
>> available cores on a single node of the cluster. I am not sure
>> about this parameter, but only after this configuration did it work.
>>
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>> On Fri, Sep 6, 2019 at 10:36 AM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am running Mesos without DC/OS [1] with Flink on it. When I start my
>>> cluster I receive some messages suggesting that everything was started.
>>> However, I see 0 slots available on the Flink web dashboard. But I suppose
>>> that Mesos will allocate slots and TaskManagers dynamically. Is that right?
>>>
>>> $ ./bin/mesos-appmaster.sh &
>>> [1] 16723
>>> flink@r03:~/flink-1.9.0$ I0906 10:22:45.080328 16943 sched.cpp:239]
>>> Version: 1.9.0
>>> I0906 10:22:45.082672 16996 sched.cpp:343] New master detected at
>>> mas...@xxx.xxx.xxx.xxx:5050
>>> I0906 10:22:45.083276 16996 sched.cpp:363] No credentials provided.
>>> Attempting to register without authentication
>>> I0906 10:22:45.086840 16997 sched.cpp:751] Framework registered with
>>> 22f6a553-e8ac-42d4-9a90-96a8d5f002f0-0003
>>>
>>> Then I deploy my Flink application. When I use the first command below to
>>> deploy, the application starts; however, the tasks remain in CREATED until
>>> Flink throws a timeout exception. In other words, they never turn to RUNNING.
>>> When I use the second command to deploy, the application does not start
>>> and I receive the exception "Could not allocate all requires slots
>>> within timeout of 30 ms. Slots required: 2". The full stack trace is
>>> below.
>>>
>>> $ /home/flink/flink-1.9.0/bin/flink run
>>> /home/flink/hello-flink-mesos/target/hello-flink-mesos.jar &
>>> $ ./bin/mesos-appmaster-job.sh run
>>> /home/flink/hello-flink-mesos/target/hello-flink-mesos.jar &
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/mesos.html#mesos-without-dcos
>>> ps.: my application runs normally on a standalone Flink cluster.
>>>
>>> 
>>>  The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: Job failed.
>>> (JobID: 7ad8d71faaceb1ac469353452c43dc2a)
>>> at
>>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
>>> at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>> at org.hello_flink_mesos.App.<init>(App.java:35)
>>> at org.hello_flink_mesos.App.main(App.java:285)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>> at
>>> 

externalizing config files for flink class loader

2019-09-12 Thread Vishwas Siravara
I have a standalone cluster. I have added my own library (jar file) to the
lib/ folder in Flink. I submit my job from the CLI after I start the cluster.
Now I want to externalize a property file which has to be read by this
library. Since this library is loaded by Flink's classloader and not by the
application classloader, I cannot supply it using flink run -C ..., since
that works only for the user classloader.
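
A workaround could be to bypass the classpath entirely and hand the file
location to the JVM (for example via env.java.opts in flink-conf.yaml), then
read it directly. A rough sketch; the property name, default path and class
name are made up for illustration:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public final class ExternalConfig {

    private ExternalConfig() {}

    // Reads the file named by -Dmylib.config.path=..., e.g. set through
    // env.java.opts: "-Dmylib.config.path=/etc/mylib/mylib.properties"
    public static Properties load() throws IOException {
        String path = System.getProperty("mylib.config.path", "/etc/mylib/mylib.properties");
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        return props;
    }
}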


Thanks,
Vishwas


Re: Kafka Schema registry

2019-09-12 Thread Lasse Nedergaard
Hi Elias

Thanks for letting me know. I have found it, but we also need the option to
register Avro schemas and use the registry when we write to Kafka. So we will
create a serialisation version and, when it works, implement it in Flink and
create a pull request for the community.
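
A rough sketch of the kind of serialisation schema meant here (not an existing
Flink API, just one possible shape): wrap Confluent's KafkaAvroSerializer in
Flink's KafkaSerializationSchema so the writer schema is registered on write.
Class, topic and registry URL below are placeholders:

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Collections;

public class RegistryAvroSerializationSchema implements KafkaSerializationSchema<GenericRecord> {

    private final String topic;
    private final String schemaRegistryUrl;
    private transient KafkaAvroSerializer valueSerializer; // not serializable, created lazily per task

    public RegistryAvroSerializationSchema(String topic, String schemaRegistryUrl) {
        this.topic = topic;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(GenericRecord element, Long timestamp) {
        if (valueSerializer == null) {
            valueSerializer = new KafkaAvroSerializer();
            valueSerializer.configure(
                Collections.singletonMap("schema.registry.url", schemaRegistryUrl),
                false); // false = configured as the value (not key) serializer
        }
        // KafkaAvroSerializer registers the writer schema under "<topic>-value"
        // and prepends the schema id to the serialized payload.
        return new ProducerRecord<>(topic, valueSerializer.serialize(topic, element));
    }
}

The read side already exists as ConfluentRegistryAvroDeserializationSchema, as
Elias points out below.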

Med venlig hilsen / Best regards
Lasse Nedergaard


> On 12 Sep 2019, at 17.45, Elias Levy  wrote:
> 
> Just for a Kafka source:
> 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
> 
> There is also a version of this schema available that can look up the writer’s 
> schema (the schema that was used to write the record) in Confluent Schema 
> Registry. Using this deserialization schema, the record will be read with the 
> schema retrieved from Schema Registry and transformed to a statically provided 
> reader schema (either through 
> ConfluentRegistryAvroDeserializationSchema.forGeneric(...) or 
> ConfluentRegistryAvroDeserializationSchema.forSpecific(...)).
> 
>> On Wed, Sep 11, 2019 at 1:48 PM Lasse Nedergaard  
>> wrote:
>> Hi. 
>> Does Flink have out-of-the-box support for Kafka Schema Registry for both 
>> sources and sinks?
>> If not, does anyone know of an implementation we can build on, so we can 
>> help make it generally available in a future release? 
>> 
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>> 


Re: How to handle avro BYTES type in flink

2019-09-12 Thread Catlyn Kong
Turns out there was some other deserialization problem unrelated to this.

On Mon, Sep 9, 2019 at 11:15 AM Catlyn Kong  wrote:

> Hi fellow streamers,
>
> I'm trying to support avro BYTES type in my flink application. Since
> ByteBuffer isn't a supported type, I'm converting the field to an
> Array[Byte]:
>
> case Type.BYTES =>
>   (avroObj: AnyRef) => {
>     if (avroObj == null) {
>       null
>     } else {
>       val byteBuffer = avroObj.asInstanceOf[ByteBuffer]
>       val bytes = new Array[Byte](byteBuffer.remaining())
>       byteBuffer.get(bytes)
>       bytes
>     }
>   }
>
> And in the table, I'm creating PrimitiveArrayTypeInfo[Byte] for this field.
> I'm getting ArrayIndexOutOfBoundsException:
>
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 40
> at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416)
> at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
> at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
> at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
>
> Does anyone have experience with deserializing the BYTES type from Avro and
> making it compatible with the Table API? I'm wondering whether it's because I
> didn't use the correct type, or maybe I need to verify that there's enough data
> left in the source?
>
> Any input is appreciated.
>
> Thanks!
> Catlyn
>
>


Re: Kafka Schema registry

2019-09-12 Thread Elias Levy
Just for a Kafka source:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema


   - There is also a version of this schema available that can look up the
   writer’s schema (the schema that was used to write the record) in Confluent
   Schema Registry. Using this deserialization schema, the record will be read
   with the schema retrieved from Schema Registry and transformed to a statically
   provided reader schema (either through
   ConfluentRegistryAvroDeserializationSchema.forGeneric(...) or
   ConfluentRegistryAvroDeserializationSchema.forSpecific(...)).
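
A minimal consumer-side sketch of that; the topic name, reader schema,
bootstrap servers and registry URL below are placeholders:

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RegistryConsumerSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reader schema; each record is decoded with the writer schema fetched
        // from the registry and then resolved against this schema.
        Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "example-group");

        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
            "example-topic",
            ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, "http://schema-registry:8081"),
            props);

        env.addSource(consumer).print();
        env.execute("schema-registry-consumer-sketch");
    }
}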


On Wed, Sep 11, 2019 at 1:48 PM Lasse Nedergaard 
wrote:

> Hi.
> Does Flink have out-of-the-box support for Kafka Schema Registry for both
> sources and sinks?
> If not, does anyone know of an implementation we can build on, so we can
> help make it generally available in a future release?
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>


Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-12 Thread Zhu Zhu
Thanks Oytun for the reply!

Sorry for not having stated it clearly. By "customized
RestartStrategy" we mean that users implement an
*org.apache.flink.runtime.executiongraph.restart.RestartStrategy* by
themselves and use it by configuring something like "restart-strategy:
org.foobar.MyRestartStrategyFactoryFactory".

The usage of restart strategies you mentioned will keep working with the
new scheduler.
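
(For comparison, the built-in strategies configured through flink-conf.yaml,
for example a failure-rate strategy similar to the one Oytun sets
programmatically below, keep working as well; the values here are only
placeholders:)

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 2
restart-strategy.failure-rate.failure-rate-interval: 1 min
restart-strategy.failure-rate.delay: 10 min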

Thanks,
Zhu Zhu

Oytun Tez  wrote on Thursday, September 12, 2019 at 10:05 PM:

> Hi Zhu,
>
> We are using a custom restart strategy like this:
>
> environment.setRestartStrategy(failureRateRestart(2, Time.minutes(1),
> Time.minutes(10)));
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Thu, Sep 12, 2019 at 7:11 AM Zhu Zhu  wrote:
>
>> Hi everyone,
>>
>> I wanted to reach out to you and ask how many of you are using a
>> customized RestartStrategy[1] in production jobs.
>>
>> We are currently developing the new Flink scheduler[2], which interacts
>> with restart strategies in a different way. We have to re-design the
>> interfaces for the new restart strategies (the so-called
>> RestartBackoffTimeStrategy). Existing customized RestartStrategy
>> implementations will not work any more with the new scheduler.
>>
>> We want to know whether we should keep a way to customize the
>> RestartBackoffTimeStrategy so that existing customized RestartStrategy
>> implementations can be migrated.
>>
>> I'd appreciate it if you could share your status if you are using a
>> customized RestartStrategy. That will be valuable for us to make decisions.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
>> [2] https://issues.apache.org/jira/browse/FLINK-10429
>>
>> Thanks,
>> Zhu Zhu
>>
>


Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-12 Thread Oytun Tez
Hi Zhu,

We are using a custom restart strategy like this:

environment.setRestartStrategy(failureRateRestart(2, Time.minutes(1),
Time.minutes(10)));


---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Thu, Sep 12, 2019 at 7:11 AM Zhu Zhu  wrote:

> Hi everyone,
>
> I wanted to reach out to you and ask how many of you are using a
> customized RestartStrategy[1] in production jobs.
>
> We are currently developing the new Flink scheduler[2], which interacts
> with restart strategies in a different way. We have to re-design the
> interfaces for the new restart strategies (the so-called
> RestartBackoffTimeStrategy). Existing customized RestartStrategy
> implementations will not work any more with the new scheduler.
>
> We want to know whether we should keep a way to customize the
> RestartBackoffTimeStrategy so that existing customized RestartStrategy
> implementations can be migrated.
>
> I'd appreciate it if you could share your status if you are using a
> customized RestartStrategy. That will be valuable for us to make decisions.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
> [2] https://issues.apache.org/jira/browse/FLINK-10429
>
> Thanks,
> Zhu Zhu
>


[SURVEY] How many people are using customized RestartStrategy(s)

2019-09-12 Thread Zhu Zhu
Hi everyone,

I wanted to reach out to you and ask how many of you are using a customized
RestartStrategy[1] in production jobs.

We are currently developing the new Flink scheduler[2], which interacts
with restart strategies in a different way. We have to re-design the
interfaces for the new restart strategies (the so-called
RestartBackoffTimeStrategy). Existing customized RestartStrategy
implementations will not work any more with the new scheduler.

We want to know whether we should keep a way to customize the
RestartBackoffTimeStrategy so that existing customized RestartStrategy
implementations can be migrated.

I'd appreciate it if you could share your status if you are using a
customized RestartStrategy. That will be valuable for us to make decisions.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
[2] https://issues.apache.org/jira/browse/FLINK-10429

Thanks,
Zhu Zhu


Re: Does the Blink SQL architecture automatically clean up expired state?

2019-09-12 Thread 守护

  
[The body of this reply is garbled in the archive; it refers to Blink SQL, the watermark, checkpoints, and Flink SQL.]






------------------ Original message ------------------
From: "LakeShen"

Re: Does the Blink SQL architecture automatically clean up expired state?

2019-09-12 Thread LakeShen
Hi 守护,
First, check whether your watermark is actually advancing.
Then set the window interval to something smaller, run the program again, and see whether the checkpoint state files are cleaned up automatically.



守护 <346531...@qq.com> wrote on Thursday, September 12, 2019 at 2:35 PM:

> Below is the code I am running; please take a look:
>
>
>   tableEnv.registerDataStream("testCountTable",
>     waterMarkStream, 'curuserid, 'timelong, 'rowtime.rowtime)
>
>   val result = tableEnv.sqlQuery(s"SELECT COUNT(0) as pv, COUNT(distinct curuserid)" +
>     s" as uv, TUMBLE_END(rowtime, INTERVAL '10' MINUTE) FROM testCountTable GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)")
>
>   val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](result)
>
>   val data = dsRow.map(w => {
>     val StrArrary = w.toString.split(",")
>     val str = "{\"pv\":" + "\"" + StrArrary(0) + "\"" + ",\"uv\":" + "\"" + StrArrary(1) + "\"" + ",\"rowtime\":" + "\"" + StrArrary(2) + "\"" + "}"
>     str
>   })
>   data.print()
>
>
>
>
>
> ------------------ Original message ------------------
> From: "Jark Wu"  Sent: Thursday, September 12, 2019, 12:51 PM
> To: "user-zh"
> Subject: Re: Does the Blink SQL architecture automatically clean up expired state?
>
>
>
> Hi,
> Could you share the SQL?
>
> Blink SQL windows should, in theory, clean up their state automatically.
>
>  On September 11, 2019 at 18:56, 守护 <346531...@qq.com wrote:
> 
>  Hi everyone in the community:
>  I have a question. In Flink 1.9 I use a Blink SQL statement with a time window and
> FsStateBackend as the state backend. I now see that the state keeps growing, and expired
> state is not deleted after the window has passed. Does the Blink architecture simply not
> support automatic cleanup of window state, or am I using it incorrectly? In my tests,
> Flink SQL on 1.9 does not have this problem.


Re: suggestion of FLINK-10868

2019-09-12 Thread Peter Huang
Hi Anyang and Till,

I think we agreed on making the interval configurable in this case. Let me
revise the current PR. You can review it after that.



Best Regards
Peter Huang

On Thu, Sep 12, 2019 at 12:53 AM Anyang Hu  wrote:

> Thanks Till, I will continue to follow this issue and see what we can do.
>
> Best regards,
> Anyang
>
Till Rohrmann  wrote on Wednesday, September 11, 2019 at 5:12 PM:
>
>> Suggestion 1 makes sense. For the quick termination, I think we need to
>> think a bit more about it to find a good solution that also supports strict
>> SLA requirements.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 11, 2019 at 11:11 AM Anyang Hu 
>> wrote:
>>
>>> Hi Till,
>>>
>>> Some of our online batch tasks have strict SLA requirements, and they
>>> are not allowed to be stuck for a long time. Therefore, we took the crude
>>> approach of making the job exit immediately. Waiting for connection recovery
>>> is a better solution. Maybe we need to add a timeout while waiting for the
>>> JM to restore the connection?
>>>
>>> For suggestion 1, making the interval configurable: given that we have
>>> already done it, we hope to contribute it back to the community if we can.
>>>
>>> Best regards,
>>> Anyang
>>>
>>> Till Rohrmann  wrote on Monday, September 9, 2019 at 3:09 PM:
>>>
 Hi Anyang,

 I think we cannot take your proposal because this means that whenever
 we want to call notifyAllocationFailure when there is a connection problem
 between the RM and the JM, then we fail the whole cluster. This is
 something a robust and resilient system should not do because connection
 problems are expected and need to be handled gracefully. Instead if one
 deems the notifyAllocationFailure message to be very important, then one
 would need to keep it and tell the JM once it has connected back.

 Cheers,
 Till

 On Sun, Sep 8, 2019 at 11:26 AM Anyang Hu 
 wrote:

> Hi Peter,
>
> For our online batch tasks, there is a scenario where the failed containers
> reach MAXIMUM_WORKERS_FAILURE_RATE but the client does not exit immediately
> (the probability of JM loss rises sharply when thousands of containers are
> being started). We found that a JM disconnection (the reason for the JM loss
> is unknown) causes the notifyAllocationFailure not to take effect.
>
> After the introduction of FLINK-13184, which starts the containers with
> multiple threads, the JM disconnection situation has been alleviated. In
> order to reliably make the client exit immediately, we use the following
> code to decide whether to call onFatalError when a
> MaximumFailedTaskManagerExceedingException occurs:
>
> @Override
> public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
>     validateRunsInMainThread();
>
>     JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
>     if (jobManagerRegistration != null) {
>         jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
>     } else {
>         if (exitProcessOnJobManagerTimedout) {
>             ResourceManagerException exception = new ResourceManagerException(
>                 "Job Manager is lost, can not notify allocation failure.");
>             onFatalError(exception);
>         }
>     }
> }
>
>
> Best regards,
>
> Anyang
>
>


Re: suggestion of FLINK-10868

2019-09-12 Thread Anyang Hu
Thanks Till, I will continue to follow this issue and see what we can do.

Best regards,
Anyang

Till Rohrmann  wrote on Wednesday, September 11, 2019 at 5:12 PM:

> Suggestion 1 makes sense. For the quick termination, I think we need to
> think a bit more about it to find a good solution that also supports strict
> SLA requirements.
>
> Cheers,
> Till
>
> On Wed, Sep 11, 2019 at 11:11 AM Anyang Hu  wrote:
>
>> Hi Till,
>>
>> Some of our online batch tasks have strict SLA requirements, and they are
>> not allowed to be stuck for a long time. Therefore, we took the crude approach
>> of making the job exit immediately. Waiting for connection recovery is a
>> better solution. Maybe we need to add a timeout while waiting for the JM to
>> restore the connection?
>>
>> For suggestion 1, making the interval configurable: given that we have already
>> done it, we hope to contribute it back to the community if we can.
>>
>> Best regards,
>> Anyang
>>
>> Till Rohrmann  wrote on Monday, September 9, 2019 at 3:09 PM:
>>
>>> Hi Anyang,
>>>
>>> I think we cannot take your proposal because this means that whenever we
>>> want to call notifyAllocationFailure when there is a connection problem
>>> between the RM and the JM, then we fail the whole cluster. This is
>>> something a robust and resilient system should not do because connection
>>> problems are expected and need to be handled gracefully. Instead if one
>>> deems the notifyAllocationFailure message to be very important, then one
>>> would need to keep it and tell the JM once it has connected back.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sun, Sep 8, 2019 at 11:26 AM Anyang Hu 
>>> wrote:
>>>
 Hi Peter,

 For our online batch tasks, there is a scenario where the failed containers
 reach MAXIMUM_WORKERS_FAILURE_RATE but the client does not exit immediately
 (the probability of JM loss rises sharply when thousands of containers are
 being started). We found that a JM disconnection (the reason for the JM loss
 is unknown) causes the notifyAllocationFailure not to take effect.

 After the introduction of FLINK-13184, which starts the containers with
 multiple threads, the JM disconnection situation has been alleviated. In order
 to reliably make the client exit immediately, we use the following code to
 decide whether to call onFatalError when a
 MaximumFailedTaskManagerExceedingException occurs:

 @Override
 public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
     validateRunsInMainThread();

     JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
     if (jobManagerRegistration != null) {
         jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
     } else {
         if (exitProcessOnJobManagerTimedout) {
             ResourceManagerException exception = new ResourceManagerException(
                 "Job Manager is lost, can not notify allocation failure.");
             onFatalError(exception);
         }
     }
 }


 Best regards,

 Anyang




Re: Problem starting taskexecutor daemons in 3 node cluster

2019-09-12 Thread Komal Mariam
I managed to fix it; however, I ran into another problem that I would
appreciate help in resolving.

It turns out that the username on the three nodes was different; using the
same username on all of them fixed the issue, i.e.:
same_username@slave-node2-hostname
same_username@slave-node3-hostname
same_username@master-node1-hostname

In fact, because the usernames are the same, I can just list the hosts in the
conf files as:
slave-node2-hostname
slave-node3-hostname
master-node1-hostname
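
For reference, a minimal sketch of the standalone cluster files as assumed
here (hostnames and the web UI port are placeholders):

# conf/masters
master-node1-hostname:8081

# conf/slaves
slave-node2-hostname
slave-node3-hostname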

However, for some reason my worker nodes don't show up as available task
managers in the web UI.

The taskexecutor log says the following:
... (clipped for brevity)
2019-09-12 15:56:36,625 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -

2019-09-12 15:56:36,631 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Registered
UNIX signal handlers for [TERM, HUP, INT]
2019-09-12 15:56:36,647 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Maximum
number of open file descriptors is 1048576.
2019-09-12 15:56:36,710 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, 150.82.218.218
2019-09-12 15:56:36,711 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123
2019-09-12 15:56:36,712 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.size, 1024m
2019-09-12 15:56:36,713 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.heap.size, 1024m
2019-09-12 15:56:36,714 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2019-09-12 15:56:36,715 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2019-09-12 15:56:36,717 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.execution.failover-strategy, region
2019-09-12 15:56:37,097 INFO  org.apache.flink.core.fs.FileSystem
- Hadoop is not in the classpath/dependencies. The extended
set of supported File Systems via Hadoop is not available.
2019-09-12 15:56:37,221 INFO
 org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot
create Hadoop Security Module because Hadoop cannot be found in the
Classpath.
2019-09-12 15:56:37,305 INFO
 org.apache.flink.runtime.security.SecurityUtils   - Cannot
install HadoopSecurityContext because Hadoop cannot be found in the
Classpath.
2019-09-12 15:56:38,142 INFO  org.apache.flink.configuration.Configuration
 - Config uses fallback configuration key
'jobmanager.rpc.address' instead of key 'rest.address'
2019-09-12 15:56:38,169 INFO
 org.apache.flink.runtime.util.LeaderRetrievalUtils- Trying to
select the network interface and address to use by connecting to the
leading JobManager.
2019-09-12 15:56:38,170 INFO
 org.apache.flink.runtime.util.LeaderRetrievalUtils-
TaskManager will try to connect for 1 milliseconds before falling back
to heuristics
2019-09-12 15:56:38,185 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Retrieved new target address /150.82.218.218:6123.
2019-09-12 15:56:39,691 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Trying to connect to address /150.82.218.218:6123
2019-09-12 15:56:39,693 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address 'salman-hpc/127.0.1.1':
Invalid argument (connect failed)
2019-09-12 15:56:39,696 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address '/150.82.219.73': No
route to host (Host unreachable)
2019-09-12 15:56:39,698 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address
'/fe80:0:0:0:1e10:83f4:a33a:a208%enp5s0f1': Network is unreachable (connect
failed)
2019-09-12 15:56:39,748 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address '/150.82.219.73': connect
timed out
2019-09-12 15:56:39,750 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address '/0:0:0:0:0:0:0:1%lo':
Network is unreachable (connect failed)
2019-09-12 15:56:39,751 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address '/127.0.0.1': Invalid
argument (connect failed)
2019-09-12 15:56:39,753 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address
'/fe80:0:0:0:1e10:83f4:a33a:a208%enp5s0f1': Network is unreachable (connect
failed)
"flink-komal-taskexecutor-0-salman-hpc.log" 157L, 29954C

I'd 

SIGSEGV error

2019-09-12 Thread Marek Maj
Hi everyone,

Recently we decided to upgrade from Flink 1.7.2 to 1.8.1. After the upgrade,
our task managers started to fail with a SIGSEGV error from time to time.

In the process of adjusting the code to 1.8.1, we noticed that there were some
changes around the TypeSerializerSnapshot interface and its implementations. At
that time we had a few custom serializers, which we decided to throw out during
the migration and replace with Flink's default serializers. We don't mind
clearing the state in the process of migration; the effort to migrate with
state does not seem worth it.

Unfortunately, after running the new version we see SIGSEGV errors from time to
time. It may be that serialization is not the real cause, but at the moment it
seems to be the most probable reason, since we have not made any significant
code changes besides the serialization area.

We run the job on YARN, HDP version 2.7.3.2.6.2.0-205.
Checkpoint configuration: RocksDB backend, not incremental, 50s min
processing time

You can find parts of the JobManager log and the error file of the failed
container included below.

Any suggestions are welcome

Best regards
Marek Maj

jobmanager.log

2019-09-10 16:30:28.177 INFO  o.a.f.r.c.CheckpointCoordinator   - Completed
checkpoint 47 for job c8a9ae03785ade86348c3189cf7dd965 (18532488122 bytes
in 60871 ms).

2019-09-10 16:31:19.223 INFO  o.a.f.r.c.CheckpointCoordinator   -
Triggering checkpoint 48 @ 1568111478177 for job
c8a9ae03785ade86348c3189cf7dd965.

2019-09-10 16:32:19.280 INFO  o.a.f.r.c.CheckpointCoordinator   - Completed
checkpoint 48 for job c8a9ae03785ade86348c3189cf7dd965 (19049515705 bytes
in 61083 ms).

2019-09-10 16:33:10.480 INFO  o.a.f.r.c.CheckpointCoordinator   -
Triggering checkpoint 49 @ 1568111589279 for job
c8a9ae03785ade86348c3189cf7dd965.

2019-09-10 16:33:36.773 WARN  o.a.f.r.r.h.l.m.MetricFetcherImpl   -
Requesting TaskManager's path for query services failed.

java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException:
Ask timed out on [Actor[akka://flink/user/dispatcher#374570759]] after
[1 ms]. Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)

at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)

at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)

at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)

at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:816)

at akka.dispatch.OnComplete.internal(Future.scala:258)

at akka.dispatch.OnComplete.internal(Future.scala:256)

at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)

at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)

at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)

at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)

at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)

at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)

at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)

at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)

at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)

at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)

at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)

at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)

at
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)

at
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)

at
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)

at java.lang.Thread.run(Thread.java:745)

Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#374570759]] after [1 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)

... 9 common frames omitted

2019-09-10 16:33:48.782 WARN  o.a.f.r.r.h.l.m.MetricFetcherImpl   -
Requesting TaskManager's path for query services failed.

java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException:
Ask timed out on [Actor[akka://flink/user/dispatcher#374570759]] after
[1 ms]. Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)

at

Re: StreamingFileSink rolling callback

2019-09-12 Thread Kostas Kloudas
Hi Anton,

First of all, there is this PR
https://github.com/apache/flink/pull/9581 that may be interesting to
you.

Second, I think you have to keep in mind that the hourly bucket
reporting will be per-subtask. So if you have parallelism of 4, each
of the 4 tasks will report individually that they are done with hour
e.g. 10, and it is up to the receiving end to know if it should wait
for more or not. This may be a problem for your stateful assigner
approach as the assigner cannot know by default which subtask it
belongs to. If, for example, you have parallelism of 1, then your
stateful assigner approach could work, although it suffers from the
problem you also mentioned, that it is not integrated with
checkpointing (so a part file may be "reverted") and that a file may
roll, but it does not mean that the previous is already written to the
FS.

Third, a solution could be that instead of having the job itself
pushing notifications that a part file has rolled (which may suffer
from the problem that a part file may roll but the FS takes some time
until it writes everything to disk), you could simply monitor the FS
directory where you are writing your buckets, and parse the part file
names in order to know that all subtasks have finished with hour X.
This can be done by another job which will also put notifications to
the SQS. I think that this will also solve your concern: "I’m also
thinking on how I should couple this with checkpointing mechanism as
ideally I’d like to not invoke this callback before checkpoint is
written."

Cheers,
Kostas

On Mon, Sep 9, 2019 at 12:40 PM Anton Parkhomenko  wrote:
>
> Hello,
>
> I’m writing a Flink job that reads heterogeneous data (one row contains several 
> types that need to be partitioned downstream) from AWS Kinesis and 
> writes to an S3 directory structure like s3://bucket/year/month/day/hour/type. 
> This all works great with StreamingFileSink in Flink 1.9, but the problem is that 
> I need to immediately (or rather “as soon as possible”) let another 
> application know when the “hour” bucket has rolled (i.e. we’re 100% sure it 
> won’t write any more data for this hour). Another problem is that the data can be 
> very skewed across types, e.g. one hour can contain 90% of rows with typeA, 30% 
> of rows with typeB and 1% of rows with typeC.
>
> My current plan is to:
>
> 1. Split the stream in windows using TumblingProcessingTimeWindows (I don’t 
> care about event time at all)
> 2. Assign every row its bucket in a windowing function
> 3. Write a stateful BucketAssigner that:
> 3.1. Keeps its last window in a mutable variable
> 3.2. Once it receives a row with a newer window, sends a message to SQS and 
> increments the window
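
A minimal sketch of the pure part of such an assigner (step 3 above), just
deriving the year/month/day/hour/type path, without the stateful SQS
notification this thread is about; the Record type and its getType() accessor
are hypothetical stand-ins for the job's row type:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Stand-in for the job's row type; getType() is assumed to return typeA / typeB / ...
interface Record {
    String getType();
}

public class TypedHourlyBucketAssigner implements BucketAssigner<Record, String> {

    private static final DateTimeFormatter HOUR_FORMAT =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(Record element, Context context) {
        // Processing time only, matching step 1 of the plan above (event time is ignored).
        String hour = HOUR_FORMAT.format(Instant.ofEpochMilli(context.currentProcessingTime()));
        return hour + "/" + element.getType();  // e.g. 2019/09/12/10/typeA
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

The stateful part (detecting that the hour has changed and pushing a message to
SQS) is exactly where the per-subtask and checkpointing concerns above apply.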
>
> My biggest concern now is about the 3rd point. To me, BucketAssigner looks like a 
> pure function of (Row, Time) -> Bucket, and I’m not sure that introducing 
> state and side effects there would be reasonable. Is there any other way to 
> do it? I’m also thinking about how I should couple this with the checkpointing 
> mechanism, as ideally I’d like to not invoke this callback before the checkpoint 
> is written.
>
> StreamingFileSink does not provide many ways to extend it. I tried to 
> re-implement it for my purposes, but stumbled upon many private methods and 
> classes, so even though it looks possible, the end result will probably be 
> too ugly.
>
> To make things a little bit easier, I don’t care too much about delivery 
> semantics of those final SQS messages - if I get only ~99% of them - that’s 
> fine, if some of them will be duplicated - that’s also fine.
>
> Regards,
> Anton