Re: ListState - elements order

2018-09-14 Thread vino yang
Hi, I looked at HeapListState, one of ListState's implementations, and its internal data store uses the JDK's List. Of course, from an API point of view, we may not be able to give an absolute ordering guarantee. But if we look at a particular implementation, we can see whether it guarantees this, of cour

Window operator schema evolution - savepoint deserialization fail

2018-09-14 Thread tisonet
Hi, I run a simple streaming job where I compute hourly impressions for campaigns: .keyBy(imp => imp.campaign_id) .window(TumblingEventTimeWindows.of(...)) .aggregate(new BudgetSpendingByImpsAggregateFunction(), new BudgetSpendingByImpsWindowFunction()) where the aggregate function just sums impressio

Re: How to clear keyed states periodically?

2018-09-14 Thread David Anderson
Paul, Theoretically, processing-time timers will get the job done, but yes, you'd need a timer per key -- and folks who've tried this with millions of keys, all firing at the same time, have reported that this behaves badly. For some use cases it's workable to spread out the timers over an interva
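
A minimal sketch of the "spread out the timers over an interval" idea, assuming the offset is derived from the key's hash. The class and method names (TimerSpread, cleanupTime) are hypothetical and not part of Flink. In a job, the returned timestamp would be the value handed to the timer registration call, which is not shown here.

```java
// Sketch only: spreads per-key cleanup times over an interval so that
// timers for different keys do not all fire at the same instant.
final class TimerSpread {

    // Returns baseTime plus a per-key offset in [0, spreadMillis).
    // The offset depends only on the key's hash, so a given key always
    // maps to the same firing time.
    static long cleanupTime(Object key, long baseTime, long spreadMillis) {
        if (spreadMillis <= 0) {
            throw new IllegalArgumentException("spreadMillis must be positive");
        }
        long offset = Math.floorMod((long) key.hashCode(), spreadMillis);
        return baseTime + offset;
    }

    public static void main(String[] args) {
        long base = 1_000_000L;      // arbitrary base timestamp in ms
        long spread = 10 * 60_000L;  // spread firings over ten minutes
        for (String key : new String[] {"campaign-1", "campaign-2", "campaign-3"}) {
            System.out.println(key + " -> " + cleanupTime(key, base, spread));
        }
    }
}
```

Hashing the key keeps the schedule deterministic across restarts; a random offset would also spread the load but would not be reproducible.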

Conversion to relational algebra failed to preserve datatypes

2018-09-14 Thread Yuan,Youjun
Hi, I am getting the following error while submitting a job to a cluster. It seems to fail when comparing 2 RelDataTypes, though they seem identical (from the error message), and everything is OK if I run it locally. I guess calcite failed to compare the first field named ts, of type TIMESTAMP(3),

Re: How to clear keyed states periodically?

2018-09-14 Thread Kien Truong
Hi Paul, We have actually done something like this. Clearing a state with the rocksdb state backend can actually be a very expensive operation, and can block the operators for minutes with large states. To mitigate that, we are using 2 approaches: 1. Keeping the state small by increasing t

Re: ListState - elements order

2018-09-14 Thread Vijay Bhaskar
How about using ValueState with the values stored as a string separated by a delimiter? That way order will never be a problem. The only overhead is splitting on the delimiter, reading the elements, and converting them into primitive types where necessary. It is just a workaround. In case it doesn't suit your requirements p
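
A rough sketch of the delimiter workaround described above, using plain Java strings. The class and method names are hypothetical, and the comma delimiter and long element type are assumptions chosen for illustration. The encoded string is what would be kept as the single state value.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: keeps an ordered list of longs inside one delimited string.
final class DelimitedList {
    private static final String DELIMITER = ",";

    // Appends a value to the encoded string, preserving insertion order.
    static String append(String encoded, long value) {
        if (encoded == null || encoded.isEmpty()) {
            return Long.toString(value);
        }
        return encoded + DELIMITER + value;
    }

    // Splits the encoded string and converts each part back to a long.
    static List<Long> decode(String encoded) {
        List<Long> values = new ArrayList<>();
        if (encoded == null || encoded.isEmpty()) {
            return values;
        }
        for (String part : encoded.split(DELIMITER)) {
            values.add(Long.parseLong(part));
        }
        return values;
    }

    public static void main(String[] args) {
        String state = null;
        for (long v : new long[] {3L, 1L, 2L}) {
            state = append(state, v);
        }
        System.out.println(decode(state)); // elements come back in insertion order
    }
}
```

Every append rewrites the whole string, which is the overhead the message mentions.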

Re: How to clear keyed states periodically?

2018-09-14 Thread Paul Lam
Hi David, Your information is very helpful! Thank you! BroadcastStream can definitely do the job, but I think it makes the architecture kind of complicated, so it will be my last resort. I wonder if it's possible to implement a clearAll() method for keyed states which clears user states for

Re: How to clear keyed states periodically?

2018-09-14 Thread David Anderson
Paul -- I'm not confident the broadcast approach would perform well enough. Even without all those timers your job might behave poorly if you try to hit all of the keys at once to clear all the state; I don't know that anyone has tried this. As Kien suggested, it may be necessary to find an approac

Re: How to clear keyed states periodically?

2018-09-14 Thread Paul Lam
Hi Kien, Thanks for your reply! The approaches you suggested are very useful. I'll redesign the state structure and try these approaches. Thanks a lot! Best, Paul Lam > On Sep 14, 2018, at 17:01, Kien Truong wrote: > > Hi Paul, > > We have actually done something like this. Clearing a state with ro

Compute Symmetrical Uncertainty in parallel

2018-09-14 Thread Alejandro
In this answer Optimizing Flink transformation (https://stackoverflow.com/questions/52151715/optimizing-flink-transformation/52225586#52225586): def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = { val su = xy.reduceGroup { in ⇒ val invec = in.toVector val x = invec map (_

Re: ListState - elements order

2018-09-14 Thread vino yang
Yes, for strings, we can do this, but it is not generic enough. But your idea reminds me that if we use ValueState, we can decide which data structure to use. On Fri, Sep 14, 2018 at 5:24 PM Vijay Bhaskar wrote: > How it would be to use ValueState with values as string separated by > the delimiter. So that order w

Re: Client failed to get cancel with savepoint response(Internet mail)

2018-09-14 Thread vino yang
Hi Paul, As far as writing the program goes, it is just a few lines of code: CompletableFuture savepointPath = client.triggerSavepoint(); savepointPath.get(); //block until the savepoint completed client.cancel(); Please note that this is just an example, not a real program, and there may be de

Re: ListState - elements order

2018-09-14 Thread Alexey Trenikhun
“ValueState with values as string separated by the delimiter” is not necessary; it can be ValueState> instead. The drawback of using ValueState is that the whole contained object must be serialized whenever at least one element is added. Alexey

Re: Watermarks in Event Time windowing

2018-09-14 Thread David Anderson
To clarify one thing: keep in mind that Flink does not support per-key watermarks. Watermarks are typically generated per-source, or in the case of Kafka, can be per-partition. An idle source (or in the case of Kafka, an idle partition) can prevent an event-time window from being triggered, but you
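
To illustrate why an idle source or partition can hold back an event-time window: an operator's event-time clock follows the minimum of the watermarks of its inputs. The sketch below models that rule with plain Java; the class and method names are hypothetical and this is not Flink's internal code.

```java
// Sketch only: models how a combined watermark is derived from several inputs.
final class WatermarkMin {

    // The combined watermark is the smallest watermark among all inputs,
    // so one input that never advances holds everything back.
    static long combinedWatermark(long[] inputWatermarks) {
        if (inputWatermarks.length == 0) {
            throw new IllegalArgumentException("at least one input is required");
        }
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        // Two active partitions and one idle partition that has emitted nothing yet.
        long[] watermarks = {5_000L, 7_000L, Long.MIN_VALUE};
        System.out.println(combinedWatermark(watermarks));
    }
}
```

With the idle input stuck at its initial value, the combined watermark never passes the end of any window, so no event-time window fires.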

Re: Can rocksDBBackend handle rescaling?

2018-09-14 Thread Andrey Zagrebin
Hi Jiayi Liao, I have answered a similar question here: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Why-documentation-always-say-checkpoint-does-not-support-Flink-specific-features-like-rescaling-td23982.html#a23983

Re: Task managers run on separate nodes in a cluster

2018-09-14 Thread Martin Eden
Thanks Vino! On Fri, Sep 14, 2018 at 3:37 AM vino yang wrote: > Hi Martin, > > Till has done most of the work of Flink on Mesos. Ping Till for you. > > Thanks, vino. > > On Wed, Sep 12, 2018 at 11:21 PM Martin Eden wrote: > >> Hi all, >> >> We're using Flink 1.3.2 with DCOS / Mesos. >> >> We have a 3 node clu

Re: Conversion to relational algebra failed to preserve datatypes

2018-09-14 Thread Timo Walther
Hi, could you maybe post the query that caused the exception? I guess the exception is related to a time attribute [1]. For the optimizer, time attributes and timestamps make no difference; however, they have a slightly different data type that might have caused the error. I think this is a bug that s

Unit / Integration Test Timer

2018-09-14 Thread ashish pok
All, Hopefully a quick one. I feel like I have seen this answered a few times before but can't find an appropriate example. I am trying to run a few tests where registered timeouts are invoked (snippet below). Simple example as shown in documentation for integration test (using flink-test-ut

Re: Window operator schema evolution - savepoint deserialization fail

2018-09-14 Thread Andrey Zagrebin
Hi, Adding a new field to a case class breaks the serialisation format in the savepoint at the moment and requires state migration, which is currently not supported in Flink implicitly. Although, I would expect the failure earlier while performing the compatibility check upon restore. According to the

Question regarding state cleaning using timer

2018-09-14 Thread bhaskar . ebay77
Hi In the following example given in flink: object ExampleCountWindowAverage extends App { val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection(List( (1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L) )).keyBy(_._1) .flatMap(new CountWindowAve

CombinableGroupReducer says The Iterable can be iterated over only once

2018-09-14 Thread Alejandro Alcalde
Hello all, I am trying to replicate the code in the Docs ( https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions ) But I keep getting the following exception: The Iterable can be iterated over only once. Only the first

Re: Conversion to relational algebra failed to preserve datatypes

2018-09-14 Thread Yuan,Youjun
Hi Timo, I believe it has something to do with the number of fields in the source. I can reproduce the issue with very simple SQL, like: INSERT INTO IVC_ALARM_PUBLIC_OUT SELECT rowtime as ts FROM IVC_ALARM_PUBLIC_IN where rowtime actually represents proctime (I registered it as rowtime.proctime).

Re: Problem with querying state on Flink 1.6.

2018-09-14 Thread Joe Olson
Kostas - Thanks for the help. I've got a project in GitHub [1], which is basically the Flink 1.6 QueryableStateClient example from [2]. Trying to make a simple Flink 1.6 getKVState call to a QueryableStateClient that was initialized with an IP address on another machine results in nothing happening

Re: CombinableGroupReducer says The Iterable can be iterated over only once

2018-09-14 Thread 杨力
A java.util.Iterable is expected to provide iterators again and again. On Fri, Sep 14, 2018 at 10:53 PM Alejandro Alcalde wrote: > Hello all, > > I am trying to replicate the code in the Docs ( > https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#c
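
A small sketch of one way to live with an input that can be traversed only once: do all the work in a single pass instead of asking for a second iterator. The onceOnly helper below is a hypothetical stand-in for such an input, not Flink's own class, and sumAndCount is an illustrative name.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch only: single-pass processing of an Iterable that allows one traversal.
final class SinglePass {

    // Hypothetical stand-in: wraps a list so that only the first call
    // to iterator() succeeds.
    static <T> Iterable<T> onceOnly(List<T> data) {
        Iterator<T> it = data.iterator();
        boolean[] used = {false};
        return () -> {
            if (used[0]) {
                throw new IllegalStateException("already iterated");
            }
            used[0] = true;
            return it;
        };
    }

    // Computes sum and count in the same loop, so the input is read exactly once.
    static long[] sumAndCount(Iterable<Long> values) {
        long sum = 0;
        long count = 0;
        for (long v : values) {
            sum += v;
            count++;
        }
        return new long[] {sum, count};
    }

    public static void main(String[] args) {
        long[] result = sumAndCount(onceOnly(Arrays.asList(1L, 2L, 3L)));
        System.out.println("sum=" + result[0] + ", count=" + result[1]);
    }
}
```

If several passes are unavoidable, copying the elements into a list during the first pass is an alternative, at the cost of holding the whole group in memory.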

Re: Unit / Integration Test Timer

2018-09-14 Thread Till Rohrmann
Hi Ashish, how do you make sure that all of your data is not consumed within a fraction of the 2 seconds? For this it would be better to use event time which allows you to control how time passes. If you want to test a specific operator you could try out the One/TwoInputStreamOperatorTestHarness.

Re: Problem with querying state on Flink 1.6.

2018-09-14 Thread Till Rohrmann
Hi Joe, the code [1] looks as if you don't properly wait on the completion of the future. That way, the client has no chance to build up the connection to the server. Try to call resultFuture.get() which will wait for the result. [1] https://github.com/jolson787/qs/blob/master/code/test/qs01/src/
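
The point about waiting on the future can be shown with a plain java.util.concurrent.CompletableFuture. In this sketch the asynchronous lookup is simulated with a short sleep; the class name, method name, and returned string are hypothetical and stand in for the real client call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Sketch only: blocks on a future so the program does not exit before
// the asynchronous result has arrived.
final class AwaitResult {

    static String fetchAndWait() {
        // Simulated asynchronous request; a real client would return the future.
        CompletableFuture<String> resultFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100); // pretend the remote call takes a while
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "state-value";
        });
        try {
            // get() blocks the calling thread until the future completes.
            return resultFuture.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("request failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAndWait());
    }
}
```

Without the get() call, main could return before the request finishes, which matches the "nothing happening" symptom described in the thread.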

Re: Task managers run on separate nodes in a cluster

2018-09-14 Thread Till Rohrmann
Hi Martin, Flink supports the mesos.constraints.hard.hostattribute to specify task constraints based on agent attributes [1]. I think you could use them to control the task placement. [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#mesos-constraints-hard-hostattribute

Re: CombinableGroupReducer says The Iterable can be iterated over only once

2018-09-14 Thread Alejandro
Then how am I supposed to implement that function? On 09/14/2018 05:29 PM, 杨力 wrote: > A java.util.Iterable is expected to provide iterators again and again. > > On Fri, Sep 14, 2018 at 10:53 PM Alejandro Alcalde > wrote: > >> Hello all, >> >> I am trying to replicate the code in the Docs ( >> h

Setting a custom Kryo serializer in Flink-Python

2018-09-14 Thread Joe Malt
Hi, I'm trying to write a Flink job (with the Python streaming API) that handles a custom type that needs a custom Kryo serializer. When we implemented a similar job in Scala we used addDefaultKryoSerializer, similar to the instructions in https://ci.apache.org/projects/flink/flink-docs-release-1

LocalEnvironment and Python streaming

2018-09-14 Thread Joe Malt
Hi, Is there any way to execute a job using the LocalEnvironment when using the Python streaming API? This would make it much easier to debug jobs. At the moment I'm not aware of any way of running them except firing up a local cluster and submitting the job with pyflink-stream.sh. Thanks, Joe

Re: Unit / Integration Test Timer

2018-09-14 Thread ashish pok
Hi Till, To answer your first question, I currently don't (and honestly not sure how other than of course in IDE I can use breakpoint, or if something like Mockito can do it). So did I interpret it correctly that it sounds like execution env started using flink-test-utils will essentially tear

Re: Trying to figure out why a slot takes a long time to checkpoint

2018-09-14 Thread Julio Biason
(Just an addendum: Although it's not a huge problem -- we can always increase the checkpoint timeout time -- this anomalous situation makes me think there is something wrong in our pipeline or in our cluster, and that is what is making the checkpoint creation go crazy.) On Fri, Sep 14, 2018 at 8:0