Re: Passing vm options
Got it. Thanks.

On Mon, Jan 7, 2019 at 5:32 PM Dominik Wosiński wrote:
> Hey,
> AFAIK, Flink currently supports dynamic properties only on YARN, not in standalone mode.
> If you are using YARN it should indeed be possible to set such configuration. If not, then I am afraid it is not possible.
>
> Best regards,
> Dom.
>
> On Mon, Jan 7, 2019 at 09:01 Avi Levi wrote:
>
>> Hi,
>> I am trying to pass some VM options, e.g.
>> bin/flink run foo-assembly-0.0.1-SNAPSHOT.jar -Dflink.stateDir=file:///tmp/ -Dkafka.bootstrap.servers="localhost:9092" -Dkafka.security.ssl.enabled=false
>> but it doesn't seem to override the values in application.conf. Am I missing something?
>> BTW, is it possible to pass a config file using -Dconfig.file?
>>
>> BR,
>> Avi
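One workaround, since flags placed after the jar arrive as ordinary program arguments rather than JVM system properties: parse them yourself and layer them over the defaults (e.g. feed the resulting map into Typesafe Config's `ConfigFactory.parseMap(...).withFallback(defaultConfig)`). A plain-Java sketch with no Flink or Typesafe Config dependency — the `parseOverrides` helper is illustrative, not a Flink API:

```java
import java.util.HashMap;
import java.util.Map;

public class ArgOverrides {
    // Parse "-Dkey=value" style program arguments into a map that can be
    // layered on top of defaults loaded from application.conf.
    public static Map<String, String> parseOverrides(String[] args) {
        Map<String, String> overrides = new HashMap<>();
        for (String arg : args) {
            if (arg.startsWith("-D") && arg.contains("=")) {
                int eq = arg.indexOf('=');
                overrides.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return overrides;
    }

    public static void main(String[] args) {
        Map<String, String> o = parseOverrides(new String[] {
            "-Dflink.stateDir=file:///tmp/",
            "-Dkafka.security.ssl.enabled=false"
        });
        System.out.println(o.get("flink.stateDir")); // file:///tmp/
    }
}
```

To make `-Dconfig.file` work as a real system property instead, it would have to reach the JVM itself (e.g. via `env.java.opts` in flink-conf.yaml), not the program arguments.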
Re: Counter Metrics not getting removed from Flink GUI after close()
Hi Chesnay,

I do not want to store the metric counter in a reference variable because I want to create a metric counter for every key of the keyed stream. There can be n keys, and I do not want to hold n references.

On Tue, 8 Jan, 2019, 11:01 PM Chesnay Schepler wrote:
> What you're trying to do is not possible. Even if you close the group *it still exists*, and is returned by subsequent calls to addGroup("mygroup"). However, since it is closed, all registration calls will be ignored, hence why the value isn't updating.
>
> You can only update a metric by storing a reference to it in your function.
> Why do you want to avoid the member variable?
>
> On 08.01.2019 17:24, Gaurav Luthra wrote:
>
> Hi Chesnay,
>
> If removing the metrics from the Flink GUI is not possible while the job is running, then kindly tell me how to update a metric counter.
>
> Explanation:
> Suppose I created a metric counter with key "chesnay" and incremented the counter to 20, with the code below:
> getRuntimeContext().getMetricGroup().counter("chesnay").inc(20);
>
> Note: I am not assigning this counter to any local/member variable, as I do not want to keep state in my job.
>
> Now, after some time, if I want to update the value of the "chesnay" metric counter to 60 and I am not aware of the old state (which is 20), I do:
> getRuntimeContext().getMetricGroup().counter("chesnay").inc(60);
>
> Even then the Flink GUI shows value 20 for the "chesnay" metric, and logs a WARN along the lines of "same name can be used, and behavior is undefined".
>
> Now, how do I update the "chesnay" metric if I do not want to keep the state in my job?
>
> That's why I thought of creating a user-scoped metric group and closing that group, to remove the metric counters and create new metrics every time I want to update them.
>
> Hope you understood my problem.
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
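A common way to track per-key counts without holding n separate references is a single map member from key to counter. A plain-Java sketch — the `Counter` class below is a minimal stand-in for Flink's `org.apache.flink.metrics.Counter`; in a real job you would create each counter via `getRuntimeContext().getMetricGroup().counter(key)` inside `computeIfAbsent`:

```java
import java.util.HashMap;
import java.util.Map;

public class PerKeyCounters {
    // Minimal stand-in for org.apache.flink.metrics.Counter.
    public static class Counter {
        private long count;
        public void inc(long n) { count += n; }
        public long getCount() { return count; }
    }

    // One map member replaces n individual counter references.
    private final Map<String, Counter> counters = new HashMap<>();

    // Returns the existing counter for the key, creating it on first use.
    public Counter counterFor(String key) {
        return counters.computeIfAbsent(key, k -> new Counter());
    }

    public static void main(String[] args) {
        PerKeyCounters registry = new PerKeyCounters();
        registry.counterFor("chesnay").inc(20);
        registry.counterFor("chesnay").inc(40);  // updates the same counter
        System.out.println(registry.counterFor("chesnay").getCount()); // 60
    }
}
```

This keeps exactly one reference (the map) per function instance while still giving each key its own counter, and repeated lookups update rather than shadow the earlier value.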
Re: The way to write a UDF with generic type
Got it, thanks.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Subtask much slower than the others when creating checkpoints
Hi Bruno,

there are multiple reasons why one of the subtasks can take longer for checkpointing. It looks as if there is not much data skew, since the state sizes are relatively equal. It also looks as if the individual tasks all start checkpointing at the same time, which indicates that there cannot be much back-pressure in the DAG (or all tasks were equally back-pressured). This narrows the problem down to the asynchronous write operation.

One potential problem could be that the external system to which you write your checkpoint data has some kind of I/O limit/quota. Maybe the sum of write accesses depletes the maximum quota you have. You could try whether running the job with a lower parallelism solves the problem.

For further debugging it would be helpful to get access to the logs of the JobManager and the TaskManagers on DEBUG log level. It would also be helpful to learn which state backend you are using.

Cheers,
Till

On Tue, Jan 8, 2019 at 12:52 PM Bruno Aranda wrote:
> Hi,
>
> We are using Flink 1.6.1 at the moment and we have a streaming job configured to create a checkpoint every 10 seconds. Looking at the checkpointing times in the UI, we can see that one subtask is much slower creating the checkpoint, at least in its "End to End Duration", and this seems caused by a longer "Checkpoint Duration (Async)".
>
> For instance, in the attached screenshot, while most of the subtasks take half a second, one (and it is always the same one) takes 2 seconds.
>
> But we have worse problems. We have seen cases where the checkpoint times out for one task: while most take one second, the outlier takes more than 5 minutes (which is the max time we allow for a checkpoint). This can happen if there is back pressure. We only allow one checkpoint at a time as well.
>
> Why could one subtask take more time? This job reads from Kafka partitions and hashes by key, and we don't see any major data skew between the partitions. Does one partition do more work?
> We do have a cluster of 20 machines, in EMR, with TMs that have multiple slots (in legacy mode).
>
> Is this something that could have been fixed in a more recent version?
>
> Thanks for any insight!
>
> Bruno
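Since Till asks which state backend is in use: if it is RocksDB, enabling incremental checkpoints can shrink the asynchronous upload phase that dominates here, because only changed SST files are written per checkpoint. A hedged `flink-conf.yaml` sketch (key names as of Flink 1.6, checkpoint path illustrative — verify both against the docs for your version):

```yaml
# flink-conf.yaml (sketch -- verify keys and path for your Flink version)
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints   # illustrative path
state.backend.incremental: true
```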
Re: Counter Metrics not getting removed from Flink GUI after close()
What you're trying to do is not possible. Even if you close the group, /it still exists/, and is returned by subsequent calls to addGroup("mygroup"). However, since it is closed, all registration calls will be ignored, hence why the value isn't updating.

You can only update a metric by storing a reference to it in your function. Why do you want to avoid the member variable?

On 08.01.2019 17:24, Gaurav Luthra wrote:
> Hi Chesnay,
>
> If removing the metrics from the Flink GUI is not possible while the job is running, then kindly tell me how to update a metric counter.
>
> Explanation:
> Suppose I created a metric counter with key "chesnay" and incremented the counter to 20, with the code below:
> getRuntimeContext().getMetricGroup().counter("chesnay").inc(20);
>
> Note: I am not assigning this counter to any local/member variable, as I do not want to keep state in my job.
>
> Now, after some time, if I want to update the value of the "chesnay" metric counter to 60 and I am not aware of the old state (which is 20), I do:
> getRuntimeContext().getMetricGroup().counter("chesnay").inc(60);
>
> Even then the Flink GUI shows value 20 for the "chesnay" metric, and logs a WARN along the lines of "same name can be used, and behavior is undefined".
>
> Now, how do I update the "chesnay" metric if I do not want to keep the state in my job?
>
> That's why I thought of creating a user-scoped metric group and closing that group, to remove the metric counters and create new metrics every time I want to update them.
>
> Hope you understood my problem.
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Counter Metrics not getting removed from Flink GUI after close()
Hi Chesnay,

If removing the metrics from the Flink GUI is not possible while the job is running, then kindly tell me how to update a metric counter.

Explanation:
Suppose I created a metric counter with key "chesnay" and incremented the counter to 20, with the code below:
getRuntimeContext().getMetricGroup().counter("chesnay").inc(20);

Note: I am not assigning this counter to any local/member variable, as I do not want to keep state in my job.

Now, after some time, if I want to update the value of the "chesnay" metric counter to 60 and I am not aware of the old state (which is 20), I do:
getRuntimeContext().getMetricGroup().counter("chesnay").inc(60);

Even then the Flink GUI shows value 20 for the "chesnay" metric, and logs a WARN along the lines of "same name can be used, and behavior is undefined".

Now, how do I update the "chesnay" metric if I do not want to keep the state in my job?

That's why I thought of creating a user-scoped metric group and closing that group, to remove the metric counters and create new metrics every time I want to update them.

Hope you understood my problem.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Counter Metrics not getting removed from Flink GUI after close()
Metrics for a given job will be available in the GUI until the job has finished.

On 08.01.2019 17:08, Gaurav Luthra wrote:
> Hi,
>
> I am using ProcessWindowFunction, and in the process() function I am adding a user-scoped group as mentioned below:
> MetricGroup myMetricGroup = getRuntimeContext().getMetricGroup().addGroup("myGroup");
>
> Now, I am creating counter metrics using myMetricGroup, and I am able to see these counters in the Flink GUI. But when I call close() as mentioned below:
> ((AbstractMetricGroup) myMetricGroup).close();
>
> even then my counter metrics are not getting removed from the Flink GUI.
>
> Kindly guide me how to close a user-scoped metric group (myMetricGroup in my case) so that all the counter metrics created using myMetricGroup are removed from the Flink GUI.
>
> Thanks & Regards,
> Gaurav Luthra
> Mob: +91-9901945206
Counter Metrics not getting removed from Flink GUI after close()
Hi,

I am using ProcessWindowFunction, and in the process() function I am adding a user-scoped group as mentioned below:
MetricGroup myMetricGroup = getRuntimeContext().getMetricGroup().addGroup("myGroup");

Now, I am creating counter metrics using myMetricGroup, and I am able to see these counters in the Flink GUI. But when I call close() as mentioned below:
((AbstractMetricGroup) myMetricGroup).close();

even then my counter metrics are not getting removed from the Flink GUI.

Kindly guide me how to close a user-scoped metric group (myMetricGroup in my case) so that all the counter metrics created using myMetricGroup are removed from the Flink GUI.

Thanks & Regards,
Gaurav Luthra
Mob: +91-9901945206
Re: onTimer function is not getting executed and job is marked as finished.
Sure, I will do that. On Tue, Jan 8, 2019 at 7:25 PM Hequn Cheng wrote: > Hi Puneet, > > Can you explain it in more detail? Do you mean the job is finished before > you call ctx.timeservice()? > Maybe you have to let your source running for a longer time. > > It's better to show us the whole pipeline of your job. For example, write > a sample code(or provide a git link) that can reproduce your problem easily. > > Best, Hequn > > > On Tue, Jan 8, 2019 at 11:44 AM Puneet Kinra < > puneet.ki...@customercentria.com> wrote: > >> Hi hequan >> >> Weird behaviour when i m calling ctx.timeservice() function is getting >> exited even not throwing error >> >> On Tuesday, January 8, 2019, Hequn Cheng wrote: >> >>> Hi puneet, >>> >>> Could you print `parseLong + 5000` and >>> `ctx.timerService().currentProcessingTime()` out and check the value? >>> I know it is a streaming program. What I mean is the timer you have >>> registered is not within the interval of your job, so the timer has not >>> been triggered. For example, parseLong + 5000 = 5000 or parseLong + 5000 = >>> 1000(very big). >>> >>> Best, Hequn >>> >>> >>> On Tue, Jan 8, 2019 at 1:38 AM Puneet Kinra < >>> puneet.ki...@customercentria.com> wrote: >>> I checked the same the function is getting exited when i am calling ctx.getTimeservice () function. On Mon, Jan 7, 2019 at 10:27 PM Timo Walther wrote: > Hi Puneet, > > maybe you can show or explain us a bit more about your pipeline. From > what I see your ProcessFunction looks correct. Are you sure the > registering > takes place? > > Regards, > Timo > > Am 07.01.19 um 14:15 schrieb Puneet Kinra: > > Hi Hequn > > Its a streaming job . > > On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng > wrote: > >> Hi Puneet, >> >> The value of the registered timer should within startTime and endTime >> of your job. For example, job starts at processing time t1 and stops at >> processing time t2. You have to make sure t1< `parseLong + 5000` < t2. 
>> >> Best, Hequn >> >> On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra < >> puneet.ki...@customercentria.com> wrote: >> >>> Hi All >>> >>> Facing some issue with context to onTimer method in processfunction >>> >>> class TimerTest extends >>> ProcessFunction,String>{ >>> >>> /** >>> * >>> */ >>> private static final long serialVersionUID = 1L; >>> >>> @Override >>> public void processElement(Tuple2 arg0, >>> ProcessFunction, String>.Context ctx, >>> Collector arg2) throws Exception { >>> // TODO Auto-generated method stub >>> long parseLong = Long.parseLong(arg0.f1); >>> TimerService timerService = ctx.timerService(); >>> ctx.timerService().registerProcessingTimeTimer(parseLong + 5000); >>> } >>> >>> @Override >>> public void onTimer(long timestamp, ProcessFunction>> String>, String>.OnTimerContext ctx, >>> Collector out) throws Exception { >>> // TODO Auto-generated method stub >>> super.onTimer(timestamp, ctx, out); >>> System.out.println("Executing timmer"+timestamp); >>> out.collect("Timer Testing.."); >>> } >>> } >>> >>> -- >>> *Cheers * >>> >>> *Puneet Kinra* >>> >>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com >>> * >>> >>> *e-mail :puneet.ki...@customercentria.com >>> * >>> >>> >>> > > -- > *Cheers * > > *Puneet Kinra* > > *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com > * > > *e-mail :puneet.ki...@customercentria.com > * > > > > -- *Cheers * *Puneet Kinra* *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com * *e-mail :puneet.ki...@customercentria.com * >> >> -- >> *Cheers * >> >> *Puneet Kinra* >> >> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com >> * >> >> *e-mail :puneet.ki...@customercentria.com >> * >> >> >> >> -- *Cheers * *Puneet Kinra* *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com * *e-mail :puneet.ki...@customercentria.com *
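Hequn's point in the thread above can be illustrated without Flink: if the parsed field is a small number rather than an epoch-millis timestamp, `parseLong + 5000` lands back in 1970 and a processing-time timer registered there can never fire within the job's lifetime. A standalone plain-Java sketch (no Flink dependency; `canFire` is an illustrative helper, not a Flink API):

```java
public class TimerSanity {
    // A processing-time timer can only fire if its timestamp lies in the
    // future relative to the current processing time.
    public static boolean canFire(long timerTimestamp, long nowMillis) {
        return timerTimestamp > nowMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // A field value like "1000" yields timer = 6000, i.e. January 1970:
        System.out.println(canFire(Long.parseLong("1000") + 5000, now)); // false
        // An epoch-millis field value yields a timer 5 seconds from now:
        System.out.println(canFire(now + 5000, now)); // true
    }
}
```

Printing both `parseLong + 5000` and `ctx.timerService().currentProcessingTime()` inside `processElement`, as suggested above, is the quickest way to check which case applies.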
Re: ConnectTimeoutException when createPartitionRequestClient
Hi Wenrui, the exception now occurs while finishing the connection creation. I'm not sure whether this is so different. Could it be that your network is overloaded or not very reliable? Have you tried running your Flink job outside of AthenaX? Cheers, Till On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng wrote: > Hi Till, > > Thanks for your reply. Our cluster is Yarn cluster. I found that if we > decrease the total parallel the timeout issue can be avoided. But we do > need that amount of taskManagers to process data. In addition, once I > increase the netty server threads to 128, the error is changed to to > following error. It seems the cause is different. Could you help take a > look? > > 2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED. > java.io.IOException: Connecting the channel failed: Connecting to remote > task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might > indicate that the remote task manager has been lost. > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84) > at > org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480) > at > org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134) > at > 
org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148) > at > org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: > Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' > has failed. This might indicate that the remote task manager has been lost. > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284) > at > 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > ... 1 common frames omitted > Caused by: java.net.ConnectException: Connection timed out: athena464-sjc1/ > 10.70.129.13:39466 > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > at > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) > at > org.apache.flink.shaded.netty4.io
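If the connection setup itself is slow rather than failing outright, raising the Netty client connect timeout (and tuning thread counts less aggressively than 128) may help. A `flink-conf.yaml` sketch — key names recalled from Flink's NettyConfig, so verify them against the configuration reference for your Flink version:

```yaml
# flink-conf.yaml (sketch -- verify key names for your Flink version)
taskmanager.network.netty.client.connectTimeoutSec: 120
taskmanager.network.netty.server.numThreads: 16
taskmanager.network.netty.client.numThreads: 16
```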
Re: onTimer function is not getting executed and job is marked as finished.
Hi Puneet, Can you explain it in more detail? Do you mean the job is finished before you call ctx.timeservice()? Maybe you have to let your source running for a longer time. It's better to show us the whole pipeline of your job. For example, write a sample code(or provide a git link) that can reproduce your problem easily. Best, Hequn On Tue, Jan 8, 2019 at 11:44 AM Puneet Kinra < puneet.ki...@customercentria.com> wrote: > Hi hequan > > Weird behaviour when i m calling ctx.timeservice() function is getting > exited even not throwing error > > On Tuesday, January 8, 2019, Hequn Cheng wrote: > >> Hi puneet, >> >> Could you print `parseLong + 5000` and >> `ctx.timerService().currentProcessingTime()` out and check the value? >> I know it is a streaming program. What I mean is the timer you have >> registered is not within the interval of your job, so the timer has not >> been triggered. For example, parseLong + 5000 = 5000 or parseLong + 5000 = >> 1000(very big). >> >> Best, Hequn >> >> >> On Tue, Jan 8, 2019 at 1:38 AM Puneet Kinra < >> puneet.ki...@customercentria.com> wrote: >> >>> I checked the same the function is getting exited when i am calling >>> ctx.getTimeservice () function. >>> >>> On Mon, Jan 7, 2019 at 10:27 PM Timo Walther wrote: >>> Hi Puneet, maybe you can show or explain us a bit more about your pipeline. From what I see your ProcessFunction looks correct. Are you sure the registering takes place? Regards, Timo Am 07.01.19 um 14:15 schrieb Puneet Kinra: Hi Hequn Its a streaming job . On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng wrote: > Hi Puneet, > > The value of the registered timer should within startTime and endTime > of your job. For example, job starts at processing time t1 and stops at > processing time t2. You have to make sure t1< `parseLong + 5000` < t2. 
> > Best, Hequn > > On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra < > puneet.ki...@customercentria.com> wrote: > >> Hi All >> >> Facing some issue with context to onTimer method in processfunction >> >> class TimerTest extends ProcessFunction,String>{ >> >> /** >> * >> */ >> private static final long serialVersionUID = 1L; >> >> @Override >> public void processElement(Tuple2 arg0, >> ProcessFunction, String>.Context ctx, >> Collector arg2) throws Exception { >> // TODO Auto-generated method stub >> long parseLong = Long.parseLong(arg0.f1); >> TimerService timerService = ctx.timerService(); >> ctx.timerService().registerProcessingTimeTimer(parseLong + 5000); >> } >> >> @Override >> public void onTimer(long timestamp, ProcessFunction> String>, String>.OnTimerContext ctx, >> Collector out) throws Exception { >> // TODO Auto-generated method stub >> super.onTimer(timestamp, ctx, out); >> System.out.println("Executing timmer"+timestamp); >> out.collect("Timer Testing.."); >> } >> } >> >> -- >> *Cheers * >> >> *Puneet Kinra* >> >> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com >> * >> >> *e-mail :puneet.ki...@customercentria.com >> * >> >> >> -- *Cheers * *Puneet Kinra* *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com * *e-mail :puneet.ki...@customercentria.com * >>> >>> -- >>> *Cheers * >>> >>> *Puneet Kinra* >>> >>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com >>> * >>> >>> *e-mail :puneet.ki...@customercentria.com >>> * >>> >>> >>> > > -- > *Cheers * > > *Puneet Kinra* > > *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com > * > > *e-mail :puneet.ki...@customercentria.com > * > > > >
Re: How to get the temp result of each dynamic table when executing Flink-SQL?
Hi,

A print user-defined table sink is helpful. A print user-defined UDF is another workaround.

Hope this helps.

Best, Hequn

On Tue, Jan 8, 2019 at 1:45 PM yinhua.dai wrote:
> In our case, we wrote a console table sink which prints everything to the console, and used "insert into" to write the interim result to the console.
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
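The print-UDF workaround boils down to an identity function with a logging side effect. A plain-Java sketch of the core pattern — in Flink you would wrap this in a `ScalarFunction` and register it with the `TableEnvironment`; the name `debugPrint` is illustrative:

```java
public class DebugPrint {
    // Identity pass-through that prints the value it sees. Wrapped in a
    // Flink ScalarFunction and called as debugPrint('tag', col) in a SQL
    // query, it exposes the interim value of col without changing the result.
    public static <T> T debugPrint(String tag, T value) {
        System.out.println(tag + " = " + value);
        return value;
    }

    public static void main(String[] args) {
        int doubled = debugPrint("after-doubling", 21 * 2);
        System.out.println(doubled); // 42
    }
}
```

Because the function returns its input unchanged, it can be inserted around any column of any intermediate dynamic table purely for inspection.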
Re: ConnectTimeoutException when createPartitionRequestClient
Hi Till, Thanks for your reply. Our cluster is Yarn cluster. I found that if we decrease the total parallel the timeout issue can be avoided. But we do need that amount of taskManagers to process data. In addition, once I increase the netty server threads to 128, the error is changed to to following error. It seems the cause is different. Could you help take a look? 2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED. java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost. at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197) at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132) at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84) at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59) at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480) at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134) at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148) at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost. at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220) at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ... 1 common frames omitted Caused by: java.net.ConnectException: Connection timed out: athena464-sjc1/ 10.70.129.13:39466 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281) ... 6 common frames omitted Thanks, Wenrui On Mon, Jan 7, 2019 at 2:38 AM Till Rohrmann wrote: > Hi Wenrui, > > the code to set the connect timeout looks ok to me [1]. I also tested it > locally and checked that the timeou
Subtask much slower than the others when creating checkpoints
Hi,

We are using Flink 1.6.1 at the moment and we have a streaming job configured to create a checkpoint every 10 seconds. Looking at the checkpointing times in the UI, we can see that one subtask is much slower creating the checkpoint, at least in its "End to End Duration", and this seems caused by a longer "Checkpoint Duration (Async)".

For instance, in the attached screenshot, while most of the subtasks take half a second, one (and it is always the same one) takes 2 seconds.

But we have worse problems. We have seen cases where the checkpoint times out for one task: while most take one second, the outlier takes more than 5 minutes (which is the max time we allow for a checkpoint). This can happen if there is back pressure. We only allow one checkpoint at a time as well.

Why could one subtask take more time? This job reads from Kafka partitions and hashes by key, and we don't see any major data skew between the partitions. Does one partition do more work?

We do have a cluster of 20 machines, in EMR, with TMs that have multiple slots (in legacy mode).

Is this something that could have been fixed in a more recent version?

Thanks for any insight!

Bruno
Re: The way to write a UDF with generic type
Currently, this functionality is hard-coded in the aggregation translation, namely in `org.apache.flink.table.runtime.aggregate.AggregateUtil#transformToAggregateFunctions` [1].

Timo

[1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala

On 08.01.19 06:41, yinhua.dai wrote:
> Hi Timo,
>
> Can you let me know how the built-in "MAX" function is able to support different types?
> Thanks.
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
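At the user level, the usual way around such per-type hard-coding is to bound the generic with `Comparable`. A plain-Java sketch of a type-generic max accumulator — a real Flink `AggregateFunction` would additionally need to supply accumulator type information, which this sketch omits:

```java
public class GenericMax<T extends Comparable<T>> {
    private T max;  // accumulator: current maximum, null if no input yet

    // Fold one value into the accumulator, keeping the larger element.
    public void accumulate(T value) {
        if (max == null || value.compareTo(max) > 0) {
            max = value;
        }
    }

    public T getValue() {
        return max;
    }

    public static void main(String[] args) {
        GenericMax<Integer> ints = new GenericMax<>();
        ints.accumulate(3); ints.accumulate(7); ints.accumulate(5);
        System.out.println(ints.getValue()); // 7

        GenericMax<String> strs = new GenericMax<>();
        strs.accumulate("a"); strs.accumulate("c"); strs.accumulate("b");
        System.out.println(strs.getValue()); // c
    }
}
```

The same class works for any `Comparable` type, which is essentially what the hard-coded translation achieves by instantiating a concrete variant per type.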
Re: Buffer stats when Back Pressure is high
Thanks Timo for the suggested solution. Will go with the idea of an artificial key for our use case.

Gagan

On Mon, Jan 7, 2019 at 10:21 PM Timo Walther wrote:
> Hi Gagan,
>
> a typical solution to such a problem is to introduce an artificial key (enrichment id + some additional suffix); you can then keyBy on this artificial key and thus spread the workload more evenly. Of course you need to make sure that records of the second stream are duplicated to all operators with the same artificial key.
>
> Depending on the frequency of the second stream, it might also be worth using a broadcast join that distributes the second stream to all operators, such that all operators can perform the enrichment step in a round-robin fashion.
>
> Regards,
> Timo
>
> On 07.01.19 14:45, Gagan Agrawal wrote:
>
> Flink version is 1.7.
> Thanks Zhijiang for your pointer. Initially I was checking only a few. However I just checked all of them and found a couple with a queue length of 40+, which seems to be due to skew in the data. Are there any general guidelines on how to handle skewed data? In my case I am taking a union and then keyBy (with a custom stateful ProcessFunction) on the enrichment id of 2 streams (1 enrichment stream with low volume and another regular data stream with high volume). I see that 30% of my data stream records have the same enrichment id and hence go to the same task, which results in skew. Any pointers on how to handle skew while doing keyBy would be of great help.
>
> Gagan
>
> On Mon, Jan 7, 2019 at 3:25 PM zhijiang wrote:
>
>> Hi Gagan,
>>
>> What Flink version do you use? And have you checked buffers.inputQueueLength for all the related parallel instances of B (connected with A)? It may be that only one parallel instance of B has a full input queue, which back-pressures A, while the input queues of the other parallel instances of B are empty.
>>
>> Best,
>> Zhijiang
>>
>> --
>> From: Gagan Agrawal
>> Send Time: Mon, Jan 7, 2019, 12:06
>> To: user
>> Subject: Buffer stats when Back Pressure is high
>>
>> Hi,
>> I want to understand whether any of the buffer stats help in debugging/validating that a downstream operator is performing slowly when back pressure is high. Say I have A -> B operators and A shows high back pressure, which indicates something is wrong or not performing well on B's side, slowing down operator A. However, when I look at buffers.inputQueueLength for operator B, it's 0. My understanding is that when B is processing slowly, its input buffer will be full of incoming messages, which ultimately blocks/slows down upstream operator A. However, that doesn't seem to be happening in my case. Can someone throw some light on how the different stats around buffers (e.g. buffers.inPoolUsage, buffers.inputQueueLength, numBuffersInLocalPerSecond, numBuffersInRemotePerSecond) should look when a downstream operator is performing slowly?
>>
>> Gagan
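Timo's artificial-key suggestion in miniature: append a random salt to the hot key so its records spread over several parallel instances, and duplicate each enrichment record to every salt so the keyed join still finds its match. A plain-Java sketch with no Flink dependency (helper names illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class KeySalting {
    // High-volume stream: pick one random salt per record, spreading a
    // hot key over `salts` parallel instances.
    public static String saltedKey(String key, int salts) {
        return key + "#" + ThreadLocalRandom.current().nextInt(salts);
    }

    // Low-volume enrichment stream: emit one copy per salt, so every
    // instance holding a slice of the hot key sees the enrichment record.
    public static List<String> allSaltedKeys(String key, int salts) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < salts; i++) {
            keys.add(key + "#" + i);
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(allSaltedKeys("enrichment-42", 4));
        // [enrichment-42#0, enrichment-42#1, enrichment-42#2, enrichment-42#3]
    }
}
```

In a Flink job, `saltedKey` would feed the `keyBy` of the high-volume stream while a `flatMap` on the enrichment stream emits the `allSaltedKeys` copies; the trade-off is `salts`-fold duplication of the enrichment state.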