RE: CEP Debugging

2018-04-20 Thread Nortman, Bill
So it appears the issue was that I had setStreamTimeCharacteristic set to EventTime:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Removing this, or setting it to
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

then allowed the CEP pattern code to execute, producing the log messages I was
expecting, i.e. events were actually being processed.
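The likely cause: with TimeCharacteristic.EventTime, CEP only emits a match once a
watermark advances past the matched events' timestamps, so a stream with no
timestamp/watermark assignment never fires anything. If event time is actually
wanted here, a minimal sketch of assigning timestamps and watermarks from the
event's epoch timestamp (untested; it assumes the MetricsEvent getter used further
down in this thread, Flink 1.4 APIs):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<MetricsEvent> withTimestamps = metricEventStream
        .assignTimestampsAndWatermarks(
                // tolerate events up to 5 seconds out of order before the watermark advances
                new BoundedOutOfOrdernessTimestampExtractor<MetricsEvent>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(MetricsEvent event) {
                        return event.getEpochTime();
                    }
                });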


From: Nortman, Bill [mailto:william.nort...@pimco.com]
Sent: Thursday, April 19, 2018 2:51 PM
To: 'user@flink.apache.org'
Subject: CEP Debugging

So I have events coming in in this format:
TransactionID, MicroserviceName, Direction, EpochTimeStamp.
For each call to a microservice, an event is generated with a timestamp and a
direction of "in". Then when the call completes, another event is generated
with a timestamp and a direction of "out".
I need to calculate latency both per microservice per transaction and for the
entire transaction.
Example Data (TransactionID, MicroserviceName, Direction, EpochTimeStamp)
1,A,in,1700
1,B,in,1702
1,B,out,1704
1,C,in,1704
1,D,in,1705
1,D,out,1706
1,C,out,1709
1,A,out,1710
I'm looking for a pattern to handle this.
Something like this:
Pattern<MetricsEvent, ?> pattern =
        Pattern.<MetricsEvent>begin("myBegin").where(
            new SimpleCondition<MetricsEvent>() {
                public boolean filter(MetricsEvent metricsEvent) throws Exception {
                    return metricsEvent.getDirection().equals("in");
                }
            }
        ).followedBy("myNext").where(
            new SimpleCondition<MetricsEvent>() {
                public boolean filter(MetricsEvent metricsEvent) throws Exception {
                    // want: metricsEvent.getApplication().equals(/* PREVIOUS EVENT */.getApplication())
                    return false;
                }
            }
        );
I'm not sure how to get the previous event to compare to.
Then how do I calculate the latency between the two events?
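One way to get at the earlier event (a sketch, untested): since Flink 1.3, CEP's
IterativeCondition exposes the events already accepted for a named pattern through
its Context, so the "followedBy" condition can look back at what "myBegin" matched:

import org.apache.flink.cep.pattern.conditions.IterativeCondition;

.followedBy("myNext").where(
    new IterativeCondition<MetricsEvent>() {
        @Override
        public boolean filter(MetricsEvent event, Context<MetricsEvent> ctx) throws Exception {
            // compare against the event(s) matched for "myBegin" in this partial match
            for (MetricsEvent begin : ctx.getEventsForPattern("myBegin")) {
                if (event.getApplication().equals(begin.getApplication())
                        && event.getDirection().equals("out")) {
                    return true;
                }
            }
            return false;
        }
    }
);

The latency itself is then easiest to compute in the select/flatSelect function, as
in the code further down, by subtracting the two epoch timestamps.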
So I tried this
Pattern<MetricsEvent, ?> pattern =
        Pattern.<MetricsEvent>begin("myBegin").where(
            new SimpleCondition<MetricsEvent>() {
                public boolean filter(final MetricsEvent metricsEvent) {
                    System.out.println("Begin Filter");
                    return !metricsEvent.getEventType().equals("in");
                }
            }
        ).followedBy("followed").where(
            new SimpleCondition<MetricsEvent>() {
                public boolean filter(final MetricsEvent metricsEvent) {
                    System.out.println("FollowedByFilter");
                    return !metricsEvent.getEventType().equals("out");
                }
            }
        );
// Define a stream processor using the pattern
PatternStream<MetricsEvent> patternStream = CEP.pattern(
        metricEventStream,
        pattern);

// Process the stream
System.out.println("* Pattern Processing");
SingleOutputStreamOperator<MetricLatency> latencyEvents =
        patternStream.flatSelect(new ApplicationLatencyFlatSelectFunction());
With this as the latency calc
public class ApplicationLatencyFlatSelectFunction implements
        org.apache.flink.cep.PatternFlatSelectFunction<MetricsEvent, MetricLatency> {

    @SuppressWarnings("CheckStyle") // Base class method doesn't have JavaDoc
    public void flatSelect(final Map<String, List<MetricsEvent>> map,
                           final Collector<MetricLatency> collector) {
        System.out.println("  Flat Select From Pattern");
        MetricsEvent begin = map.get("myBegin").get(0);
        List<MetricsEvent> ends = map.get("followed");
        for (MetricsEvent me : ends) {
            if (me.getApplication().equals(begin.getApplication())) {
                Long latency = me.getEpochTime() - begin.getEpochTime();
                collector.collect(new MetricLatency(begin.getUid(),
                        begin.getApplication(), latency));
            }
        }
    }
}
However I don't get any output; the println statements in the pattern conditions
and in the flatSelect function never print.
How do you debug something like this?



Re: in order to use epoll (taskmanager.network.netty.transport: epoll), is it required that linux version is 4.0.16 or newer or not

2018-04-20 Thread Ted Yu
I think you should upgrade Linux to said version or newer.

Cheers

On Fri, Apr 20, 2018 at 6:35 AM, makeyang  wrote:

> my flink version is 1.4.2
> my jdk version is 1.8.0.20
> my linux version is: 3.10.0
>
> I tried to use epoll with the setting: taskmanager.network.netty.transport: epoll
> but it throws an exception, which led me here:
> https://github.com/apache/flink-shaded/issues/30
> I followed the instructions and the exception is still thrown.
> Then I found the link below, Native transports, which is the official Netty doc:
> http://netty.io/wiki/native-transports.html
> and it says:
> Netty provides the following platform specific JNI transports:
> Linux (since 4.0.16)
>
> so my question is:
> in order to use this epoll, is it required that linux version is 4.0.16 or
> newer or not?
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


in order to use epoll (taskmanager.network.netty.transport: epoll), is it required that linux version is 4.0.16 or newer or not

2018-04-20 Thread makeyang
my flink version is 1.4.2
my jdk version is 1.8.0.20
my linux version is: 3.10.0

I tried to use epoll with the setting: taskmanager.network.netty.transport: epoll
but it throws an exception, which led me here:
https://github.com/apache/flink-shaded/issues/30
I followed the instructions and the exception is still thrown.
Then I found the link below, Native transports, which is the official Netty doc:
http://netty.io/wiki/native-transports.html
and it says:
Netty provides the following platform specific JNI transports:
Linux (since 4.0.16)

so my question is:
in order to use this epoll, is it required that linux version is 4.0.16 or
newer or not?
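For reference, enabling epoll is a one-line change in flink-conf.yaml; a minimal
sketch (the key below is the documented Flink option name, and it defaults to "nio"):

# flink-conf.yaml (sketch; requires the native epoll library to be loadable on the host)
taskmanager.network.netty.transport: epoll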





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: 1.4.3 release/roadmap

2018-04-20 Thread Fabian Hueske
Hi Daniel,

The discussion for releasing Flink 1.4.3 hasn't been started yet.
The community is still working on the 1.5.0 release but AFAIK, there are no
blockers for 1.4.3.

Development and release discussions take place on the dev@f.a.o list.
Would you mind kicking off a discussion there?

Thanks, Fabian

2018-04-20 6:36 GMT+02:00 Bowen Li :

> To find bug fixes that are going into 1.4.x, say 1.4.3, you can filter
> jira tickets with 'Fix Versions' set to '1.4.3'.
>
> On Thu, Apr 19, 2018 at 1:36 AM, Daniel Harper 
> wrote:
>
>> Hi there,
>>
>> There are some bug fixes that are in the 1.4 branch that we would like to
>> be made available for us to use.
>>
>> Is there a roadmap from the project when the next stable 1.4.x release
>> will be cut? Any blockers?
>>
>
>


Re: Subtasks - Uneven allocation

2018-04-20 Thread PedroMrChaves
That is only used to split the load across all of the subtasks, which I am
already doing.
It is not related to the allocation.



-
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: debug for Flink

2018-04-20 Thread Kien Truong

Hi,

We put a lot of logging in our code to trace the flow of events as they move
between operators; this is done at the DEBUG level.

Normally this logging is turned off by setting the logging level to INFO or
above, because logging everything is expensive and a cluttered log is harder
to analyze.

When we need to switch the logging to DEBUG level, we currently change the
configuration files and then restart the job, so that Flink picks up and uses
the updated files during redeployment.

The other option is to manually track down the location of each YARN
container and update the configuration files directly, but that's a lot of
work, and not possible if you don't have superuser rights on that machine,
due to our security setup.

That's why we want an easier way to change the logging level.


Best regards,

Kien
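For reference, the DEBUG/INFO switch described above is normally a one-line edit
in Flink's log4j configuration; a sketch in the log4j 1.x properties format that
Flink 1.4 ships with (the package name com.example.myjob is made up):

# conf/log4j.properties
log4j.rootLogger=INFO, file
# raise only your own operators to DEBUG, to limit log volume:
log4j.logger.com.example.myjob=DEBUG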


On 4/20/2018 12:15 AM, Qian Ye wrote:

Thanks for your kind reply.
But I still have some questions. What does the logging level mean in your
system? Why do you need to re-deploy the cluster to change the logging level?
As far as I know, debugging information can be divided into levels like info,
warn, error, etc. Is all of this information logged while the system is running?
And can it then be analyzed further by categorizing it by level?

Best,
Stephen


On Apr 19, 2018, at 7:19 AM, Kien Truong  wrote:

Hi,

Our most useful tool when debugging Flink is actually the simple log files,
because debuggers just slow things down too much for us.

However, having to re-deploy the entire cluster to change the logging level is
a pain (we use YARN), so we would really like an easier method to change the
logging level at runtime.


Regards,

Kien


On 4/19/2018 5:53 AM, Qian Ye wrote:

Hi

I'm wondering whether new debugging methods/tools are urgently needed for Flink
development. I know there already exist some debugging methods for Flink, e.g.,
remote debugging of Flink clusters
(https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters).
But are they convenient enough?

Best regards.


Run programs w/ params including comma via REST api

2018-04-20 Thread Dongwon Kim
Hi,

I'm trying to run a program by sending POST requests.
I've already spawned dispatcher in YARN and uploaded a jar file to the 
dispatcher.

I want to execute my application using the following arguments (--topic and 
--kafkaParams):
--topic gps-topic --kafkaParams 
bootstrap.servers=dacoe2:20245,group.id=trajectory-tracker
As you can see, there's a comma in the value of kafkaParams.

When I send my application using the following command from Bash (%20 is a
space and %2C is a comma),
bash> curl -X POST
'http://dacoe4.weave.local:45097/jars/7b6880d7-b899-4243-8b9b-a01ad7f8a854_Tmap-1.0-SNAPSHOT.jar/run?entry-class=com.skt.tmap.trajectorytracker.TrajectoryTracker&program-args=--topic%20gps-topic%20--kafkaParams%20bootstrap.servers=dacoe2:20245%2Cgroup.id=trajectory-tracker'

I get the following response from the dispatcher:
{
"errors": [
"Expected only one value [--topic gps-topic --kafkaParams 
bootstrap.servers=dacoe2:20245, group.id=trajectory-tracker]."
]
}

What I found in the source code is that
org.apache.flink.runtime.rest.messages.MessageQueue tries to split the value of
program-args using a comma as the delimiter.

I think I could modify my program to take its arguments in a different way, but
it would not be intuitive to use a different character as the delimiter instead
of a comma.

What do you think?

Or is there a way to avoid this behavior of splitting the value of program-args
into multiple pieces?

best,

- Dongwon

Re: Testing on Flink 1.5

2018-04-20 Thread Amit Jain
Hi Gary,

This setting has resolved the issue. Does it increase the timeout for all RPC
calls or only for specific components?

We had the following settings in Flink 1.3.2 and they did the job for us.

akka.watch.heartbeat.pause: 600 s
akka.client.timeout: 5 min
akka.ask.timeout: 120 s


--
Thanks,
Amit


Re: Flink State monitoring

2018-04-20 Thread Stefan Richter
If estimates are good enough, we should be able to expose something. I would
still like to double-check the guarantees to see whether RocksDB's estimates
are helpful or could be misleading.

> On 20.04.2018 at 11:59, Juho Autio wrote:
> 
> Thanks. At least for us it doesn't matter how exact the number is. I would 
> expect most users to be only interested in monitoring if the total state size 
> keeps growing (rapidly), or remains about the same. I suppose all of the 
> options that you suggested would satisfy this need?
> 
> On Fri, Apr 20, 2018 at 12:53 PM, Stefan Richter  > wrote:
> Hi,
> 
> for incremental checkpoints, it is only showing the size of the deltas. It 
> would probably also be possible to report the full size, but the current 
> reporting and UI is only supporting to deliver a single value. In general, 
> some things are rather hard to report. For example, for the heap based 
> backend, is the state size the size of the serialized data or the size of the 
> heap objects? 
> Another remark about key count: the key count is easy to determine for the 
> heap based backend, but there is no (efficient) method in RocksDb that gives 
> the key count (because of the way RocksDB works). In this case, afaik, we 
> have the (inefficient) option to iterate all keys and count or use the 
> (efficient) estimated key count is supported by RocksDB.
> 
> Best,
> Stefan
> 
> 
>> On 04.01.2018 at 19:23, Steven Wu wrote:
>> 
>> Aljoscha/Stefan, 
>> 
>> if incremental checkpoint is enabled, I assume the "checkpoint size" is only 
>> the delta/incremental size (not the full state size), right?
>> 
>> Thanks,
>> Steven
>> 
>> 
>> On Thu, Jan 4, 2018 at 5:18 AM, Aljoscha Krettek > > wrote:
>> Hi,
>> 
>> I'm afraid there is currently no metrics around state. I see that it's very 
>> good to have so I'm putting it on my list of stuff that we should have at 
>> some point.
>> 
>> One thing that comes to mind is checking the size of checkpoints, which 
>> gives you an indirect way of figuring out how big state is but that's not 
>> very exact, i.e. doesn't give you "number of keys" or some such.
>> 
>> Best,
>> Aljoscha
>> 
>> > On 20. Dec 2017, at 08:09, Netzer, Liron > > > wrote:
>> >
>> > Ufuk, Thanks for replying !
>> >
>> > Aljoscha, can you please assist with the questions below?
>> >
>> > Thanks,
>> > Liron
>> >
>> > -Original Message-
>> > From: Ufuk Celebi [mailto:u...@apache.org ]
>> > Sent: Friday, December 15, 2017 3:06 PM
>> > To: Netzer, Liron [ICG-IT]
>> > Cc: user@flink.apache.org 
>> > Subject: Re: Flink State monitoring
>> >
>> > Hey Liron,
>> >
>> > unfortunately, there are no built-in metrics related to state. In general, 
>> > exposing the actual values as metrics is problematic, but exposing summary 
>> > statistics would be a good idea. I'm not aware of a good work around at 
>> > the moment that would work in the general case (taking into account state 
>> > restore, etc.).
>> >
>> > Let me pull in Aljoscha (cc'd) who knows the state backend internals well.
>> >
>> > @Aljoscha:
>> > 1) Are there any plans to expose keyed state related metrics (like number 
>> > of keys)?
>> > 2) Is there a way to work around the lack of these metrics in 1.3?
>> >
>> > – Ufuk
>> >
>> > On Thu, Dec 14, 2017 at 10:55 AM, Netzer, Liron > > > wrote:
>> >> Hi group,
>> >>
>> >>
>> >>
>> >> We are using Flink keyed state in several operators.
>> >>
>> >> Is there an easy was to expose the data that is stored in the state, i.e.
>> >> the key and the values?
>> >>
>> >> This is needed for both monitoring as well as debugging. We would like
>> >> to understand how many key+values are stored in each state and also to
>> >> view the data itself.
>> >>
>> >> I know that there is the "Queryable state" option, but this is still
>> >> in Beta, and doesn't really give us what we want easily.
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> *We are using Flink 1.3.2 with Java.
>> >>
>> >>
>> >>
>> >> Thanks,
>> >>
>> >> Liron
>> 
>> 
> 
> 



Re: Flink State monitoring

2018-04-20 Thread Juho Autio
Thanks. At least for us it doesn't matter how exact the number is. I would
expect most users to be only interested in monitoring if the total state
size keeps growing (rapidly), or remains about the same. I suppose all of
the options that you suggested would satisfy this need?

On Fri, Apr 20, 2018 at 12:53 PM, Stefan Richter <
s.rich...@data-artisans.com> wrote:

> Hi,
>
> for incremental checkpoints, it is only showing the size of the deltas. It
> would probably also be possible to report the full size, but the current
> reporting and UI is only supporting to deliver a single value. In general,
> some things are rather hard to report. For example, for the heap based
> backend, is the state size the size of the serialized data or the size of
> the heap objects?
> Another remark about key count: the key count is easy to determine for the
> heap based backend, but there is no (efficient) method in RocksDb that
> gives the key count (because of the way RocksDB works). In this case,
> afaik, we have the (inefficient) option to iterate all keys and count or
> use the (efficient) estimated key count is supported by RocksDB.
>
> Best,
> Stefan
>
>
> On 04.01.2018 at 19:23, Steven Wu wrote:
>
> Aljoscha/Stefan,
>
> if incremental checkpoint is enabled, I assume the "checkpoint size" is
> only the delta/incremental size (not the full state size), right?
>
> Thanks,
> Steven
>
>
> On Thu, Jan 4, 2018 at 5:18 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> I'm afraid there is currently no metrics around state. I see that it's
>> very good to have so I'm putting it on my list of stuff that we should have
>> at some point.
>>
>> One thing that comes to mind is checking the size of checkpoints, which
>> gives you an indirect way of figuring out how big state is but that's not
>> very exact, i.e. doesn't give you "number of keys" or some such.
>>
>> Best,
>> Aljoscha
>>
>> > On 20. Dec 2017, at 08:09, Netzer, Liron  wrote:
>> >
>> > Ufuk, Thanks for replying !
>> >
>> > Aljoscha, can you please assist with the questions below?
>> >
>> > Thanks,
>> > Liron
>> >
>> > -Original Message-
>> > From: Ufuk Celebi [mailto:u...@apache.org]
>> > Sent: Friday, December 15, 2017 3:06 PM
>> > To: Netzer, Liron [ICG-IT]
>> > Cc: user@flink.apache.org
>> > Subject: Re: Flink State monitoring
>> >
>> > Hey Liron,
>> >
>> > unfortunately, there are no built-in metrics related to state. In
>> general, exposing the actual values as metrics is problematic, but exposing
>> summary statistics would be a good idea. I'm not aware of a good work
>> around at the moment that would work in the general case (taking into
>> account state restore, etc.).
>> >
>> > Let me pull in Aljoscha (cc'd) who knows the state backend internals
>> well.
>> >
>> > @Aljoscha:
>> > 1) Are there any plans to expose keyed state related metrics (like
>> number of keys)?
>> > 2) Is there a way to work around the lack of these metrics in 1.3?
>> >
>> > – Ufuk
>> >
>> > On Thu, Dec 14, 2017 at 10:55 AM, Netzer, Liron 
>> wrote:
>> >> Hi group,
>> >>
>> >>
>> >>
>> >> We are using Flink keyed state in several operators.
>> >>
>> >> Is there an easy was to expose the data that is stored in the state,
>> i.e.
>> >> the key and the values?
>> >>
>> >> This is needed for both monitoring as well as debugging. We would like
>> >> to understand how many key+values are stored in each state and also to
>> >> view the data itself.
>> >>
>> >> I know that there is the "Queryable state" option, but this is still
>> >> in Beta, and doesn't really give us what we want easily.
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> *We are using Flink 1.3.2 with Java.
>> >>
>> >>
>> >>
>> >> Thanks,
>> >>
>> >> Liron
>>
>>
>
>


Re: Flink State monitoring

2018-04-20 Thread Stefan Richter
I wonder whether comparing the size of a full checkpoint with the total size of
an incremental checkpoint gives any useful insight into the key count, because:

- Full checkpoints are basically a dump of all key-value pairs as written by
their serializers, plus their key-group id. Each key is contained exactly once.
They are uncompressed, unless you enable compression explicitly.
- Incremental checkpoints use RocksDB’s internal SSTable format, can contain 
additional metadata, and they are always compressed (typically with snappy). 
- Adding up the sizes of all increments can account for each key multiple 
times, because each delta might contain an update per key. Furthermore, deltas 
also contain entries for deleted keys because that is how they must work.

Best,
Stefan 


> On 20.04.2018 at 11:36, Juho Autio wrote:
> 
> Hi Aljoscha & co.,
> 
> Is there any way to monitor the state size yet? Maybe a ticket in Jira?
> 
> When using incremental checkpointing, the total state size can't be seen 
> anywhere. For example the checkpoint details only show the size of the 
> increment. It would be nice to add the total size there as well. The only way 
> I know currently for figuring out the total state size is by triggering a 
> manual savepoint. But this doesn't work at all if the state has grown so big 
> that savepoint times out.
> 
> Also when Flink restores state from an incrementally created checkpoint, it 
> doesn't offer a way to see the total size.
> 
> On Thu, Jan 4, 2018 at 8:23 PM, Steven Wu  > wrote:
> Aljoscha/Stefan, 
> 
> if incremental checkpoint is enabled, I assume the "checkpoint size" is only 
> the delta/incremental size (not the full state size), right?
> 
> Thanks,
> Steven
> 
> 
> On Thu, Jan 4, 2018 at 5:18 AM, Aljoscha Krettek  > wrote:
> Hi,
> 
> I'm afraid there is currently no metrics around state. I see that it's very 
> good to have so I'm putting it on my list of stuff that we should have at 
> some point.
> 
> One thing that comes to mind is checking the size of checkpoints, which gives 
> you an indirect way of figuring out how big state is but that's not very 
> exact, i.e. doesn't give you "number of keys" or some such.
> 
> Best,
> Aljoscha
> 
> > On 20. Dec 2017, at 08:09, Netzer, Liron  > > wrote:
> >
> > Ufuk, Thanks for replying !
> >
> > Aljoscha, can you please assist with the questions below?
> >
> > Thanks,
> > Liron
> >
> > -Original Message-
> > From: Ufuk Celebi [mailto:u...@apache.org ]
> > Sent: Friday, December 15, 2017 3:06 PM
> > To: Netzer, Liron [ICG-IT]
> > Cc: user@flink.apache.org 
> > Subject: Re: Flink State monitoring
> >
> > Hey Liron,
> >
> > unfortunately, there are no built-in metrics related to state. In general, 
> > exposing the actual values as metrics is problematic, but exposing summary 
> > statistics would be a good idea. I'm not aware of a good work around at the 
> > moment that would work in the general case (taking into account state 
> > restore, etc.).
> >
> > Let me pull in Aljoscha (cc'd) who knows the state backend internals well.
> >
> > @Aljoscha:
> > 1) Are there any plans to expose keyed state related metrics (like number 
> > of keys)?
> > 2) Is there a way to work around the lack of these metrics in 1.3?
> >
> > – Ufuk
> >
> > On Thu, Dec 14, 2017 at 10:55 AM, Netzer, Liron  > > wrote:
> >> Hi group,
> >>
> >>
> >>
> >> We are using Flink keyed state in several operators.
> >>
> >> Is there an easy was to expose the data that is stored in the state, i.e.
> >> the key and the values?
> >>
> >> This is needed for both monitoring as well as debugging. We would like
> >> to understand how many key+values are stored in each state and also to
> >> view the data itself.
> >>
> >> I know that there is the "Queryable state" option, but this is still
> >> in Beta, and doesn't really give us what we want easily.
> >>
> >>
> >>
> >>
> >>
> >> *We are using Flink 1.3.2 with Java.
> >>
> >>
> >>
> >> Thanks,
> >>
> >> Liron
> 
> 
> 



Operators in Flink

2018-04-20 Thread Felipe Gutierrez
Hi,

I have a question about how Flink operators are implemented, and I am trying to
find something about this on the internet.
Are operators in Flink compiled and fused at runtime? Do you have a link that
explains how this is implemented in Flink?
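As far as I know, Flink does not compile operator code at runtime; instead, the
JobGraph translation chains (fuses) compatible operators into a single task, so
records pass between them as plain method calls rather than over the network.
A hedged illustration of controlling that chaining through the DataStream API
(MyMapper and stream are placeholders):

env.disableOperatorChaining();                 // turn chaining off for the whole job
stream.map(new MyMapper()).startNewChain();    // or start a fresh chain at this operator
stream.map(new MyMapper()).disableChaining();  // or opt this one operator out of chaining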

Kind Regards,
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


Re: gonna need more logs when task manager is shutting down

2018-04-20 Thread Fabian Hueske
Hi Makeyang,

Would you mind opening a JIRA issue [1] for your improvement suggestion?
It would be good to add the Flink version that you are running.

Thanks, Fabian

[1] https://issues.apache.org/jira/projects/FLINK

2018-04-20 6:21 GMT+02:00 makeyang :

> one of my task managers dropped out of the cluster, and I checked its log and
> found the following:
> 2018-04-19 22:34:47,441 INFO  org.apache.flink.runtime.taskmanager.Task
> - Attempting to fail task externally Process (115/120)
> (19d0b0ce1ef3b8023b37bdfda643ef44).
> 2018-04-19 22:34:47,441 INFO  org.apache.flink.runtime.taskmanager.Task
> - Process (115/120) (19d0b0ce1ef3b8023b37bdfda643ef44) switched from RUNNING
> to FAILED.
> java.lang.Exception: TaskManager is shutting down.
> at org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:220)
> at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
> at org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:121)
> at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> at akka.actor.ActorCell.terminate(ActorCell.scala:374)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Well:
> attempting to fail the task externally due to what?
> the task manager is shutting down due to what?
>
> This important information is not found in the log, although it would actually
> be very useful.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink State monitoring

2018-04-20 Thread Stefan Richter
Hi,

for incremental checkpoints, it is only showing the size of the deltas. It
would probably also be possible to report the full size, but the current
reporting and UI only support delivering a single value. In general, some
things are rather hard to report. For example, for the heap-based backend, is
the state size the size of the serialized data or the size of the heap objects?
Another remark about the key count: it is easy to determine for the heap-based
backend, but there is no (efficient) method in RocksDB that gives the key count
(because of the way RocksDB works). In this case, afaik, we have the
(inefficient) option to iterate all keys and count, or to use the (efficient)
estimated key count that RocksDB supports.

Best,
Stefan
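For reference, the estimate mentioned above is exposed by RocksDB itself as the
"rocksdb.estimate-num-keys" property. A minimal standalone sketch of reading it
outside Flink (the database path is made up, and the value is an estimate, not
an exact count):

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class EstimateKeys {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-state")) {
            // RocksDB maintains this statistic internally, so reading it is cheap
            String estimate = db.getProperty("rocksdb.estimate-num-keys");
            System.out.println("Estimated key count: " + estimate);
        }
    }
}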

> On 04.01.2018 at 19:23, Steven Wu wrote:
> 
> Aljoscha/Stefan, 
> 
> if incremental checkpoint is enabled, I assume the "checkpoint size" is only 
> the delta/incremental size (not the full state size), right?
> 
> Thanks,
> Steven
> 
> 
> On Thu, Jan 4, 2018 at 5:18 AM, Aljoscha Krettek  > wrote:
> Hi,
> 
> I'm afraid there is currently no metrics around state. I see that it's very 
> good to have so I'm putting it on my list of stuff that we should have at 
> some point.
> 
> One thing that comes to mind is checking the size of checkpoints, which gives 
> you an indirect way of figuring out how big state is but that's not very 
> exact, i.e. doesn't give you "number of keys" or some such.
> 
> Best,
> Aljoscha
> 
> > On 20. Dec 2017, at 08:09, Netzer, Liron  > > wrote:
> >
> > Ufuk, Thanks for replying !
> >
> > Aljoscha, can you please assist with the questions below?
> >
> > Thanks,
> > Liron
> >
> > -Original Message-
> > From: Ufuk Celebi [mailto:u...@apache.org ]
> > Sent: Friday, December 15, 2017 3:06 PM
> > To: Netzer, Liron [ICG-IT]
> > Cc: user@flink.apache.org 
> > Subject: Re: Flink State monitoring
> >
> > Hey Liron,
> >
> > unfortunately, there are no built-in metrics related to state. In general, 
> > exposing the actual values as metrics is problematic, but exposing summary 
> > statistics would be a good idea. I'm not aware of a good work around at the 
> > moment that would work in the general case (taking into account state 
> > restore, etc.).
> >
> > Let me pull in Aljoscha (cc'd) who knows the state backend internals well.
> >
> > @Aljoscha:
> > 1) Are there any plans to expose keyed state related metrics (like number 
> > of keys)?
> > 2) Is there a way to work around the lack of these metrics in 1.3?
> >
> > – Ufuk
> >
> > On Thu, Dec 14, 2017 at 10:55 AM, Netzer, Liron  > > wrote:
> >> Hi group,
> >>
> >>
> >>
> >> We are using Flink keyed state in several operators.
> >>
> >> Is there an easy was to expose the data that is stored in the state, i.e.
> >> the key and the values?
> >>
> >> This is needed for both monitoring as well as debugging. We would like
> >> to understand how many key+values are stored in each state and also to
> >> view the data itself.
> >>
> >> I know that there is the "Queryable state" option, but this is still
> >> in Beta, and doesn't really give us what we want easily.
> >>
> >>
> >>
> >>
> >>
> >> *We are using Flink 1.3.2 with Java.
> >>
> >>
> >>
> >> Thanks,
> >>
> >> Liron
> 
> 



Re: Flink State monitoring

2018-04-20 Thread Juho Autio
Hi Aljoscha & co.,

Is there any way to monitor the state size yet? Maybe a ticket in Jira?

When using incremental checkpointing, the total state size can't be seen
anywhere. For example the checkpoint details only show the size of the
increment. It would be nice to add the total size there as well. The only
way I know currently for figuring out the total state size is by triggering
a manual savepoint. But this doesn't work at all if the state has grown so
big that savepoint times out.

Also when Flink restores state from an incrementally created checkpoint, it
doesn't offer a way to see the total size.
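For reference, the manual savepoint mentioned above is triggered from the CLI;
a sketch (the job id is a placeholder, and the target directory is optional if
a default is configured):

bin/flink savepoint <jobID> [savepointDirectory]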

On Thu, Jan 4, 2018 at 8:23 PM, Steven Wu  wrote:

> Aljoscha/Stefan,
>
> if incremental checkpoint is enabled, I assume the "checkpoint size" is
> only the delta/incremental size (not the full state size), right?
>
> Thanks,
> Steven
>
>
> On Thu, Jan 4, 2018 at 5:18 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> I'm afraid there is currently no metrics around state. I see that it's
>> very good to have so I'm putting it on my list of stuff that we should have
>> at some point.
>>
>> One thing that comes to mind is checking the size of checkpoints, which
>> gives you an indirect way of figuring out how big state is but that's not
>> very exact, i.e. doesn't give you "number of keys" or some such.
>>
>> Best,
>> Aljoscha
>>
>> > On 20. Dec 2017, at 08:09, Netzer, Liron  wrote:
>> >
>> > Ufuk, Thanks for replying !
>> >
>> > Aljoscha, can you please assist with the questions below?
>> >
>> > Thanks,
>> > Liron
>> >
>> > -Original Message-
>> > From: Ufuk Celebi [mailto:u...@apache.org]
>> > Sent: Friday, December 15, 2017 3:06 PM
>> > To: Netzer, Liron [ICG-IT]
>> > Cc: user@flink.apache.org
>> > Subject: Re: Flink State monitoring
>> >
>> > Hey Liron,
>> >
>> > unfortunately, there are no built-in metrics related to state. In
>> general, exposing the actual values as metrics is problematic, but exposing
>> summary statistics would be a good idea. I'm not aware of a good work
>> around at the moment that would work in the general case (taking into
>> account state restore, etc.).
>> >
>> > Let me pull in Aljoscha (cc'd) who knows the state backend internals
>> well.
>> >
>> > @Aljoscha:
>> > 1) Are there any plans to expose keyed state related metrics (like
>> number of keys)?
>> > 2) Is there a way to work around the lack of these metrics in 1.3?
>> >
>> > – Ufuk
>> >
>> > On Thu, Dec 14, 2017 at 10:55 AM, Netzer, Liron 
>> wrote:
>> >> Hi group,
>> >>
>> >>
>> >>
>> >> We are using Flink keyed state in several operators.
>> >>
>> >> Is there an easy was to expose the data that is stored in the state,
>> i.e.
>> >> the key and the values?
>> >>
>> >> This is needed for both monitoring as well as debugging. We would like
>> >> to understand how many key+values are stored in each state and also to
>> >> view the data itself.
>> >>
>> >> I know that there is the "Queryable state" option, but this is still
>> >> in Beta, and doesn't really give us what we want easily.
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> *We are using Flink 1.3.2 with Java.
>> >>
>> >>
>> >>
>> >> Thanks,
>> >>
>> >> Liron
>>
>>
>


Re: Managing state migrations with Flink and Avro

2018-04-20 Thread Timo Walther

Hi Petter,

which state backend are you using in your case? I think there is no 
quick solution for your problem because a proper schema evolution story 
is on the roadmap for Flink 1.6.


Would it work to change the serial version id of the generated Avro 
class as a temporary workaround?
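A sketch of what that workaround could look like (hand-simplified, not produced by
avro-tools): pin the regenerated class to the serialVersionUID recorded in the
snapshot, which the InvalidClassException below reports as -3555733236161157838.

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;

public class Accumulator extends SpecificRecordBase {
    // Matches the "stream classdesc serialVersionUID" from the exception, so that
    // Java serialization accepts the regenerated class when restoring the snapshot.
    private static final long serialVersionUID = -3555733236161157838L;

    public static final Schema SCHEMA$ = new Schema.Parser().parse(
        "{\"namespace\":\"io.relayr.flink\",\"type\":\"record\",\"name\":\"Accumulator\","
        + "\"fields\":[{\"name\":\"accumulator\",\"type\":\"int\"},"
        + "{\"name\":\"newStuff\",\"type\":[\"int\",\"null\"]}]}");

    private int accumulator;
    private Integer newStuff;

    public Schema getSchema() { return SCHEMA$; }

    public Object get(int field) {
        if (field == 0) { return accumulator; }
        return newStuff;
    }

    public void put(int field, Object value) {
        if (field == 0) { accumulator = (Integer) value; } else { newStuff = (Integer) value; }
    }
}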


Regards,
Timo


On 18.04.18 at 14:21, Timo Walther wrote:
Thank you. Maybe we already identified the issue (see 
https://issues.apache.org/jira/browse/FLINK-9202). I will use your 
code to verify it.


Regards,
Timo


On 18.04.18 at 14:07, Petter Arvidsson wrote:

Hi Timo,

Please find the generated class (for the second schema) attached.

Regards,
Petter

On Wed, Apr 18, 2018 at 11:32 AM, Timo Walther > wrote:


Hi Petter,

could you share the source code of the class that Avro generates
out of this schema?

Thank you.

Regards,
Timo

On 18.04.18 at 11:00, Petter Arvidsson wrote:

Hello everyone,

I am trying to figure out how to set up Flink with Avro for
state management (especially the content of snapshots) to enable
state migrations (e.g. adding a nullable field to a class). So
far, in version 1.4.2, I tried to explicitly provide an instance
of "new AvroTypeInfo(Accumulator.getClass())" where accumulator
is a very simple Avro generated SpecificRecordBase of the
following schema:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
 {"name": "accumulator", "type": "int"}
 ]
}

This successfully saves the state to the snapshot. When I then
try to load the snapshot with an updated schema (adding the
nullable field) it fails. Schema looks like this:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
 {"name": "accumulator", "type": "int"},
 {"name": "newStuff", "type": ["int", "null"]}
 ]
}

When I try to restart the Job from the snapshot, I get the
following exception:
2018-04-17 09:35:23,519 WARN
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
- Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
...
Caused by: java.io.InvalidClassException:
io.relayr.flink.Accumulator; local class incompatible: stream
classdesc serialVersionUID = -3555733236161157838, local class
serialVersionUID = 5291033088112484292

Which is true: Avro tools do generate a new serialVersionUID for
the bean. I just didn't expect it to be used, and expected the
Avro schema to be used instead. Did anyone get this working?
What am I getting wrong?

Best regards,
Petter










Re: masters file only needed when using start-cluster.sh script?

2018-04-20 Thread David Corley
Great! Thanks Gary

On 20 April 2018 at 08:22, Gary Yao  wrote:

> Hi David,
>
> You are right. If you don't use start-cluster.sh, the conf/masters file is
> not
> needed.
>
> Best,
> Gary
>
>
> On Wed, Apr 18, 2018 at 8:25 AM, David Corley 
> wrote:
>
>> The HA documentation is a little confusing in that it suggests JM
>> registration and discovery is done via Zookeeper, but it also recommends
>> creating a static `masters` file listing all JMs.
>> The only use I can currently see for the masters file is by the
>> `start-cluster.sh` script.
>> Thus, if we're not using that script, do we need the masters file at all?
>>
>
>


Re: masters file only needed when using start-cluster.sh script?

2018-04-20 Thread Gary Yao
Hi David,

You are right. If you don't use start-cluster.sh, the conf/masters file is
not
needed.

Best,
Gary

On Wed, Apr 18, 2018 at 8:25 AM, David Corley  wrote:

> The HA documentation is a little confusing in that it suggests JM
> registration and discovery is done via Zookeeper, but it also recommends
> creating a static `masters` file listing all JMs.
> The only use I can currently see for the masters file is by the
> `start-cluster.sh` script.
> Thus, if we're not using that script, do we need the masters file at all?
>