How does Flink monitor that a source stream task (Time Trigger) is running?

2017-09-28 Thread yunfan123
In my understanding, Flink just uses task heartbeats to monitor whether a TaskManager is running. If the source stream thread (Time Trigger for XXX) crashes, it seems Flink can't recover from this state?

Re: Exception in BucketingSink when cancelling Flink job

2017-09-28 Thread wangsan
Hi, the 'join' method can be called with a timeout (as it is in TaskCanceler), so it won't block forever if the respective thread is in a deadlocked state. Maybe calling 'interrupt()' after 'join(timeout)' is more reasonable, although it still cannot make sure operations inside 'close()'
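
A minimal sketch of the pattern discussed above: wait for the executing thread with a bounded `join`, then escalate to `interrupt()` only if it is still alive. The class and variable names here are illustrative, not Flink's actual TaskCanceler code.

```java
// Bounded join, then interrupt: never blocks forever on a stuck thread.
public class CancelWatchdog {
    public static void main(String[] args) throws InterruptedException {
        Thread executer = new Thread(() -> {
            try {
                Thread.sleep(60_000); // stands in for a close() that blocks
            } catch (InterruptedException e) {
                // interrupt received: stop waiting and clean up
            }
        });
        executer.start();

        executer.join(200);           // bounded wait instead of a plain join()
        if (executer.isAlive()) {
            executer.interrupt();     // escalate only after the timeout expires
            executer.join();          // now terminates promptly
        }
        System.out.println("executer terminated: " + !executer.isAlive());
    }
}
```

As the message notes, this still cannot force arbitrary blocking operations inside `close()` to stop; `interrupt()` only helps if the blocked code reacts to interruption.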

Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

2017-09-28 Thread Yunus Olgun
Hi Kostas, Aljoscha, To answer Kostas's concern, the algorithm works this way: Let's say we have two sources, Source-0 and Source-1. Source-0 is slow and Source-1 is fast. The sources read from Kafka at different paces. The threshold is 10 time units. 1st cycle: Source-0 sends records with timestamp

starting query server when running flink embedded

2017-09-28 Thread Henri Heiskanen
Hi, I would like to test queryable state just by running Flink embedded from my IDE. What is the easiest way to start it properly? If I run the code below, I cannot see the query server listening on the given port. I found something about this, but it was about copying some base classes and post

Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

2017-09-28 Thread Aljoscha Krettek
To quickly make Kostas' intuition concrete: it's currently not possible to have watermarks broadcast but the data be locally forwarded. The reason is that watermarks and data travel in the same channels so if the watermark needs to be broadcast there needs to be an n to m (in this case m == n)
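
The "watermarks and data travel in the same channels" point can be made concrete with a toy model: a receiving task may only advance its own watermark to the minimum watermark it has seen across all of its input channels, which is why every upstream task must have a channel to every downstream task when watermarks are broadcast. This is a simplified illustration, not Flink's actual implementation.

```java
import java.util.Arrays;

// Toy model: the effective watermark of a task is the minimum
// across all input channels, so no channel may be missing.
public class MinWatermark {
    static long combined(long[] perChannel) {
        return Arrays.stream(perChannel).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long[] channels = {Long.MIN_VALUE, Long.MIN_VALUE};
        channels[0] = 15;                       // watermark from upstream task 0
        System.out.println(combined(channels)); // still Long.MIN_VALUE: channel 1 is silent
        channels[1] = 10;                       // watermark from upstream task 1
        System.out.println(combined(channels)); // 10, the minimum across channels
    }
}
```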

Re: Job Manager minimum memory hard coded to 768

2017-09-28 Thread Aljoscha Krettek
I believe this could be from a time when the setting "containerized.heap-cutoff-min" did not yet exist, since this part of the code is quite old. I think we might be able to remove that restriction, but I'm not sure, so I'm cc'ing Till, who knows those parts best. @Till, what do you think? > On
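
For reference, the cutoff setting mentioned above is configured in flink-conf.yaml. The values below are purely illustrative; the defaults depend on the Flink version:

```yaml
# flink-conf.yaml (illustrative values, check your version's defaults)
containerized.heap-cutoff-min: 100     # lower bound, in MB, reserved off the container memory
containerized.heap-cutoff-ratio: 0.25  # fraction of container memory removed as cutoff
```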

Job Manager minimum memory hard coded to 768

2017-09-28 Thread Dan Circelli
In our usage of Flink, our YARN Job Manager never goes above ~48 MB of heap utilization. In order to maximize the heap available to the Task Managers, I thought we could shrink our Job Manager heap setting down from the 1024 MB we were using to something tiny like 128 MB. However, doing so results

Re: Issue with CEP library

2017-09-28 Thread Kostas Kloudas
Hi Ajay, After reading all the data from your source, could you somehow tell your sources to send a watermark of Long.MaxValue (or a high value)? I am asking this just to see if the problem is that the data is simply buffered inside Flink because there is a problem with the timestamps and

Re: Issue with CEP library

2017-09-28 Thread Ajay Krishna
Hi Kostas, Thank you for reaching out and for the suggestions. Here are the results: 1. Using an env parallelism of 1 performed similarly, with the additional problem that there was significant lag in the Kafka topic. 2. I removed the additional keyBy(0), but that did not change anything. 3. I also

Re: Custom Serializers

2017-09-28 Thread nragon
Got it :) I've redesigned my object which I use across jobs. Ended up with 4 serializers. My object Element holds 2 fields, an array of Parameter and a Metadata. Metadata holds an array of ParameterInfo and each Parameter holds its ParameterInfo (kinda duplicate against Metadata, but needed for

Re: how many 'run -c' commands to start?

2017-09-28 Thread r. r.
Thank you, Chesnay. To make sure: should the node where the job has been submitted go down, the processing will continue, I hope? Do I need to ensure this by configuration? Btw, I added the --detached param to the run cmd, but it didn't go into a background process as I would've expected. Am I

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Tony Wei
Hi Stefan, That reason makes sense to me. Thanks for pointing that out. About my job: the database is currently never used, I disabled it for some reasons, but output to S3 was implemented with async I/O. I used a ForkJoinPool with a capacity of 50. I have tried to rebalance after the count window to monitor the

Re: how many 'run -c' commands to start?

2017-09-28 Thread Chesnay Schepler
Hi! Given a Flink cluster, you would only call `flink run ...` to submit a job once; for simplicity I would submit it on the node where you started the cluster. Flink will automatically distribute the job across the cluster, in smaller independent parts known as Tasks. Regards, Chesnay On
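
Chesnay's point in command form (paths, class, and jar names are placeholders, assuming a standalone cluster):

```shell
# start the cluster once, on one node
./bin/start-cluster.sh
# submit the job once, from this same node; Flink distributes the tasks itself
./bin/flink run -c com.example.MyJob /path/to/myjob.jar
```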

Re: Custom Serializers

2017-09-28 Thread Chesnay Schepler
On 19.09.2017 11:39, nragon wrote: createInstance(Object[] fields) at TupleSerializerBase seems not to be part of the TypeSerializer API. Will I be losing any functionality? In what cases do you use this instead of createInstance()? // We use this in the Aggregate and Distinct Operators to create

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Stefan Richter
Hi, the gap between the sync and the async part does not mean too much. What happens per task is that all operators go through their sync part, and then one thread executes all the async parts, one after the other. So if an async part starts late, this is just because it started only after
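
The scheduling described above can be modeled with a single-threaded executor: one thread drains all async checkpoint parts in order, so a later part "starts late" simply because it queued behind the earlier ones, not because anything is stuck. This is a toy illustration, not Flink's actual checkpointing code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// One thread runs all "async parts" strictly one after the other.
public class AsyncParts {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService asyncRunner = Executors.newSingleThreadExecutor();
        for (int op = 1; op <= 3; op++) {
            final int id = op;
            asyncRunner.submit(() -> {
                System.out.println("async part of operator " + id + " starts");
                try { Thread.sleep(100); } catch (InterruptedException ignored) {}
                System.out.println("async part of operator " + id + " done");
            });
        }
        asyncRunner.shutdown();
        asyncRunner.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Operator 3's async part always starts only after operator 2's has finished, even though nothing about operator 3 itself is slow.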

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Tony Wei
Hi, Sorry. This is the correct one. Best Regards, Tony Wei 2017-09-28 18:55 GMT+08:00 Tony Wei : > Hi Stefan, > > Sorry for providing partial information. The attachment is the full logs > for checkpoint #1577. > > Why I would say it seems that asynchronous part was not

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Tony Wei
Hi Stefan, Sorry for providing partial information. The attachment is the full logs for checkpoint #1577. The reason I would say it seems the asynchronous part was not executed immediately is that all synchronous parts had finished at 2017-09-27 13:49. Did that mean the checkpoint barrier event

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Stefan Richter
Hi, I agree that the memory consumption looks good. If there is only one TM, it will run inside one JVM. As for the 7 minutes, you mean the reported end-to-end time? This time measurement starts when the checkpoint is triggered on the job manager, the first contributor is then the time that it

Re: Flink on EMR

2017-09-28 Thread Stefan Richter
Hi, for issue 1, you could delete the slf4j jar from Flink's lib folder, but I wonder if this is producing any problems even with the warning? For issue 2, my question is where you found that only 5GB had been allocated? Did you consider that Flink only allocates a fraction of the memory for

Question about checkpointing with stateful operators and state recovery

2017-09-28 Thread Federico D'Ambrosio
Hi, I've got a couple of questions concerning the topics in the subject: 1. If an operator is getting applied on a keyed stream, do I still have to implement the CheckpointedFunction trait and define the snapshotState and initializeState methods, in order to successfully recover the state

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Tony Wei
Hi Stefan, Here is some telemetry information, but I don't have historical information about GC. [image: inline image 2] [image: inline image 1] 1) Yes, my state is not large. 2) My DFS is S3, but my cluster is outside AWS. It might be a problem. Since this is a POC, we might move to AWS in the future or use

Flink Savepoint Config parameter

2017-09-28 Thread ant burton
Hey, when running in EMR and taking a savepoint with `flink cancel -s SAVEPOINT_DIR JOB_ID`, I get the following error: Caused by: org.apache.flink.util.ConfigurationException: Config parameter 'Key: 'jobmanager.rpc.address' , default: null (deprecated keys: [])' is missing
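
One direction worth checking (an assumption on my part, since the thread does not show a confirmed fix): the error says the CLI cannot resolve the JobManager address, which normally comes from flink-conf.yaml:

```yaml
# flink-conf.yaml, the key the error complains about
# (placeholder host; on EMR this would be the node running the JobManager)
jobmanager.rpc.address: <jobmanager-host>
```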

Re: Kinesis connector - Jackson issue

2017-09-28 Thread Tomasz Dobrzycki
Hi guys, I was able to solve the issue. I deleted all my Flink distributions and followed these steps: 1) Clone the Flink source (because I'm building Flink with the Kinesis connector) 2) Check out release-1.3.1 (that's the version of Flink on EMR) 3) mvn clean install -Pinclude-kinesis -DskipTests
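
The steps above as a single shell sequence (the repository URL is the standard Apache mirror; the version number is taken from the message, so adjust it to the EMR release you run):

```shell
git clone https://github.com/apache/flink.git
cd flink
git checkout release-1.3.1                       # match the Flink version on EMR
mvn clean install -Pinclude-kinesis -DskipTests  # build including the Kinesis connector
```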

Re: Issue with CEP library

2017-09-28 Thread Kostas Kloudas
Hi Ajay, I will look a bit more into the issue. But in the meantime, could you run your job with a parallelism of 1, to see if the results are as expected? Also, could you change the pattern, for example to check only for the start, to see if all keys pass through. As for the code, you apply

Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

2017-09-28 Thread Kostas Kloudas
Hi Yunus, I see. Currently I am not sure that you can simply broadcast the watermark only, without having a shuffle. But one thing to note about your algorithm: I am not sure it solves the problem you encounter. Your algorithm seems to prioritize the stream with the

Re: Stream Task seems to be blocked after checkpoint timeout

2017-09-28 Thread Stefan Richter
Hi, when the async part takes that long I would have 3 things to look at: 1) Is your state so large? I don’t think this applies in your case, right? 2) Is something wrong with writing to DFS (network, disks, etc)? 3) Are we running low on memory on that task manager? Do you have telemetry

how many 'run -c' commands to start?

2017-09-28 Thread r. r.
Hello, I successfully ran a job with 'flink run -c', but this was for the local setup. How should I proceed with a cluster? Will Flink automagically instantiate the job on all servers? I hope I don't have to start 'flink run -c' on all machines. New to Flink and big data, so sorry for the