Re: How to compare two window ?

2018-08-16 Thread Piotr Nowojski
Hi,

Could you rephrase your question? Maybe by posting some code examples?

Piotrek

> On 16 Aug 2018, at 08:26, 苗元君  wrote:
> 
> Hi, Flink guys, 
> You really did a quick release, it's fantastic! 
> 
> I've got a situation: 
> window 1 is time driven, slice is 1 min, trigger is 1 count
> window 2 is count driven, slice is 3 count, trigger is 1 count
> 
> 1. An element leaves window1 and goes right into window2. 
> For example, if there are only 2 elements, window2 will have no elements.  
> How can I build windows like this? 
>    I tried to use window1 with the structure (window trigger evictor) and then window2 
> with the structure (trigger evictor),
>    but I got elements calculated in window1 and window2 at the same time.
> 
> 2.  I tried to find ways to use SQL on an AllWindowedStream but it doesn't seem to work. 
> Can a SQL query be used on a WINDOW?
> 3.  How can I compare these SQL results?
> 
> 
> 
> 
> Thank U so much.
> 
> -- 
> Yuanjun Miao
> 



Re: How to submit flink job on yarn by java code

2018-08-16 Thread Piotr Nowojski
Hi,

Is this path accessible on the container? If not, use some distributed file 
system, NFS, or the -yt/--yarnship option of the CLI.

Please also take a look at 
https://lists.apache.org/thread.html/%3CCAF=1nJ8GONoqux7czxpUxAf7L3p=-E_ePSTHk0uWa=GRyG=2...@mail.gmail.com%3E
 


Piotrek

> On 16 Aug 2018, at 11:05, spoon_lz <971066...@qq.com> wrote:
> 
> Sorry, I don't know why the code and error are not visible.
> The error is :
> The program finished with the following exception:
> 
> /org.apache.flink.client.deployment.ClusterDeploymentException: Could not
> deploy Yarn job cluster.
>   at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>   at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
>   at flink.SubmitDemo.submit(SubmitDemo.java:75)
>   at flink.SubmitDemo.main(SubmitDemo.java:50)
> Caused by:
> org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
> The YARN application unexpectedly switched to state FAILED during
> deployment. 
> Diagnostics from YARN: Application application_1526888270443_0090 failed 2
> times due to AM Container for appattempt_1526888270443_0090_02 exited
> with  exitCode: -1000
> For more detailed output, check application tracking
> page:http://cluster1:8088/cluster/app/application_1526888270443_0090Then,
> click on links to logs of each attempt.
> Diagnostics: File
> file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
> does not exist
> java.io.FileNotFoundException: File
> file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
> does not exist
>   at
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
>   at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
>   at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
>   at
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
>   at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
>   at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
>   at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
>   at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>   at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
>   at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 
> Failing this attempt. Failing the application.
> If log aggregation is enabled on your cluster, use this command to further
> investigate the issue:
> yarn logs -applicationId application_1526888270443_0090
>   at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
>   at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
>   at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
>   ... 5 more/
> 
> and my code like :
> 
> /public class SubmitDemo {
> 
> 
>private final String ENV_CONF = "/usr/ndp/current/yarn_client/conf";
>private final String FLINK_CONF = "/home/demo/flink-1.5.1/conf";
>private static final String JAR_FILE =
> "/home/demo/lz/flink1.5_demo-1.16-SNAPSHOT-jar-with-dependencies.jar";
> 
> 
>public static void main(String[] args) {
> 
>SubmitDemo demo = new SubmitDemo();
>demo.before();
>    List<String> parameters = new ArrayList<>();
>parameters.add("run");
>parameters.add("-d");
>parameters.add("-m");
>parameters.add("yarn-cluster");
>parameters.add("-ynm");
>parameters.add("lz_test_alone");
>parameters.add("-yn");
>parameters.add("4");
>parameters.add("-ytm");
>parameters.add("4096");
> 

Re: OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

2018-08-16 Thread Piotr Nowojski
No problem :) You motivated me to do a fix for that, since I stumbled across 
this bug/issue myself before and it also took me some time in the debugger to find 
the cause.

Piotrek

> On 16 Aug 2018, at 20:05, Ken Krugler  wrote:
> 
> Hi Piotr,
> 
> Thanks, and darn it that’s something I should have noticed.
> 
> — Ken
> 
> 
>> On Aug 16, 2018, at 4:37 AM, Piotr Nowojski > <mailto:pi...@data-artisans.com>> wrote:
>> 
>> Hi,
>> 
>> You made a small mistake when restoring state using the test harness, one that 
>> I myself have also made in the past. The problem is the ordering of these 
>> calls:
>> 
>> result.open();
>> if (savedState != null) {
>> result.initializeState(savedState);
>> }
>> 
>> open() is supposed to be called after initializeState(), and if you look into 
>> the code of AbstractStreamOperatorTestHarness#open, you will see that if it is 
>> called before initializeState(), it will initialize the harness without any state.
>> 
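In code, the corrected ordering looks roughly like this (a sketch only; the harness
itself would still be built by the user's own makeTestHarness helper shown further below):

import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

// Sketch: restore the snapshot before opening the operator, otherwise the
// harness starts with empty state and the registered timers appear to be lost.
private <IN, OUT> OneInputStreamOperatorTestHarness<IN, OUT> restoreHarness(
        OneInputStreamOperatorTestHarness<IN, OUT> harness,
        OperatorSubtaskState savedState) throws Exception {
    if (savedState != null) {
        harness.initializeState(savedState); // 1) restore state first
    }
    harness.open();                          // 2) only then call open()
    return harness;
}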
>> What is unfortunate is that this is implicit behaviour that doesn't throw any 
>> error (the test harness is not part of Flink's public API). I will try to fix this: 
>> https://issues.apache.org/jira/browse/FLINK-10159 
>> <https://issues.apache.org/jira/browse/FLINK-10159>
>> 
>> Piotrek
>> 
>>> On 16 Aug 2018, at 00:24, Ken Krugler >> <mailto:kkrugler_li...@transpac.com>> wrote:
>>> 
>>> Hi all,
>>> 
>>> It looks to me like the OperatorSubtaskState returned from 
>>> OneInputStreamOperatorTestHarness.snapshot fails to include any timers that 
>>> had been registered via registerProcessingTimeTimer but had not yet fired 
>>> when the snapshot was saved.
>>> 
>>> Is this a known limitation of OneInputStreamOperatorTestHarness?
>>> 
>>> If not, is there anything special I need to do when setting up the test 
>>> harness to ensure that timers are saved?
>>> 
>>> Below is the unit test, which shows how the test harness is being set up 
>>> and run.
>>> 
>>> The TimerFunction used in this test does seem to be doing the right thing, 
>>> as using it in a simple job on a local Flink cluster works as expected when 
>>> creating & then restarting from a savepoint.
>>> 
>>> Thanks,
>>> 
>>> — Ken
>>> 
>>> ==
>>> TimerTest.java
>>> ==
>>> package com.scaleunlimited.flinkcrawler.functions;
>>> 
>>> import static org.junit.Assert.assertTrue;
>>> 
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> 
>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
>>> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
>>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>>> import 
>>> org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
>>> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
>>> import org.junit.Before;
>>> import org.junit.Test;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>> 
>>> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
>>> 
>>> public class TimerTest {
>>> public static final Logger LOGGER = 
>>> LoggerFactory.getLogger(TimerTest.class);
>>> 
>>> private List _firedTimers = new ArrayList();
>>> 
>>> @Before
>>> public void setUp() throws Exception {
>>> }
>>> 
>>> @Test
>>> public void testTimerSaving() throws Throwable {
>>> 
>>> // This operator doesn't really do much at all, but the first 
>>> element
>>> // it processes will create a timer for (timestamp+1).
>>> // Whenever that timer fires, it will create another timer for 
>>> // (timestamp+1).
>>> KeyedProcessOperator operator = 
>>> new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
>>> 
>>> // Create a test harness from scratch
>>> OneInputStreamOperatorTestHarness testHarness = 
>>> makeTestHarness(ope

Re: ***UNCHECKED*** Re: Standalone cluster instability

2018-09-19 Thread Piotr Nowojski
Hi,

The JobManager is not responsible for and has no means of restarting a TaskManager 
whose process has been killed (it would need to ssh into the machine and restart 
it…). I don't know for sure, but from your description of the problem I presume that 
Flink's bash startup scripts do not contain a watchdog that restarts the process in 
case of failure. In that case just google "bash watchdog" for how to do it, for 
example: https://stackoverflow.com/a/697064/8149051

Probably a better way would be to use YARN or another resource manager. Flink's 
JobManager would then redeploy/reschedule a new TaskManager after a failure.

Piotrek

> On 19 Sep 2018, at 09:35, Shailesh Jain  wrote:
> 
> Hi Piotrek,
> 
> We've hit the same issue again, kernel is repeatedly killing the task manager 
> process (we've hit it 3 times in the past one week).
> We suspect we're hitting this bug in the kernel: 
> https://bugs.launchpad.net/ubuntu/+source/linux/+bug/1655842 
> <https://bugs.launchpad.net/ubuntu/+source/linux/+bug/1655842>
> 
> One question I have is that why is the job manager not able to restart the 
> task manager process when it discovers that it has been lost? It reports that 
> there are no active task managers and available slots are 0. We're running on 
> flink version 1.4.2.
> 
> I've attached the syslog and jobmanager log, the crash happened at Sep 18 
> 23:31:14.
> 
> Thanks,
> Shailesh
> 
> On Thu, Aug 16, 2018 at 5:40 PM Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> I'm not aware of such rules of thumb. Memory consumption is highly 
> application and workload specific. It depends on how many things you allocate 
> in your user code and how much memory you keep in state (in case of the heap 
> state backend). Basically, just as with most Java applications, you have to 
> use the trial and error method.
> 
> One good practice is, before any deployment, to test your Flink application on 
> a testing cluster that is identical to the production cluster, by (re)processing 
> some of the production workload/backlog/data (in parallel to the production 
> cluster).
> 
> Piotrek 
> 
>> On 16 Aug 2018, at 13:23, Shailesh Jain > <mailto:shailesh.j...@stellapps.com>> wrote:
>> 
>> Thank you for your help Piotrek.
>> 
>> I think it was a combination of a. other processes taking up available 
>> memory and b. flink processes consuming all the memory allocated to them, 
>> that resulted in kernel running out of memory.
>> 
>> Are there any heuristics or best practices which you (or anyone in the 
>> community) recommend to benchmark memory requirements of a particular flink 
>> job?
>> 
>> Thanks,
>> Shailesh
>> 
>> 
>> On Tue, Aug 14, 2018 at 6:08 PM, Piotr Nowojski > <mailto:pi...@data-artisans.com>> wrote:
>> Hi,
>> 
>> Good that we are more or less on track with this problem :) But the problem 
>> here is not that the heap size is too small, but that your kernel is running out 
>> of memory and starts killing processes. Either:
>> 
>> 1. some other process is using the available memory 
>> 2. Increase memory allocation on your machine/virtual 
>> machine/container/cgroup
>> 3. Decrease the heap size of Flink’s JVM or non heap size (decrease network 
>> memory buffer pool). Of course for any given job/state 
>> size/configuration/cluster size there is some minimal reasonable memory size 
>> that you have to assign to Flink, otherwise you will have poor performance 
>> and/or constant garbage collections and/or you will start getting OOM errors 
>> from JVM (don’t confuse those with OS/kernel's OOM errors - those two are on 
>> a different level).
>> 
>> Piotrek
>> 
>> 
>>> On 14 Aug 2018, at 07:36, Shailesh Jain >> <mailto:shailesh.j...@stellapps.com>> wrote:
>>> 
>>> Hi Piotrek,
>>> 
>>> Thanks for your reply. I checked through the syslogs for that time, and I 
>>> see this:
>>> 
>>> Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill 
>>> process 2305 (java) score 468 or sacrifice child
>>> Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305 
>>> (java) total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB
>>> 
>>> As you pointed out, kernel killed the task manager process.
>>> 
>>> If I had already set the max heap size for the JVM (to 3GB in this case), 
>>> and the memory usage stats showed 2329MB being used 90 seconds earlier, it 
>>> seems a bit unlikely for

Re: How does flink read data from kafka number of TM's are more than topic partitions

2018-09-21 Thread Piotr Nowojski
Hi,

Yes, in your case half of the Kafka source tasks wouldn’t read/process any 
records (you can check that in the web UI). This shouldn’t harm you, as long as your 
records are redistributed after the source. For example:

source.keyBy(..).process(new MyVeryHeavyOperator()).print()

Should be fine, because `keyBy(…)` will redistribute records. However

source.map(new MyVeryHeavyOperator()).print()

Will mean that half of `MyVeryHeavyOperator`s will be idling as well. To solve 
that, you might want to consider using 

dataStream.rebalance();
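A minimal, self-contained sketch of that (assuming the Kafka 0.11 connector; topic
name, broker address and the mapper are placeholders standing in for
`MyVeryHeavyOperator`):

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class RebalanceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10); // 10 slots in total, but the topic has only 5 partitions

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "rebalance-example");       // placeholder

        DataStream<String> source = env.addSource(
                new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props));

        source
                .rebalance() // redistribute round-robin so all 10 downstream subtasks get records
                .map((MapFunction<String, String>) value -> value.toUpperCase()) // stand-in for the heavy operator
                .print();

        env.execute("rebalance example");
    }
}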

Piotrek

> On 21 Sep 2018, at 13:25, Taher Koitawala  wrote:
> 
> Hi All,
>  Let's say a topic in kafka has 5 partitions. If I spawn 10 Task 
> Managers with 1 slot each and parallelism is 10 then how will records be read 
> from the kafka topic if I use the FlinkKafkaConsumer to read.
> 
> Will 5 TMs read and the rest be idle in that case? Does oversubscribing the 
> number of TMs relative to the number of partitions in the Kafka topic guarantee 
> high throughput?
>  
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163



Re: How does flink read data from kafka number of TM's are more than topic partitions

2018-09-21 Thread Piotr Nowojski
No problem :)

Piotrek

> On 21 Sep 2018, at 15:04, Taher Koitawala  wrote:
> 
> Thanks a lot for the explanation. That was exactly what I thought should 
> happen. However, it is always good to have a clear confirmation.
> 
> 
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
> 
> 
> On Fri, Sep 21, 2018 at 6:26 PM Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Yes, in your case half of the Kafka source tasks wouldn’t read/process any 
> records (you can check that in the web UI). This shouldn’t harm you, as long as your 
> records are redistributed after the source. For example:
> 
> source.keyBy(..).process(new MyVeryHeavyOperator()).print()
> 
> Should be fine, because `keyBy(…)` will redistribute records. However
> 
> source.map(new MyVeryHeavyOperator()).print()
> 
> Will mean that half of `MyVeryHeavyOperator`s will be idling as well. To 
> solve that, you might want to consider using 
> 
> dataStream.rebalance();
> 
> Piotrek
> 
>> On 21 Sep 2018, at 13:25, Taher Koitawala > <mailto:taher.koitaw...@gslab.com>> wrote:
>> 
>> Hi All,
>>  Let's say a topic in kafka has 5 partitions. If I spawn 10 Task 
>> Managers with 1 slot each and parallelism is 10 then how will records be 
>> read from the kafka topic if I use the FlinkKafkaConsumer to read.
>> 
>> Will 5 TMs read and the rest be idle in that case? Does oversubscribing the 
>> number of TMs relative to the number of partitions in the Kafka topic guarantee 
>> high throughput?
>>  
>> Regards,
>> Taher Koitawala
>> GS Lab Pune
>> +91 8407979163
> 



Re: Between Checkpoints in Kafka 11

2018-09-24 Thread Piotr Nowojski
Hi,

I have nothing more to add. You (Dawid) and Vino explained it correctly :)

Piotrek
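For completeness, a minimal sketch of the setup being discussed (assuming Flink 1.6
with the Kafka 0.11 connector; broker address, topic name and the 15-minute
checkpoint interval implied by the question below are placeholders/assumptions):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The Kafka transaction is committed only when a checkpoint completes,
        // i.e. here at most every 15 minutes. Records are written to Kafka as they
        // are produced (not buffered in Flink), but stay invisible to
        // read_committed consumers until the commit.
        env.enableCheckpointing(15 * 60 * 1000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        // must be larger than the maximum checkpoint interval/duration
        props.setProperty("transaction.timeout.ms", String.valueOf(60 * 60 * 1000));

        FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
                "output-topic",                                              // placeholder topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

        env.fromElements("a", "b", "c") // stand-in for the real pipeline
                .addSink(producer);

        env.execute("exactly-once sink sketch");
    }
}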

> On 24 Sep 2018, at 15:16, Dawid Wysakowicz  wrote:
> 
> Hi Harshvardhan,
> 
> Flink won't buffer all the events between checkpoints. Flink uses Kafka's 
> transactions, which are committed only on checkpoints, so the data will be 
> persisted on Kafka's side, but only available to read once committed.
> I've cc'ed Piotr, who implemented the Kafka 0.11 connector, in case he wants to 
> correct me or add something to the answer.
> 
> Best,
> 
> Dawid
> 
> On 23/09/18 17:48, Harshvardhan Agrawal wrote:
>> Hi,
>> 
>> Can someone please help me understand how does the exactly once semantic 
>> work with Kafka 11 in Flink?
>> 
>> Thanks,
>> Harsh
>> 
>> On Tue, Sep 11, 2018 at 10:54 AM Harshvardhan Agrawal 
>> mailto:harshvardhan.ag...@gmail.com>> wrote:
>> Hi,
>> 
>> I was going through the blog post on how TwoPhaseCommitSink function works 
>> with Kafka 11. One of the things I don’t understand is: What is the behavior 
>> of the Kafka 11 Producer between two checkpoints? Say that the time interval 
>> between two checkpoints is set to 15 minutes. Will Flink buffer all records 
>> in memory in that case and start writing to Kafka when the next checkpoint 
>> starts?
>> 
>> Thanks!
>> -- 
>> Regards,
>> Harshvardhan
>> 
>> 
>> -- 
>> Regards,
>> Harshvardhan Agrawal
>> 267.991.6618 | LinkedIn 



Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2018-10-04 Thread Piotr Nowojski
Hi,

Thanks for reporting the problem. This bug was previously unknown to us. I have 
created a jira ticket for this bug:
https://issues.apache.org/jira/browse/FLINK-10491 


Unfortunately I’m not familiar with running batch jobs in Flink, so I don’t 
know if there is some hot fix or anything that can at least mitigate/decrease 
the probability of the bug for you until we fix it properly. 

Piotrek

> On 4 Oct 2018, at 13:55, Aljoscha Krettek  wrote:
> 
> Hi,
> 
> this looks like a potential Flink bug. Looping in Nico and Piotr who have 
> looked into that in the past. Could you please comment on that?
> 
> Best,
> Aljoscha
> 
>> On 3. Oct 2018, at 12:12, Narayanaswamy, Krishna 
>> mailto:krishna.narayanasw...@gs.com>> wrote:
>> 
>> Hi,
>>  
>> I am trying to run one large single job graph which has > 10k tasks. The 
>> form of the graph is something like
>> DataSource -> Filter -> Map [...multiple]
>> Sink1
>> Sink2
>> I am using a parallelism of 10 with 1 slot per task manager and a memory 
>> allocation of 32G per TM. The JM is running with 8G.
>>  
>> Everything starts up and runs fine with close to 6-7k tasks (this is 
>> variable and is mostly the source /filter/map portions) completing and then 
>> the graph just hangs.  I managed to connect to the task managers and get a 
>> thread dump just in time and found the following deadlock on one of the TMs 
>> which apparently seems to be holding up everything else.
>> Please could someone take a look and advise if there is something I could do 
>> or try out to fix this.
>>  
>> Marked below are the 2 isolated thread stacks marking the deadlock -
>>  
>> Thread-1
>> "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 
>> nid=NA waiting for monitor entry
>> waiting for Map (Key Extractor) (1/10)@9967 to release lock on 
>> <0x2dfb> (a java.util.ArrayDeque)
>>   at 
>> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
>>   at 
>> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
>>   at 
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
>>   - locked <0x2dfd> (a java.util.ArrayDeque)
>>   at 
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
>>   at 
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
>>   - locked <0x2da5> (a java.lang.Object)
>>   at 
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
>>   at 
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
>>   at 
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
>>   at 
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
>>   at 
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
>>   at 
>> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
>>   at 
>> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
>>   at 
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>   at 
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>   at 
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>   at 
>> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>   at java.lang.Thread.run(Thread.java:745)
>>  
>>  
>> Thread-2
>> "Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for 
>> monitor entry
>>   java.lang.Thread.State: BLOCKED
>> blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
>> waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to 
>> release lock on <0x2dfd> (a java.util.ArrayDeque)
>>   at 
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
>>   at 
>> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:171)
>>   at 
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:106)
>>   at 
>> org.apache.flink.runtime.io.netw

Re: Watermark through Rest Api

2018-10-08 Thread Piotr Nowojski
Hi,

Watermarks are tracked at the task/operator level:

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io

Tracking watermarks on the job level would be problematic, since it would 
require some kind of aggregation (min? avg?) over all of the operators.

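A rough sketch of fetching that metric programmatically (the
/jobs/<job-id>/vertices/<vertex-id>/metrics endpoint and the currentInputWatermark
metric name are taken from the metrics documentation linked above, but double-check
them for your Flink version; host, port, job id and vertex id are placeholders):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class WatermarkMetricFetcher {
    public static void main(String[] args) throws Exception {
        String jobId = "<job-id>";       // placeholder
        String vertexId = "<vertex-id>"; // placeholder: the task/operator in question
        String url = "http://jobmanager-host:8081/jobs/" + jobId
                + "/vertices/" + vertexId
                + "/metrics?get=0.currentInputWatermark"; // watermark of subtask 0

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new URL(url).openStream(), StandardCharsets.UTF_8))) {
            // prints a small JSON document containing the metric id and its current value
            System.out.println(reader.readLine());
        }
    }
}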
Piotrek 

> On 9 Oct 2018, at 02:13, Gregory Fee  wrote:
> 
> Hello! I am interested in getting the current low watermark for tasks
> in my Flink jobs. I know how to see them in the UI. I'm interested in
> getting them programmatically, hopefully via rest api. The
> documentation says that they are exposed as metrics but I don't see
> watermark info in the 'metrics' section in the job detail. Does anyone
> know how I might get the watermark information?
> 
> Thanks,
> -- 
> Gregory Fee
> Engineer



Re: JobManager did not respond within 60000 ms

2018-10-09 Thread Piotr Nowojski
Hi,

You have quite a complicated job graph and very low memory settings for the job 
manager and task manager. It might be that long GC pauses are causing this 
problem.

Secondly, there are quite a few results in a Google search of this error that 
point toward high-availability issues. Have you read those previously reported 
problems?

Thanks, Piotrek 

> On 9 Oct 2018, at 09:57, jpreis...@free.fr wrote:
> 
> I have a streaming job that runs in a standalone cluster. The Flink version is 
> 1.4.1. Everything was working so far. But since I added new processing steps, I 
> cannot start my job anymore. I have this exception : 
> 
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: JobManager did not respond within 60000 ms
>at 
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524)
>at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
>at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
>at 
> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
>at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402)
>at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
>at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
>at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
>at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
>at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
>at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
> Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager 
> did not respond within 60000 ms
>at 
> org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:437)
>at 
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:516)
>... 11 more
> Caused by: java.util.concurrent.TimeoutException
>at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>at 
> org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:435)
>... 12 more
> 
> I see a very strange behavior when I comment out a function (any one, for 
> example a FilterFunction, which was present before or after my modification). 
> I tried to change the configuration (akka.client.timeout and akka.framesize) 
> without success.
> 
> This is my flink-conf.yaml
>  jobmanager.rpc.address: myhost
>  jobmanager.rpc.port: 6123
>  jobmanager.heap.mb: 128
>  taskmanager.heap.mb: 1024
>  taskmanager.numberOfTaskSlots: 100
>  taskmanager.memory.preallocate: false
>  taskmanager.data.port: 6121
>  parallelism.default: 1
>  taskmanager.tmp.dirs: /dohdev/flink/tmp/tskmgr
>  blob.storage.directory: /dohdev/flink/tmp/blob
>  jobmanager.web.port: -1
>  high-availability: zookeeper
>  high-availability.zookeeper.quorum: localhost:2181
>  high-availability.zookeeper.path.root: /dohdev/flink
>  high-availability.cluster-id: dev
>  high-availability.storageDir: file:mnt/metaflink
>  high-availability.zookeeper.storageDir: 
> /mnt/metaflink/inh/agregateur/recovery
>  restart-strategy: fixed-delay
>  restart-strategy.fixed-delay.attempts: 1000
>  restart-strategy.fixed-delay.delay: 5 s
>  zookeeper.sasl.disable: true
>  blob.service.cleanup.interval: 60
> 
> And I launch a job with this command : bin/flink run -d myjar.jar
> 
> I added as an attachment a graph of my job when it works (Graph.PNG).
> 
> Do you have an idea of the problem ?
> 
> Thanks.
> Julien
> 
> 
> 



Re: Wondering how to check if taskmanager is registered with the jobmanager, without asking job manager

2018-10-10 Thread Piotr Nowojski
Hi,

I don’t think that’s exposed on the TaskManager.

Maybe it would simplify things a bit if you implemented this as a single 
"JobManager" health check instead of multiple TaskManager health checks - for 
example, verify that the expected number of TaskManagers are registered. It might 
cover your case.

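A sketch of such a check against the JobManager's monitoring REST API (this assumes
the /overview endpoint that backs the web UI, which reports a "taskmanagers" count;
host, port and the expected count are placeholders):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TaskManagerCountCheck {
    public static void main(String[] args) throws Exception {
        int expected = 4; // placeholder: how many TaskManagers were deployed

        String json;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new URL("http://jobmanager-host:8081/overview").openStream(),
                StandardCharsets.UTF_8))) {
            json = reader.readLine();
        }

        // naive extraction of the "taskmanagers" field; a JSON library would be nicer
        Matcher m = Pattern.compile("\"taskmanagers\":(\\d+)").matcher(json);
        int registered = m.find() ? Integer.parseInt(m.group(1)) : -1;

        if (registered != expected) {
            System.err.println("Unhealthy: " + registered + "/" + expected
                    + " TaskManagers registered");
            System.exit(1);
        }
        System.out.println("Healthy: all " + registered + " TaskManagers registered");
    }
}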
Piotrek

> On 9 Oct 2018, at 12:21, Bariša  wrote:
> 
> As part of deploying task managers and job managers, I'd like to expose 
> healthcheck on both task managers and job managers.
> 
> For the task managers, one of the requirements for them to be healthy is that 
> they have successfully registered themselves with the job manager.
> 
> Is there a way to achieve this, without making a call to job manager ( to do 
> that, I first need to make a call to the zookeeper to find the job manager, 
> so I'm trying to simplify the health check ).
> 
> Ideally, taskmanager would have a metric that says, ( am registered ), but 
> afaik, that doesn't exist 
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#cluster
>  
> 
> 
> 
> P.S.
> This is my first post in the email list, happy to update/change my question, 
> if I messed up, or misunderstood something.
> 
> Cheers,
> Barisa



Re: Flink leaves a lot RocksDB sst files in tmp directory

2018-10-10 Thread Piotr Nowojski
Hi,

Was this happening in an older Flink version? Could you post in what circumstances 
the job was moved to a new TM (full job manager logs and task manager logs 
would be helpful)? I suspect that those leftover files might have 
something to do with local recovery.

Piotrek 

> On 9 Oct 2018, at 15:28, Sayat Satybaldiyev  wrote:
> 
> After digging more into the log, I think it's more likely a bug. I've grepped the log by 
> job id and found that under normal circumstances the TM is supposed to delete flink-io 
> files. For some reason, it doesn't delete the files that were listed above.
> 
> 2018-10-08 22:10:25,865 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Deleting 
> existing instance base directory 
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_bf69685b-78d3-431c-88be-b3f26db05566.
> 2018-10-08 22:10:25,867 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Deleting 
> existing instance base directory 
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_14630a50145935222dbee3f1bcfdc2a6__1_1__uuid_47cd6e95-144a-4c52-a905-52966a5e9381.
> 2018-10-08 22:10:25,874 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Deleting 
> existing instance base directory 
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_7c539a96-a247-4299-b1a0-01df713c3c34.
> 2018-10-08 22:17:38,680 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Close 
> JobManager connection for job a5b223c7aee89845f9aed24012e46b7e.
> org.apache.flink.util.FlinkException: JobManager responsible for 
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for 
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> 2018-10-08 22:17:38,686 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Deleting 
> existing instance base directory 
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_2e88c56a-2fc2-41f2-a1b9-3b0594f660fb.
> org.apache.flink.util.FlinkException: JobManager responsible for 
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> 2018-10-08 22:17:38,691 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Deleting 
> existing instance base directory 
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_b44aecb7-ba16-4aa4-b709-31dae7f58de9.
> org.apache.flink.util.FlinkException: JobManager responsible for 
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for 
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for 
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> 
> 
> On Tue, Oct 9, 2018 at 2:33 PM Sayat Satybaldiyev  > wrote:
> Dear all,
> 
> While running Flink 1.6.1 with RocksDB as a backend and hdfs as checkpoint 
> FS, I've noticed that after a job has moved to a different host it leaves 
> quite a huge state in temp folder(1.2TB in total). The files are not used as 
> TM is not running a job on the current host. 
> 
> The job a5b223c7aee89845f9aed24012e46b7e had been running on the host but 
> then it was moved to a different TM. I'm wondering is it intended behavior or 
> a possible bug?
> 
> I've attached files that are left and not used by a job in PrintScreen.



Re: JobManager did not respond within 60000 ms

2018-10-10 Thread Piotr Nowojski
Hi again,

Glad that you solved your problem :)

Splitting code into smaller functions has its advantages, but more 
operators/tasks means more overhead for JobManager/TaskManager to manage them. 
Usually that’s not a big issue, but as I said, you were running your cluster on 
extremely low memory settings.

Piotrek

> On 9 Oct 2018, at 18:09, jpreis...@free.fr wrote:
> 
> Hi Piotrek,
> 
> Thank you for your answer. Actually it was necessary to increase the memory 
> of the JobManager (I had tested it but I had not restarted Flink ...).
> 
> I will also work on optimization. I thought it was good practice to create as 
> many functions as possible based on their functional value (for example: 
> create two FilterFunctions that have different functional meanings). So I 
> will try to have fewer functions (for example: gather my two FilterFunctions 
> into one).
> 
> Thanks again Piotrek !
> 
> Julien.
> 
> - Original message -
> From: "Piotr Nowojski" 
> To: jpreis...@free.fr
> Cc: user@flink.apache.org
> Sent: Tuesday, 9 October 2018 10:37:58
> Subject: Re: JobManager did not respond within 60000 ms
> 
> Hi, 
> 
> 
> You have quite complicated job graph and very low memory settings for the job 
> manager and task manager. It might be that long GC pauses are causing this 
> problem. 
> 
> 
> Secondly, there are quite a few results in a Google search of this error that 
> point toward high-availability issues. Have you read those previously 
> reported problems? 
> 
> 
> Thanks, Piotrek 
> 
> 
> 
> 
> 
> On 9 Oct 2018, at 09:57, jpreis...@free.fr wrote: 
> 
> 
> I have a streaming job that works in standalone cluster. Flink version is 
> 1.4.1. Everything was working so far. But since I added new treatments, I can 
> not start my job anymore. I have this exception : 
> 
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: JobManager did not respond within 60000 ms 
> at 
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524)
>  
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
>  
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456) 
> at 
> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
>  
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402) 
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802) 
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282) 
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054) 
> at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101) 
> at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098) 
> at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>  
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098) 
> Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager 
> did not respond within 60000 ms 
> at 
> org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:437)
>  
> at 
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:516)
>  
> ... 11 more 
> Caused by: java.util.concurrent.TimeoutException 
> at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) 
> at 
> org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:435)
>  
> ... 12 more 
> 
> I see a very strange behavior. When I comment on a function (any one, for 
> example a FilterFunction, which was present before or after my modification). 
> I tried to change the configuration (akka.client.timeout and akka.framesize) 
> without success. 
> 
> This is my flink-conf.yaml 
> jobmanager.rpc.address: myhost 
> jobmanager.rpc.port: 6123 
> jobmanager.heap.mb: 128 
> taskmanager.heap.mb: 1024 
> taskmanager.numberOfTaskSlots: 100 
> taskmanager.memory.preallocate: false 
> taskmanager.data.port: 6121 
> parallelism.default: 1 
> taskmanager.tmp.dirs: /dohdev/flink/tmp/tskmgr 
> blob.storage.directory: /dohdev/flink/tmp/blob 
> jobmanager.web.port: -1 
> high-availability: zookeeper 
> high-availability.zookeeper.quorum: localhost:2181 
> high-availability.zookeeper.path.root: /dohdev/flink 
> high-availability.cluster-id: dev 
> high-availability.storageDir: file:mnt/metaflink 
> high-availability.zookeeper.storageDir: 
> /mnt/metaflink/inh/agregateur/recovery 
> restart-strategy: fixed-delay 
> restart-strategy.fixed-delay.attempts: 1000 
> restart-strategy.fixed-delay.delay: 5 s 
> zookeeper.sasl.disable: true 
> blob.service.cleanup.interval: 60 
> 
> And I launch a job with this command : bin/flink run -d myjar.jar 
> 
> I added as an attachment a graph of my job when it works (Graph.PNG). 
> 
> Do you have an idea of the problem ? 
> 
> Thanks. 
> Julien 
> 
> 
>  



Re: Wondering how to check if taskmanager is registered with the jobmanager, without asking job manager

2018-10-10 Thread Piotr Nowojski
You’re welcome :)

> On 10 Oct 2018, at 10:28, Bariša  wrote:
> 
> Thnx Piotr. I agree, that would work. It's a bit of a chicken and egg problem, 
> since at that point we can't just spin up a task manager and have it 
> register itself; we need the JobManager health check to know how many task managers 
> should be there. Bit more logic, but doable. Thnx for the tip.
> 
> Cheers,
> Barisa
> 
> On Wed, 10 Oct 2018 at 09:05, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> I don’t think that’s exposed on the TaskManager.
> 
> Maybe it would simplify things a bit if you implement this as a single 
> “JobManager” health check, not multiple TaskManagers health check - for 
> example verify that there are expected number of registered TaskManagers. It 
> might cover your case.
> 
> Piotrek
> 
>> On 9 Oct 2018, at 12:21, Bariša > <mailto:barisa.obrado...@gmail.com>> wrote:
>> 
>> As part of deploying task managers and job managers, I'd like to expose 
>> healthcheck on both task managers and job managers.
>> 
>> For the task managers, one of the requirements that they are healthy, is 
>> that they have successfully registered themselves with the job manager.
>> 
>> Is there a way to achieve this, without making a call to job manager ( to do 
>> that, I first need to make a call to the zookeeper to find the job manager, 
>> so I'm trying to simplify the health check ).
>> 
>> Ideally, taskmanager would have a metric that says, ( am registered ), but 
>> afaik, that doesn't exist 
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#cluster
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#cluster>
>> 
>> 
>> P.S.
>> This is my first post in the email list, happy to update/change my question, 
>> if I messed up, or misunderstood something.
>> 
>> Cheers,
>> Barisa
> 



Re: Live configuration change

2018-11-06 Thread Piotr Nowojski
Hi,

Sorry, but none that I'm aware of. As far as I know, the only way to dynamically 
configure the Kafka source would be for you to copy and modify its code.

Piotrek

> On 6 Nov 2018, at 15:19, Ning Shi  wrote:
> 
> In the job I'm implementing, there are a couple of configuration
> variables that I want to change at runtime, such as the rate limit at the
> Kafka source. I know it's possible to use a control stream and join it
> with the normal stream to configure things in certain operators, but
> this doesn't work for the source. Is there any other way to configure
> settings at runtime?
> 
> Thanks,
> 
> --
> Ning



Re: Understanding checkpoint behavior

2018-11-06 Thread Piotr Nowojski
Hi,

"Checkpoint duration (sync)" is only the time taken for the "synchronous" part 
of taking a snapshot of your operator. Your 11-minute time probably comes from the 
fact that, before this snapshot, the checkpoint barrier was stuck somewhere in your 
pipeline for that amount of time while some record or bunch of records was being 
processed.

If you write a simple function that only performs `Thread.sleep(new 
Random().nextInt(360))` and nothing else, your checkpoints will take a 
random amount of time, since snapshots cannot be taken while your function is 
also executing some code. You can read about some of those concepts in the 
documentation:

https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html
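To make that concrete: a function like the sketch below (purely illustrative) produces
wildly varying end-to-end checkpoint durations, because the barrier queued behind the
current record has to wait for the sleep to finish, even though the synchronous
snapshot itself stays in the millisecond range.

import java.util.Random;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class RandomSleepMapper extends RichMapFunction<String, String> {

    private transient Random random;

    @Override
    public void open(Configuration parameters) {
        random = new Random();
    }

    @Override
    public String map(String value) throws Exception {
        // While this sleep runs, the operator cannot react to an incoming checkpoint
        // barrier, so the checkpoint's end-to-end duration grows accordingly.
        Thread.sleep(random.nextInt(360));
        return value;
    }
}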

Piotrek

Btw, Flink 1.2.1 is a very old and no longer supported version. One reason to 
upgrade is the improvements in the network stack in Flink 1.5.x, which were in 
part aimed at reducing checkpoint duration.

> On 5 Nov 2018, at 21:33, PranjalChauhan  wrote:
> 
> Hi,
> 
> I am a new Flink user and currently using Flink 1.2.1. I am trying to
> understand how checkpoints actually work when Window operator is processing
> events.
> 
> My pipeline has the following flow where each operator's parallelism is 1.
> source -> flatmap -> tumbling window -> sink
> In this pipeline, I had configured the window to be evaluated every 1 hour
> (3600 seconds) and the checkpoint interval was 5 mins. The checkpoint
> timeout was set to 1 hour as I wanted the checkpoints to complete.
> 
> In my window function, the job makes https call to another service so window
> function may take some time to evaluate/process all events.
> 
> Please refer to the following image. In this case, the window was triggered at
> 23:00:00. Checkpoint 12 was triggered soon after that and I noticed that
> checkpoint 12 takes a long time to complete (compared to other checkpoints
> when the window function is not processing events).
> 
>  
> 
> The following images show checkpoint 12 details of the window & sink operators.
> 
>  
> 
>  
> 
> I see that the time spent for checkpoint was actually just 5 ms & 8 ms
> (checkpoint duration sync) for window & sink operators. However, End to End
> Duration for checkpoint was 11m 12s for both window & sink operator.
> 
> Is this expected behavior? If yes, do you have any suggestion to reduce the
> end to end checkpoint duration?
> 
> Please let me know if any more information is needed.
> 
> Thanks.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Always trigger calculation of a tumble window in Flink SQL

2018-11-06 Thread Piotr Nowojski
Hi,

You might come up with some magical self join that could do the trick - 
join/window join the aggregation result with itself and then aggregate it 
again. I don't know if that's possible (probably you would need to write a custom 
aggregate function) and it would be inefficient. It will be easier to convert the 
result of your SQL query into a DataStream and process it with a simple/custom 
DataStream operator.

Piotrek

> On 5 Nov 2018, at 10:17, yinhua.dai  wrote:
> 
> We have a requirement to always trigger a calculation on a timer
> basis, e.g. every 1 minute.
> 
> *If records come into Flink during the time window, then calculate in
> the normal way, i.e. aggregate each record and getResult() at the end
> of the time window.*
> 
> *If no records come into Flink during the time window, then send the
> last calculated result.*
> 
> I know that Flink will not trigger the calculation in the second case (when
> no records come into the system during the time window). Is there a solution
> for me in Flink SQL?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Always trigger calculation of a tumble window in Flink SQL

2018-11-07 Thread Piotr Nowojski
Hi,

You would have to register timers (probably based on event time).

Your operator would be a vastly simplified window operator, where for a given 
window you keep the record emitted from your SQL, sth like:

MapState<Long, Row> emittedRecords; // map window start -> emitted record

When you process elements, you just put them into this map. To emit the 
results, you just register event time timers and when a timer fires, you search 
in the map for the latest record matching the timer's event time (there might 
be many elements in the map, some of them older, some of them newer than the 
fired timer). You can/should also prune the state in the same timer - for 
example, after emitting the result, drop all of the windows older than the timer.

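A rough sketch of such a "ReEmitLastRow" step, written as a KeyedProcessFunction for
brevity (everything here is an assumption: the Tuple2<String, Long> row layout, the
1-minute window size, keying by the grouping key, and event-time timestamps being
assigned upstream):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ReEmitLastRow
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final long WINDOW_MILLIS = 60_000L; // must match the SQL window size

    // window start -> last row produced by the SQL query for that window
    private transient MapState<Long, Tuple2<String, Long>> emittedRecords;

    @Override
    public void open(Configuration parameters) {
        emittedRecords = getRuntimeContext().getMapState(
                new MapStateDescriptor<Long, Tuple2<String, Long>>(
                        "emittedRecords", Types.LONG, Types.TUPLE(Types.STRING, Types.LONG)));
    }

    @Override
    public void processElement(Tuple2<String, Long> row, Context ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {
        long ts = ctx.timestamp(); // assumes timestamps were assigned upstream
        long windowStart = ts - (ts % WINDOW_MILLIS);
        emittedRecords.put(windowStart, row);
        // fire at the end of this window; onTimer keeps the chain going afterwards
        ctx.timerService().registerEventTimeTimer(windowStart + WINDOW_MILLIS - 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {
        long firedWindowStart = timestamp + 1 - WINDOW_MILLIS;

        // newest stored row whose window is not newer than the fired window
        Long bestStart = null;
        for (Long start : emittedRecords.keys()) {
            if (start <= firedWindowStart && (bestStart == null || start > bestStart)) {
                bestStart = start;
            }
        }
        if (bestStart != null) {
            out.collect(emittedRecords.get(bestStart)); // re-emit even if this window was empty
            // register the next firing so results keep coming without new input
            ctx.timerService().registerEventTimeTimer(timestamp + WINDOW_MILLIS);
        }

        // prune everything strictly older than the row we may still need
        List<Long> toRemove = new ArrayList<>();
        for (Long start : emittedRecords.keys()) {
            if (bestStart != null && start < bestStart) {
                toRemove.add(start);
            }
        }
        for (Long start : toRemove) {
            emittedRecords.remove(start);
        }
    }
}

The result of the SQL query would be converted to a stream (e.g. with toAppendStream),
keyed by the grouping key and then run through this function. Note that with event-time
timers the re-emission only happens while the watermark keeps advancing; if the
watermark can stall, processing-time timers are the alternative.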
Piotrek

> On 7 Nov 2018, at 02:55, yinhua.dai  wrote:
> 
> Hi Piotr,
> 
> Can you elaborate more on the solution with the custom operator?
> I don't think there will be any records from the SQL query if no input data
> is coming in within the time window, even if we convert the result to a
> DataStream.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Always trigger calculation of a tumble window in Flink SQL

2018-11-08 Thread Piotr Nowojski
Re-adding user mailing list to CC

Hi,

> I basically understand your meaning; as far as I understand, we can write 
> a custom window assigner and a custom trigger, and we can register the timer 
> when the window processes elements. 


No, I was actually suggesting writing your own operator to do that. My bet is 
that hacking the window operator to make it re-emit the same result in case of no 
data would be more difficult, if not impossible, while your custom 
"ReEmitLastRow" operator should be relatively simple.

> But how can we register a timer when no elements are received during a time 
> window? 

Upon the first element, register a timer for N seconds in the future. Once it fires, 
register the next one (you can do that while processing a timer callback), again for 
N seconds in the future, and so on.

Piotrek

> On 8 Nov 2018, at 07:44, yinhua.2...@outlook.com wrote:
> 
> Hi Piotr, 
> 
> Thank you for your explanation. 
> I basically understand your meaning; as far as I understand, we can write 
> a custom window assigner and a custom trigger, and we can register the timer 
> when the window processes elements. 
> 
> But how can we register a timer when no elements are received during a time 
> window? 
> My requirement is to always fire at the end of the time window, even if there is no result 
> from the SQL query.



> On 7 Nov 2018, at 09:48, Piotr Nowojski  wrote:
> 
> Hi,
> 
> You would have to register timers (probably based on event time).
> 
> Your operator would be a vastly simplified window operator, where for a given 
> window you keep the record emitted from your SQL, sth like:
> 
> MapState<Long, Row> emittedRecords; // map window start -> emitted record
> 
> When you process elements, you just put them into this map. To emit the 
> results, you just register event time timers and when a timer fires, you 
> search in the map for the latest record matching the timer's event time 
> (there might be many elements in the map, some of them older, some of them 
> newer than the fired timer). You can/should also prune the state in the same 
> timer - for example, after emitting the result, drop all of the windows older 
> than the timer.
> 
> Piotrek
> 
>> On 7 Nov 2018, at 02:55, yinhua.dai  wrote:
>> 
>> Hi Piotr,
>> 
>> Can you elaborate more on the solution with the custom operator?
>> I don't think there will be any records from the SQL query if no input data
>> in coming in within the time window even if we convert the result to a
>> datastream.
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 



Re: Understanding checkpoint behavior

2018-11-08 Thread Piotr Nowojski
Hi,

> On 6 Nov 2018, at 18:22, PranjalChauhan  wrote:
> 
> Thank you for your response Piotr. I plan to upgrade to Flink 1.5.x early
> next year.
> 
> Two follow-up questions for now. 
> 
> 1. 
> " When operator snapshots are taken, there are two parts: the synchronous
> and the asynchronous parts. "
> I understand that when the operator snapshot is being taken, the processing
> of that operator is stopped, as taking this snapshot is the synchronous part. Is
> there any other synchronous part in the snapshot / checkpoint process?
> 

Not as far as I know.

> 
> 2. 
> Based on the test I mentioned above, my understanding is that for a window
> operator, when all events that belongs to checkpoint N and the checkpoint
> barrier N are received by window operator (but pending for window to be
> triggered), then checkpoint barrier N will be immediately emitted to the
> sink operator (so snapshot can be completed) while the events are still
> pending to be evaluated by window operator.
> 
> Can you please confirm my understanding as I was initially confused by the
> following second statement (emits all pending outgoing records) under
> Barriers section in this doc
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/stream_checkpointing.html
> ?
> 
> "When an intermediate operator has received a barrier for snapshot n from
> all of its input streams, it emits itself a barrier for snapshot n into all
> of its outgoing streams."
> 
> " Once the last stream has received barrier n, the operator emits all
> pending outgoing records, and then emits snapshot n barriers itself. “

I think you might be mixing two different concepts, watermarks and checkpoint 
barriers. The documentation that you are quoting describes the checkpointing 
mechanism, checkpoint barriers and record alignment. Checkpoint barriers do not 
cause any results to be emitted from the WindowOperator; this happens when timers 
are triggered (wall clock timers in case of processing time, or watermarks in 
case of event time). 

Piotrek

Re: Call batch job in streaming context?

2018-11-23 Thread Piotr Nowojski
Hi,

I'm not sure if I understand your problem and your context, but spawning a 
batch job every 45 seconds doesn't sound like that bad an idea (as long as the job 
is short).

Another idea would be to incorporate this batch job inside your streaming job, 
for example by reading from Cassandra using an AsyncIO operator:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/asyncio.html
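A sketch of that approach (all types and names are placeholders; the actual Cassandra
query and the transformation/ranking would go where queryCassandra is stubbed out):

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class CassandraEnricher extends RichAsyncFunction<String, List<String>> {

    @Override
    public void asyncInvoke(String eventKey, ResultFuture<List<String>> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> queryCassandra(eventKey)) // run the lookup off the main task thread
                .thenAccept(rows -> resultFuture.complete(Collections.singleton(rows)));
    }

    private List<String> queryCassandra(String eventKey) {
        // placeholder: fetch the elements for this event from Cassandra,
        // apply the transformation/ranking and return the result
        return Collections.singletonList("result-for-" + eventKey);
    }

    // usage: "events" is the incoming stream of trigger events
    public static DataStream<List<String>> enrich(DataStream<String> events) {
        return AsyncDataStream.unorderedWait(
                events, new CassandraEnricher(), 30, TimeUnit.SECONDS, 100);
    }
}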

Quick google search revealed for example this:

https://stackoverflow.com/questions/43067681/read-data-from-cassandra-for-processing-in-flink

Piotrek 

> On 23 Nov 2018, at 10:33, eric hoffmann  wrote:
> 
> Hi
> Is it possible to call a batch job in a streaming context?
> What I want to do is:
> for a given input event, fetch Cassandra elements based on the event data, apply a 
> transformation on them and apply a ranking when all elements fetched from 
> Cassandra are processed.
> If I do this in batch mode I would have to submit a job for each event, and I 
> can have an event every 45 seconds.
> Is there any alternative? Can I start a batch job that will receive some 
> external request, process it and wait for another request?
> thx
> Eric



Re: error while joining two datastream

2018-11-23 Thread Piotr Nowojski
Hi,

I assume that 

withTimestampsAndWatermarks1.print();
withTimestampsAndWatermarks2.print();

Actually print what you expected? 

If so, the problem might be that:
a) time/watermarks are not progressing (watermarks trigger the output of 
your `TumblingEventTimeWindows.of(Time.seconds(15))`)
b) data are not being joined, because:
  - there are no matching elements (based on your KeySelectors) to join 
between those two streams
  - elements are out of sync with respect to the window length (within your 15 
second tumbling window, there are no elements to join)
c) the streams are producing different event times/watermarks (for example one is 
far ahead of the other). A windowed join will produce results only once both of their 
watermarks catch up/sync up.
  
Piotrek 

> On 23 Nov 2018, at 08:50, Abhijeet Kumar  wrote:
> 
> DataStream<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>> withTimestampsAndWatermarks1 = formatStream1
>         .assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>>(
>                         Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {
> 
>                     private static final long serialVersionUID = 1L;
> 
>                     @Override
>                     public long extractTimestamp(
>                             Tuple11<String, String, String, String, String, String, String, String, String, String, Long> element) {
>                         return element.f10;
>                     }
>                 });
> 
> DataStream<Tuple7<String, String, String, String, String, String, Long>> withTimestampsAndWatermarks2 = formatStream2
>         .assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<Tuple7<String, String, String, String, String, String, Long>>(
>                         Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {
> 
>                     private static final long serialVersionUID = 1L;
> 
>                     @Override
>                     public long extractTimestamp(
>                             Tuple7<String, String, String, String, String, String, Long> element) {
>                         return element.f6;
>                     }
>                 });
> 
> withTimestampsAndWatermarks1.print();
> withTimestampsAndWatermarks2.print();
> 
> DataStream<Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>> joined = withTimestampsAndWatermarks1
>         .join(withTimestampsAndWatermarks2)
>         .where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {
> 
>             private static final long serialVersionUID = 1L;
> 
>             public String getKey(
>                     Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t1)
>                     throws Exception {
>                 return t1.f0;
>             }
>         }).equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {
> 
>             private static final long serialVersionUID = 1L;
> 
>             public String getKey(Tuple7<String, String, String, String, String, String, Long> t1)
>                     throws Exception {
> 

Re: [flink-cep] Flink CEP support for the group By operator

2018-11-23 Thread Piotr Nowojski
Hi,

Yes, sure. Just use CEP on top of KeyedStream. Take a look at `keyBy`:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/

Piotrek 

> On 23 Nov 2018, at 16:04, Spico Florin  wrote:
> 
> Hello!
> 
> I'm using Flink 1.4.2 and I would like to use a group by operator based on the 
> value of my event stream. The functionality that I would like to achieve is 
> similar to the following Esper EPL
> (excerpt 
> http://esper.espertech.com/release-5.2.0/esper-reference/html/epl_clauses.html#epl-grouping-aggregating
>  
> )
> 
> 
> select symbol, tickDataFeed, median(volume) 
> from StockTickEvent.win:time(30 sec) 
> group by symbol, tickDataFeed
> 
> So, does Flink CEP support such a group by functionality? 
> 
> If yes what is syntax?
> 
> 
> 
> I look forward for your answers.
> 
> 
> 
> Best regards,
> 
>  Florin 
> 
> 
> 
> 
> 
> (excerpt 
> http://esper.espertech.com/release-5.2.0/esper-reference/html/epl_clauses.html#epl-grouping-aggregating
>  
> )
> "You can list more then one expression in the group by clause to nest groups. 
> Once the sets are established with group by the aggregation functions are 
> applied. This statement posts the median volume for all stock tick events in 
> the last 30 seconds per symbol and tick data feed. Esper posts one event for 
> each group to statement listeners:
> In the statement above the event properties in the select list (symbol, 
> tickDataFeed) are also listed in the group by clause. The statement thus 
> follows the SQL standard which prescribes that non-aggregated event 
> properties in the select list must match the group by columns."
> 



Re: [flink-cep] Flink CEP support for the group By operator

2018-11-25 Thread Piotr Nowojski
Hey,

As a matter of fact, you do not need Flink's CEP library to run the same 
query. The same functionality can be achieved with a simple tumbling window and a 
"median" aggregate (the "median" you would have to implement yourself).

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
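
For example, a rough sketch (assuming a hypothetical StockTick POJO with public symbol, tickDataFeed and volume fields, timestamps/watermarks already assigned, and `ticks` as the input stream; an exact median cannot be computed incrementally, so the accumulator simply collects all values):

DataStream<Double> medianVolume = ticks
        .keyBy("symbol", "tickDataFeed")
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .aggregate(new AggregateFunction<StockTick, List<Double>, Double>() {
            @Override
            public List<Double> createAccumulator() {
                return new ArrayList<>();
            }

            @Override
            public List<Double> add(StockTick tick, List<Double> acc) {
                acc.add(tick.volume);
                return acc;
            }

            @Override
            public Double getResult(List<Double> acc) {
                Collections.sort(acc);
                return acc.get(acc.size() / 2); // ignoring the even-sized case for brevity
            }

            @Override
            public List<Double> merge(List<Double> a, List<Double> b) {
                a.addAll(b);
                return a;
            }
        });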

Piotrek

> On 23 Nov 2018, at 16:32, Piotr Nowojski  wrote:
> 
> Hi,
> 
> Yes, sure. Just use CEP on top of KeyedStream. Take a look at `keyBy`:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/>
> 
> Piotrek 
> 
>> On 23 Nov 2018, at 16:04, Spico Florin > <mailto:spicoflo...@gmail.com>> wrote:
>> 
>> Hello!
>> 
>> I'm using Flink 1.4.2 and I would like to use a group by operator based on 
>> value of my event stream. The functionality that I would like to achieve is 
>> similar to the following Esper EPL
>> (excerpt 
>> http://esper.espertech.com/release-5.2.0/esper-reference/html/epl_clauses.html#epl-grouping-aggregating
>>  
>> <http://esper.espertech.com/release-5.2.0/esper-reference/html/epl_clauses.html#epl-grouping-aggregating>)
>> 
>> 
>> select symbol, tickDataFeed, median(volume) 
>> from StockTickEvent.win:time(30 sec) 
>> group by symbol, tickDataFeed
>> 
>> So, does Flink CEP support such a group by functionality? 
>> 
>> If yes what is syntax?
>> 
>> 
>> 
>> I look forward for your answers.
>> 
>> 
>> 
>> Best regards,
>> 
>>  Florin 
>> 
>> 
>> 
>> 
>> 
>> (excerpt 
>> http://esper.espertech.com/release-5.2.0/esper-reference/html/epl_clauses.html#epl-grouping-aggregating
>>  
>> <http://esper.espertech.com/release-5.2.0/esper-reference/html/epl_clauses.html#epl-grouping-aggregating>)
>> "You can list more then one expression in the group by clause to nest 
>> groups. Once the sets are established with group by the aggregation 
>> functions are applied. This statement posts the median volume for all stock 
>> tick events in the last 30 seconds per symbol and tick data feed. Esper 
>> posts one event for each group to statement listeners:
>> In the statement above the event properties in the select list (symbol, 
>> tickDataFeed) are also listed in the group by clause. The statement thus 
>> follows the SQL standard which prescribes that non-aggregated event 
>> properties in the select list must match the group by columns."
>> 
> 



Re: Duplicated messages in kafka sink topic using flink cancel-with-savepoint operation

2018-11-29 Thread Piotr Nowojski
Hi Nastaran,

When you are checking for duplicated messages, are you reading from Kafka using 
`read_committed` mode (this is not the default value)?

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer-partitioning-scheme

> Semantic.EXACTLY_ONCE: uses Kafka transactions to provide exactly-once 
> semantic. Whenever you write to Kafka using
> transactions, do not forget about setting desired isolation.level 
> (read_committed or read_uncommitted - the latter one is the
> default value) for any application consuming records from Kafka.
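
For example, whatever consumer you use to verify the output needs that isolation level set; with a Flink consumer it would be something like this (just a sketch; the topic name, servers and group id are placeholders):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "duplicate-check");
// without this, aborted/uncommitted transactional records are also returned
props.setProperty("isolation.level", "read_committed");

FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>("output-topic", new SimpleStringSchema(), props);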

Does the problem happen every time?

Piotrek

> On 28 Nov 2018, at 08:56, Nastaran Motavali  wrote:
> 
> Hi,
> I have a flink streaming job implemented via java which reads some messages 
> from a kafka topic, transforms them and finally sends them to another kafka 
> topic.
> The Flink version is 1.6.2 and the Kafka version is 0.11. I pass the 
> Semantic.EXACTLY_ONCE parameter to the producer. The problem is that when I 
> cancel the job with savepoint and then restart it using the saved savepoint, 
> I have duplicated messages in the sink.
> Do I miss some kafka/flink configurations to avoid duplication?
> 
> 
> Kind regards,
> Nastaran Motavalli



Re: How to read large amount of data from hive and write to redis, in a batch manner?

2021-07-08 Thread Piotr Nowojski
Great, thanks for coming back and I'm glad that it works for you!

Piotrek

Thu, 8 Jul 2021 at 13:34 Yik San Chan  wrote:

> Hi Piotr,
>
> Thanks! I end up doing option 1, and that works great.
>
> Best,
> Yik San
>
> On Tue, May 25, 2021 at 11:43 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> You could always buffer records in your sink function/operator, until a
>> large enough batch is accumulated and upload the whole batch at once. Note
>> that if you want to have at-least-once or exactly-once semantics, you would
>> need to take care of those buffered records in one way or another. For
>> example you could:
>> 1. Buffer records on some in memory data structure (not Flink's state),
>> and just make sure that those records are flushed to the underlying sink on
>> `CheckpointedFunction#snapshotState()` calls
>> 2. Buffer records on Flink's state (heap state backend or rocksdb - heap
>> state backend would be the fastest with little overhead, but you can risk
>> running out of memory), and that would easily give you exactly-once. That
>> way your batch could span multiple checkpoints.
>> 3. Buffer/write records to temporary files, but in that case keep in mind
>> that those files need to be persisted and recovered in case of failure and
>> restart.
>> 4. Ignore checkpointing and either always restart the job from scratch or
>> accept some occasional data loss.
>>
>> FYI, virtually every connector/sink is internally batching writes to some
>> extent. Usually by doing option 1.
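>>
>> A minimal sketch of option 1 (MyRedisClient and its batchPut() call are
>> hypothetical placeholders for whatever client you use; the buffer is a plain
>> field, so flushing it in snapshotState() is what gives you at-least-once):
>>
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> public class BufferingRedisSink extends RichSinkFunction<Tuple2<String, String>>
>>         implements CheckpointedFunction {
>>
>>     private static final int BATCH_SIZE = 10_000;
>>
>>     private transient Map<String, String> buffer;
>>     private transient MyRedisClient client; // hypothetical client
>>
>>     @Override
>>     public void open(Configuration parameters) {
>>         buffer = new HashMap<>();
>>         client = new MyRedisClient("redis://localhost:6379"); // placeholder address
>>     }
>>
>>     @Override
>>     public void invoke(Tuple2<String, String> value, Context context) {
>>         buffer.put(value.f0, value.f1);
>>         if (buffer.size() >= BATCH_SIZE) {
>>             flush();
>>         }
>>     }
>>
>>     private void flush() {
>>         client.batchPut(buffer); // one round trip per batch
>>         buffer.clear();
>>     }
>>
>>     @Override
>>     public void snapshotState(FunctionSnapshotContext context) {
>>         // make sure nothing buffered is lost if we restore from this checkpoint
>>         flush();
>>     }
>>
>>     @Override
>>     public void initializeState(FunctionInitializationContext context) {
>>         // nothing to restore - the buffer is always flushed on checkpoints
>>     }
>> }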
>>
>> Piotrek
>>
>> Tue, 25 May 2021 at 14:50 Yik San Chan  wrote:
>>
>>> Hi community,
>>>
>>> I have a Hive table that stores tens of millions rows of data. In my
>>> Flink job, I want to process the data in batch manner:
>>>
>>> - Split the data into batches, each batch has (maybe) 10,000 rows.
>>> - For each batch, call a batchPut() API on my redis client to dump in
>>> Redis.
>>>
>>> Doing so in a streaming manner is not expected, as that will cause too
>>> many round trips between Flink workers and Redis.
>>>
>>> Is there a way to do that? I find little clue in Flink docs, since
>>> almost all APIs feel better suited for streaming processing by default.
>>>
>>> Thank you!
>>>
>>> Best,
>>> Yik San
>>>
>>


Re: How to register a custom serializer for the Flink Kafka format type

2021-07-13 Thread Piotr Nowojski
Hi,

It's mentioned in the docs [1], but unfortunately this is not very well
documented in 1.10. In short you have to provide a custom implementation of
a `DeserializationSchemaFactory`. Please look at the built-in factories for
examples of how it can be done.

In newer versions it's both easier and better documented. For example in
1.13 please take a look at `DeserializationFormatFactory` and [2]

Best,
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sourcessinks/#factories
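
To illustrate the parsing part only (the factory wiring for 'format.type' still has to be implemented as described above), a rough sketch of a DeserializationSchema for the message and POJO shown below could look like this — assuming Jackson is on the classpath, and with a column-to-field mapping that is only a placeholder:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

public class CustomJsonDeserializationSchema implements DeserializationSchema<HrSalaryPersonVO> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public HrSalaryPersonVO deserialize(byte[] message) throws IOException {
        JsonNode root = MAPPER.readTree(message);
        HrSalaryPersonVO vo = new HrSalaryPersonVO();
        // walk the "data" array and map columnName -> rawData onto the POJO;
        // only one column is mapped here as an example, the real mapping is application specific
        for (JsonNode column : root.get("data")) {
            String name = column.get("columnName").asText();
            JsonNode rawData = column.get("rawData");
            if ("UPDATE_SALARY".equals(name)) {
                vo.setAdjustAmount(rawData.asDouble()); // placeholder mapping
            }
        }
        return vo;
    }

    @Override
    public boolean isEndOfStream(HrSalaryPersonVO nextElement) {
        return false;
    }

    @Override
    public TypeInformation<HrSalaryPersonVO> getProducedType() {
        return TypeInformation.of(HrSalaryPersonVO.class);
    }
}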

Thu, 8 Jul 2021 at 14:21 Chenzhiyuan(HR)  wrote:

> I create the table as below, and the data is from Kafka.
>
> I want to deserialize the JSON message to a POJO object.
>
> But the message format is not Avro or simple JSON.
>
> *So I need to know how to register a customized serializer and use it for
> the 'format.type' property.*
>
> By the way, my Flink version is 1.10.0.
>
> CREATE TABLE MyUserTable(
>
> uuid VARCHAR,
>
> orgId VARCHAR
>
> ) with (
>
> 'connector.type' = 'kafka',
>
> 'connector.version' = '0.11',
>
> 'connector.topic' = 'topic_name',
>
> 'connector.properties.zookeeper.connect' = 'localhost:2181',
>
> 'connector.properties.bootstrap.servers' = 'localhost:9092',
>
> 'connector.properties.group.id' = 'testGroup',
>
> 'format.type' = 'cutormizeSerializer'
>
> )
>
> The kafka message body sample, each columnName is the key for Pojo object,
> and rawData is value:
>
> {
>
>"beforeData": [],
>
> "byteSize": 272,
>
> "columnNumber": 32,
>
> "data": [{
>
> "byteSize": 8,
>
> "columnName": "APPLY_PERSON_ID",
>
> "rawData": 10017,
>
> "type": "LONG"
>
> }, {
>
> "byteSize": 12,
>
> "columnName": "UPDATE_SALARY",
>
> "rawData": "11000.00",
>
> "type": "DOUBLE"
>
> }, {
>
> "byteSize": 11,
>
> "columnName": "UP_AMOUNT",
>
> "rawData": "1000.00",
>
> "type": "DOUBLE"
>
> }, {
>
> "byteSize": 3,
>
> "columnName": "CURRENCY",
>
> "rawData": "CNY",
>
> "type": "STRING"
>
> }, {
>
> "byteSize": 32,
>
> "columnName": "EXCHANGE_RATE",
>
> "rawData": "1.00",
>
> "type": "DOUBLE"
>
> },  {
>
> "byteSize": 11,
>
> "columnName": "DEDUCTED_ACCOUNT",
>
> "rawData": "1000.00",
>
> "type": "DOUBLE"
>
> }, {
>
> "byteSize": 1,
>
> "columnName": "ENTER_AT_PROCESS",
>
> "rawData": "Y",
>
> "type": "STRING"
>
> }],
>
> "dataCount": 0,
>
> "dataMetaData": {
>
> "connector": "mysql",
>
> "pos": 1000368076,
>
> "row": 0,
>
> "ts_ms": 1625565737000,
>
> "snapshot": "false",
>
> "db": "testdb",
>
> "table": "flow_person_t"
>
> },
>
> "key": "APPLY_PERSON_ID",
>
> "memorySize": 1120,
>
> "operation": "insert",
>
> "rowIndex": -1,
>
> "timestamp": "1970-01-01 00:00:00"
>
> }
>
> The Pojo object as below:
>
> import lombok.Data;
>
>
>
> @Data
>
> public class HrSalaryPersonVO {
>
> private String uuid;
>
> private String orgId;
>
> private String unitId;
>
> private String effectiveDate;
>
>
>
> private int adjustPersonCount;
>
>
>
> private Double adjustAmount;
>
>
>
> private Double beforeSalaryAmount;
>
> private Double adjustRate;
>
>
>
> private String data0prateType;
>
>
>
> private String status;
>
> }
>
>
>


Re: Kafka Consumer Retries Failing

2021-07-13 Thread Piotr Nowojski
Hi,

I'm not sure, maybe someone will be able to help you, but it sounds like it
would be better for you to:
- google search something like "Kafka Error sending fetch request
TimeoutException" (I see there are quite a lot of results, some of them
might be related)
- ask this question on the Kafka mailing list
- ask this question on stackoverflow as a Kafka question

In short, FlinkKafkaConsumer is a very thin wrapper around the
KafkaConsumer class, so the thing you are observing most likely has very
little to do with Flink itself. In other words, if you are observing
such a problem, you would most likely be able to reproduce it without
Flink.

Best,
Piotrek

Fri, 9 Jul 2021 at 12:30 Rahul Patwari  wrote:

> Hi,
>
> We have a Flink 1.11.1 Version streaming pipeline in production which
> reads from Kafka.
> Kafka Server version is 2.5.0 - confluent 5.5.0
> Kafka Client Version is 2.4.1 - 
> {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka
> version: 2.4.1","method":""}
>
> Occasionally(every 6 to 12 hours), we have observed that the Kafka
> consumption rate went down(NOT 0) and the following logs were observed:
> Generally, the consumption rate across all consumers is 4k records/sec.
> When this issue occurred, the consumption rate dropped to < 50 records/sec
>
> org.apache.kafka.common.errors.DisconnectException: null
>
> {"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending
> fetch request (sessionId=405798138, epoch=5808) to node 8:
> {}.","method":"handleError"}
>
> org.apache.kafka.common.errors.TimeoutException: Failed
>
> {"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer
> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group coordinator
> 100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or invalid,
> will attempt rediscovery","method":"markCoordinatorUnknown"}
>
> {"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer
> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group
> coordinator 100.98.40.16:9092 (id: 2147483623 rack:
> null)","method":"onSuccess"}
>
> The consumers retried for more than an hour but the above logs are
> observed again.
> The consumers started pulling data after a manual restart.
>
> No WARN or ERROR logs were observed in Kafka or Zookeeper during this
> period.
>
> Our observation from this incident is that Kafka Consumer retries could
> not resolve the issue but a manual restart (or) Flink internal
> restart(Failure rate restart policy) does.
>
> Has anyone faced this issue before? Any pointers are appreciated.
>
> Regards,
> Rahul
>


Re: Process finite stream and notify upon completion

2021-07-13 Thread Piotr Nowojski
Hi,

Sources, when finishing, emit
{{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}, so I
think the best approach is to register an event time timer for
{{Watermark#MAX_WATERMARK}} or maybe {{Watermark#MAX_WATERMARK - 1}}. If
your function registers such a timer, it will be processed after that
function has processed all of its records (keep in mind Flink is a
distributed system, so downstream operators/functions might still be busy
for some time processing the last records while upstream operators/functions
are already finished).

Alternatively, you can also implement a custom operator that implements the
{{BoundedOneInput}} interface [1]. It would work in the same way, but
implementing a custom operator is more difficult, only semi-officially
supported, and not well documented.

Best,
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/BoundedOneInput.html
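
For reference, a minimal sketch of the timer variant (it assumes a keyed stream, and the notification itself is just a placeholder):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

public class EndOfInputNotifier<K, T> extends KeyedProcessFunction<K, T, T> {

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        // registering the same timer repeatedly is fine - timers are deduplicated per key
        ctx.timerService().registerEventTimeTimer(Watermark.MAX_WATERMARK.getTimestamp());
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        // this only fires once the MAX_WATERMARK from the finished sources has reached
        // this function, i.e. after all records for this key have been processed here
        System.out.println("Input finished for key " + ctx.getCurrentKey()); // placeholder notification
    }
}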

Mon, 12 Jul 2021 at 12:44 Tamir Sagi  wrote:

> Hey Community,
>
> I'm working on a stream job that should aggregate a bounded data and
> notify upon completion. (It works in Batch mode; however, I'm trying to
> achieve the same results in Stream mode, if possible).
>
> Source: Kafka
> Sink: PostgresDB
>
> *I'm looking for an elegant way to notify upon completion.*
>
> One solution I have in mind (Not perfect but might work)
>
>1. Send message to topic for every record which successfully saved
>into DB (From sink)
>2. Consume those messages externally to cluster
>3. If message is not consumed for fixed time, we assume the process
>has finished.
>
> I was also wondering if TimeEventWindow with custom trigger and
> AggregationFunction may help me here
> However, I could not find a way to detect when all records have been
> processed within the window.
>
> I'd go with Flink base solution if exists.
>
> Various References
> flink-append-an-event-to-the-end-of-finite-datastream
> 
> how-can-i-know-that-i-have-consumed-all-of-a-kafka-topic
> 
>
> Best,
>
> Tamir.
>
>
>


Re: Kafka Consumer Retries Failing

2021-07-14 Thread Piotr Nowojski
Hi,

Waiting for memory from LocalBufferPool is a perfectly normal symptom of
backpressure [1][2].

Best,
Piotrek

[1] https://flink.apache.org/2021/07/07/backpressure.html
[2] https://www.ververica.com/blog/how-flink-handles-backpressure

Wed, 14 Jul 2021 at 06:05 Rahul Patwari  wrote:

> Thanks, David, Piotr for your reply.
>
> I managed to capture the Thread dump from Jobmanaager UI for few task
> managers.
> Here is the thread dump for Kafka Source tasks in one task manager. I
> could see the same stack trace in other task managers as well. It seems
> like Kafka Source tasks are waiting on Memory. Any Pointers?
>
>   {
> "threadName": "Kafka Fetcher for Source: SourceEventTransition (6/12)",
> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
> SourceEventTransition (6/12)\" Id=581 WAITING on 
> java.lang.Object@444c0edc\n\tat
> java.lang.Object.wait(Native Method)\n\t-  waiting on
> java.lang.Object@444c0edc\n\tat
> java.lang.Object.wait(Object.java:502)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
> }, {
> "threadName": "Kafka Fetcher for Source: SourceEventSignature (7/12)",
> "stringifiedThreadInfo": "\"Kafka Fetcher for Source: SourceEventSignature
> (7/12)\" Id=580 WAITING on java.lang.Object@7d3843a9\n\tat
> java.lang.Object.wait(Native Method)\n\t-  waiting on
> java.lang.Object@7d3843a9\n\tat
> java.lang.Object.wait(Object.java:502)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
> }, {
> "threadName": "Legacy Source Thread - Source: SourceEventSignature (7/12)",
> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
> SourceEventSignature (7/12)\" Id=408 WAITING on
> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
> }, {
> "threadName": "Legacy Source Thread - Source: SourceEventTransition
> (6/12)",
> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
> SourceEventTransition (6/12)\" Id=409 WAITING on
> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
> }
>
> On Tue, Jul 13, 2021 at 7:07 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I'm not sure, maybe someone will be able to help you, but it sounds like
>> it would be better for you to:
>> - google search something like "Kafka Error sending fetch request
>> TimeoutException" (I see there are quite a lot of results, some of them
>> might be related)
>> - ask this question on the Kafka mailing list
>> - ask this question on stackoverflow as a Kafka question
>>
>> In short, FlinkKafkaConsumer is a very thin wrapper around the
>> KafkaConsumer class, so the thing you are observing has most likely very
>> little to do with the Flink itself. In other 

Re: Process finite stream and notify upon completion

2021-07-14 Thread Piotr Nowojski
Hi Tamir,

Sorry, I missed that you want to use Kafka. In that case I would suggest
trying out the new KafkaSource [1] interface and its built-in boundedness
support [2][3]. Maybe it will do the trick? If you want to be notified
explicitly about the completion of such a bounded Kafka stream, you can
still use the `Watermark#MAX_WATERMARK` trick mentioned above.

If not, can you let us know what is not working?

Best,
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-source
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#boundedness
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html#setBounded-org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer-
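
As a rough sketch of the bounded variant (topic, servers, group id and `env` are placeholders):

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("input-topic")
        .setGroupId("bounded-job")
        .setStartingOffsets(OffsetsInitializer.earliest())
        // the job becomes finite: it stops once the offsets that are current
        // at job submission time have been consumed
        .setBounded(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Bounded Kafka Source");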


Wed, 14 Jul 2021 at 11:59 Tamir Sagi  wrote:

> Hey Piotr,
>
> Thank you for your response.
>
> I saw the exact suggestion answer by David Anderson [1] but did not really
> understand how it may help.
>
> Sources when finishing are emitting
> {{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}
>
> Assuming 10 messages are sent to Kafka topic , processed and saved into DB
>
>1. Kafka is not considered a finite source, after the 10th element it
>will wait for more input, no?
>2. In such case, the 10th element will be marked with MAX_WATERMARK or
>not? or at some point in the future?
>
> Now, Let's say the 10th element will be marked with MAX_WATERMARK, How
> will I know when all elements have been saved into DB?
>
> Here is the execution Graph
> Source(Kafka) --> Operator --- > Operator 2 --> Sink(PostgresSQL)
>
> Would you please elaborate about the time event function? where exactly
> will it be integrated into the aforementioned execution graph ?
>
> Another question I have, based on our discussion. If the only thing that
> changed is the source, apart from that the entire flow is the
> same(operators and sink);  is there any good practice to achieve a single
> job for that?
>
> Tamir.
>
> [1]
> https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302
> --
> *From:* Piotr Nowojski 
> *Sent:* Tuesday, July 13, 2021 4:54 PM
> *To:* Tamir Sagi 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Process finite stream and notify upon completion
>
>
>
>
> Hi,
>
> Sources when finishing are emitting
> {{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}, so I
> think the best approach is to register an even time timer for
> {{Watermark#MAX_WATERMARK}} or maybe {{Watermark#MAX_WATERMARK - 1}}. If
> your function registers such a timer, it would be processed after
> processing all of the records by that function (keep in mind Flink is a
> distributed system so downstream operators/functions might still be busy
> for some time processing last records, while upstream operators/functions
> are already finished).
>
> Alternatively you can also implement a custom operator that implements
> {{BoundedOneInput}} interface [1], it would work in the same way, but
> implementing a custom operator is more difficult, only semi officially
> supported and not well documented.
>
> Best,
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/BoundedOneInput.html
>
> Mon, 12 Jul 2021 at 12:44 Tamir Sagi  wrote:
>
> Hey Community,
>
> I'm working on a stream job that should aggregate a bounded data and
> notify upon completion. (It works in Batch mode; however, I'm trying to
> achieve the same results in Stream mode, if possible).
>
> Source: Kafka
> Sink: PostgresDB
>
> *I'm looking for an elegant way to notify upon completion.*
>
> One solution I have in mind (Not perfect but might work)
>
>1. Send message to topic for every record which successfully saved
>into DB (From sink)
>2. Consume those messages externally to cluster
>3. If message is not consumed for fixed time, we assume the process
>has finished.
>
> I was also wondering if TimeEventWindow with custom trigger and
> AggregationFunction may help me here
> However, I could not find a way to detect when all records have been
> processed within the window.
>
> I'd go with Flink base solution if exists.
>
> Various References
> flink-append-an-event-to-the-end-of-finite-datastream
> <https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302>
> how-can-

Re: Kafka Consumer Retries Failing

2021-07-14 Thread Piotr Nowojski
: "\"Legacy Source Thread - Source:
> SourceEventSignature (8/12)\" Id=515 WAITING on 
> java.lang.Object@4d5cc800\n\tat
> java.lang.Object.wait(Native Method)\n\t-  waiting on
> java.lang.Object@4d5cc800\n\tat
> java.lang.Object.wait(Object.java:502)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)\n\tat
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)\n\tat
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)\n\tat
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)\n\tat
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)\n\n"
> }, {
> "threadName": "Legacy Source Thread - Source: SourceEventTransition
> (4/12)",
> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
> SourceEventTransition (4/12)\" Id=514 WAITING on 
> java.lang.Object@1fc525f3\n\tat
> java.lang.Object.wait(Native Method)\n\t-  waiting on
> java.lang.Object@1fc525f3\n\tat
> java.lang.Object.wait(Object.java:502)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)\n\tat
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)\n\tat
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)\n\tat
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)\n\tat
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)\n\n"
> }
>
> On Wed, Jul 14, 2021 at 2:39 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Waiting for memory from LocalBufferPool is a perfectly normal symptom of
>> a backpressure [1][2].
>>
>> Best,
>> Piotrek
>>
>> [1] https://flink.apache.org/2021/07/07/backpressure.html
>> [2] https://www.ververica.com/blog/how-flink-handles-backpressure
>>
>> Wed, 14 Jul 2021 at 06:05 Rahul Patwari  wrote:
>>
>>> Thanks, David, Piotr for your reply.
>>>
>>> I managed to capture the Thread dump from Jobmanaager UI for few task
>>> managers.
>>> Here is the thread dump for Kafka Source tasks in one task manager. I
>>> could see the same stack trace in other task managers as well. It seems
>>> like Kafka Source tasks are waiting on Memory. Any Pointers?
>>>
>>>   {
>>> "threadName": "Kafka Fetcher for Source: SourceEventTransition (6/12)",
>>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>>> SourceEventTransition (6/12)\" Id=581 WAITING on 
>>> java.lang.Object@444c0edc\n\tat
>>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>>> java.lang.Object@444c0edc\n\tat
>>> java.lang.Object.wait(Object.java:502)\n\tat
>>> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
>>> }, {
>>> "threadName": "Kafka Fetcher for Source: SourceEventSignature (7/12)",
>>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>>> SourceEventSignature (7/12)\" Id=580 WAITING on 
>>> java.lang.Object@7d3843a9\n\tat
>>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>>> java.lang.Object@7d3843a9\n\tat
>>> java.lang.Object.wait(Object.java:502)\n\tat
>>> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
>>> }, {
>>> "threadName": "Legacy Source Thread - Source: SourceEventSignature
>>> (7/12)",
>>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>>> SourceEventSignature (7/12)\" Id=408 WAITING on
>>> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
>>> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
>>> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
>>> java.util.concurrent.

Re: Process finite stream and notify upon completion

2021-07-14 Thread Piotr Nowojski
r things, this is the case when you do time
> > series analysis, when doing aggregations based on certain time periods
> > (typically called windows), or when you do event processing where the
> > time when an event occurred is important.
> > ci.apache.org
> >
> >
> > [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/watermark/Watermark.html#MAX_WATERMARK
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/watermark/Watermark.html#MAX_WATERMARK
> >
> >
> >
> > 
> > *From:* Piotr Nowojski 
> > *Sent:* Wednesday, July 14, 2021 1:36 PM
> > *To:* Tamir Sagi 
> > *Cc:* user@flink.apache.org 
> > *Subject:* Re: Process finite stream and notify upon completion
> >
> >
> >
> >
> > Hi Tamir,
> >
> > Sorry I missed that you want to use Kafka. In that case I would suggest
> > trying out the new KafkaSource [1] interface and it's built-in boundness
> > support [2][3]. Maybe it will do the trick? If you want to be notified
> > explicitly about the completion of such a bounded Kafka stream, you
> > still can use this `Watermark#MAX_WATERMARK` trick mentioned above.
> >
> > If not, can you let us know what is not working?
> >
> > Best,
> > Piotrek
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-source
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-source
> >
> > [2]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#boundedness
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#boundedness
> >
> > [3]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html#setBounded-org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer-
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html#setBounded-org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer-
> >
> >
> >
> > Wed, 14 Jul 2021 at 11:59 Tamir Sagi <mailto:tamir.s...@niceactimize.com> wrote:
> >
> > Hey Piotr,
> >
> > Thank you for your response.
> >
> > I saw the exact suggestion answer by David Anderson [1] but did not
> > really understand how it may help.
> >
> > Sources when finishing are emitting
> >
>  {{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}
> >
> > Assuming 10 messages are sent to Kafka topic , processed and saved
> > into DB
> >
> >  1. Kafka is not considered a finite source, after the 10th element
> > it will wait for more input, no?
> >  2. In such case, the 10th element will be marked with MAX_WATERMARK
> > or not? or at some point in the future?
> >
> > Now, Let's say the 10th element will be marked with MAX_WATERMARK,
> > How will I know when all elements have been saved into DB?
> >
> > Here is the execution Graph
> > Source(Kafka) --> Operator --- > Operator 2 --> Sink(PostgresSQL)
> >
> > Would you please elaborate about the time event function? where
> > exactly will it be integrated into the aforementioned execution
> graph ?
> >
> > Another question I have, based on our discussion. If the only thing
> > that changed is the source, apart from that the entire flow is the
> > same(operators and sink);  is there any good practice to achieve a
> > single job for that?
> >
> > Tamir.
> >
> > [1]
> >
> https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302
> > <
> https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302
> >
> >
>  
> > *From:* Piotr Nowojski  > <mailto:pnowoj...@apache.org>>
> > *Sent:* Tuesday, July 13, 2021 4:54 PM
> > *To:* Tamir Sagi  > <mailto:tamir.s...@niceactimize.com>>
>

Re: Kafka Consumer Retries Failing

2021-07-19 Thread Piotr Nowojski
Thanks for the update.

> Could the backpressure timeout and heartbeat timeout be because of Heap
Usage close to Max configured?

Could be. This is one of the things I had in mind under overloaded in:

> might be related to one another via some different deeper problem (broken
network environment, something being overloaded)

You can easily diagnose it. Just attach a memory profiler or check gc logs,
just as you would normally do when debugging a non-Flink standalone Java
application.
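
For example, GC logging can be enabled through the Flink configuration (just a sketch; the flags below are for JDK 8, newer JDKs would use -Xlog:gc* instead, and the log path is a placeholder):

# flink-conf.yaml
env.java.opts: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/tmp/gc.log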

It can also be a symptom of a failing network environment. I would first
check for GC pauses/stops/gaps in the logs that would indicate stalled JVM
caused those RPC timeouts. If that doesn't bring you closer to a solution I
would then check for the network environment in your cluster/cloud. Both of
those might be a reason behind your Kafka issues. Hard to tell. Definitely
you shouldn't have heartbeat timeouts in your cluster, so something IS
wrong with your setup.

Best,
Piotrek

Thu, 15 Jul 2021 at 17:17 Rahul Patwari  wrote:

> Thanks for the feedback Piotrek.
>
> We have observed the issue again today. As we are using Flink 1.11.1, I
> tried to check the backpressure of Kafka source tasks from the
> Jobmanager UI.
> The backpressure request was canceled due to Timeout and "No Data" was
> displayed in UI. Here are the respective logs:
>
> java.util.concurrent.TimeoutException: Invocation of public abstract
> java.util.concurrent.CompletableFuture
> org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.requestTaskBackPressure(org.apache.flink.runtime.executiongraph.ExecutionAttemptID,int,org.apache.flink.api.common.time.Time)
> timed out.
> at
> org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.requestTaskBackPressure(RpcTaskManagerGateway.java:67)
> .
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka.tcp://flink@xX.X.X.X:X/user/rpc/taskmanager_0#-1457664622]]
> after [15000 ms]. Message of type
> [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical
> reason for `AskTimeoutException` is that the recipient actor didn't send a
> reply.
> at
> akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> .
>
> During this time, the heartbeat of one of the Taskmanager to the
> Jobmanager timed out. Here are the respective logs:
>
> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
> bead57c15b447eac08531693ec91edc4 timed out. at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193)
> ..
>
> Because of heartbeat timeout, there was an internal restart of Flink and
> the Kafka consumption rate recovered after the restart.
>
> Could the backpressure timeout and heartbeat timeout be because of Heap
> Usage close to Max configured?
>
> On Wed, Jul 14, 2021 at 6:29 PM Piotr Nowojski 
> wrote:
>
>> Hi Rahul,
>>
>> I would highly doubt that you are hitting the network bottleneck case. It
>> would require either a broken environment/network or throughputs in orders
>> of GB/second. More likely you are seeing empty input pool and you haven't
>> checked the documentation [1]:
>>
>> > inPoolUsage - An estimate of the input buffers usage. (ignores
>> LocalInputChannels)
>>
>> If local channels are backpressured, inPoolUsage will be 0. You can check
>> downstream task's inputQueueLength or isBackPressured metrics. Besides
>> that, I would highly recommend upgrading to Flink 1.13.x if you are
>> investigating backpressure problems as described in the blog post.
>>
>> > 1. Can the backpressure Cause "DisconnectException", "Error Sending
>> Fetch Request to node ..." and other Kafka Consumer logs mentioned above?
>>
>> No, I don't think it's possible. Those two might be related to one
>> another via some different deeper problem (broken network environment,
>> something being overloaded), but I don't see a way how one could cause the
>> other.
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#default-shuffle-service
>>
>> Wed, 14 Jul 2021 at 14:18 Rahul Patwari  wrote:
>>
>>> Thanks, Piotrek.
>>>
>>> We have two Kafka sources. We are facing this issue for both of them.
>>> The downstream tasks with the sources form two independent directed acyclic
>>> graphs, running within the same Streaming Job.
>>>
>>> For Example:
>>> source1 -> task1 -&g

Re: Kafka Consumer Retries Failing

2021-07-19 Thread Piotr Nowojski
Ok, thanks for the update. Great that you managed to resolve this issue :)

Best,
Piotrek

Mon, 19 Jul 2021 at 17:13 Rahul Patwari  wrote:

> Hi Piotrek,
>
> I was just about to update.
> You are right. The issue is because of a stalled task manager due to High
> Heap Usage. And the High Heap Usage is because of a Memory Leak in a
> library we are using.
>
> Thanks for your help.
>
> On Mon, Jul 19, 2021 at 8:31 PM Piotr Nowojski 
> wrote:
>
>> Thanks for the update.
>>
>> > Could the backpressure timeout and heartbeat timeout be because of
>> Heap Usage close to Max configured?
>>
>> Could be. This is one of the things I had in mind under overloaded in:
>>
>> > might be related to one another via some different deeper problem
>> (broken network environment, something being overloaded)
>>
>> You can easily diagnose it. Just attach a memory profiler or check gc
>> logs, just as you would normally do when debugging a non-Flink standalone
>> Java application.
>>
>> It can also be a symptom of a failing network environment. I would first
>> check for GC pauses/stops/gaps in the logs that would indicate stalled JVM
>> caused those RPC timeouts. If that doesn't bring you closer to a solution I
>> would then check for the network environment in your cluster/cloud. Both of
>> those might be a reason behind your Kafka issues. Hard to tell. Definitely
>> you shouldn't have heartbeat timeouts in your cluster, so something IS
>> wrong with your setup.
>>
>> Best,
>> Piotrek
>>
>> Thu, 15 Jul 2021 at 17:17 Rahul Patwari  wrote:
>>
>>> Thanks for the feedback Piotrek.
>>>
>>> We have observed the issue again today. As we are using Flink 1.11.1, I
>>> tried to check the backpressure of Kafka source tasks from the
>>> Jobmanager UI.
>>> The backpressure request was canceled due to Timeout and "No Data" was
>>> displayed in UI. Here are the respective logs:
>>>
>>> java.util.concurrent.TimeoutException: Invocation of public abstract
>>> java.util.concurrent.CompletableFuture
>>> org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.requestTaskBackPressure(org.apache.flink.runtime.executiongraph.ExecutionAttemptID,int,org.apache.flink.api.common.time.Time)
>>> timed out.
>>> at
>>> org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.requestTaskBackPressure(RpcTaskManagerGateway.java:67)
>>> .
>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>> [Actor[akka.tcp://flink@xX.X.X.X:X/user/rpc/taskmanager_0#-1457664622]]
>>> after [15000 ms]. Message of type
>>> [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical
>>> reason for `AskTimeoutException` is that the recipient actor didn't send a
>>> reply.
>>> at
>>> akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> .
>>>
>>> During this time, the heartbeat of one of the Taskmanager to the
>>> Jobmanager timed out. Here are the respective logs:
>>>
>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
>>> bead57c15b447eac08531693ec91edc4 timed out. at
>>> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193)
>>> ..
>>>
>>> Because of heartbeat timeout, there was an internal restart of Flink
>>> and the Kafka consumption rate recovered after the restart.
>>>
>>> Could the backpressure timeout and heartbeat timeout be because of Heap
>>> Usage close to Max configured?
>>>
>>> On Wed, Jul 14, 2021 at 6:29 PM Piotr Nowojski 
>>> wrote:
>>>
>>>> Hi Rahul,
>>>>
>>>> I would highly doubt that you are hitting the network bottleneck case.
>>>> It would require either a broken environment/network or throughputs in
>>>> orders of GB/second. More likely you are seeing empty input pool and you
>>>> haven't checked the documentation [1]:
>>>>
>>>> > inPoolUsage - An estimate of the input buffers usage. (ignores
>>>> LocalInputChannels)
>>>>
>>>> If local channels are backpressured, inPoolUsage will be 0. You can
>>>> check downstream task's inputQueueLength or isBackPressured metrics.
>>>> Besides that, I would highly recommend up

Re: TaskManager crash after cancelling a job

2021-07-29 Thread Piotr Nowojski
Hi Ivan,

It sounds to me like a bug in FlinkKinesisConsumer that it's not cancelling
properly. The change in the behaviour could have been introduced as a bug
fix [1], where we had to stop interrupting the source thread. This also
might be related or at least relevant for fixing [2].

Ivan, the stack trace that you posted shows only that the task thread is
waiting for the source thread to finish. It doesn't show why the source
thread hasn't ended. For someone to fix this, it would be helpful if you
could provide a thread dump from a Task Manager that is stuck in cancelling
state. If for some reason you don't want to share a full thread dump, you
can share only the threads whose stack traces contain a package with
"kinesis" in the name (this would capture the `FlinkKinesisConsumer` and all
internal Kinesis threads). This should be enough for someone who is
familiar with FlinkKinesisConsumer to understand why it hasn't been
canceled.

Best, Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-21028
[2] https://issues.apache.org/jira/browse/FLINK-23528

Thu, 29 Jul 2021 at 04:15 Yangze Guo  wrote:

> In your case, the entry point is the `cleanUpInvoke` function called
> by `StreamTask#invoke`.
>
> @ro...@apache.org Could you take another look at this?
>
> Best,
> Yangze Guo
>
> On Thu, Jul 29, 2021 at 2:29 AM Ivan Yang  wrote:
> >
> > Hi Yangze,
> >
> > I deployed 1.13.1, same problem exists. It seems like that the cancel
> logic has changed since 1.11.0 (which was the one we have been running for
> almost 1 year). In 1.11.0, during the cancellation, we saw some subtask
> stays in the cancelling state for sometime, but eventually the job will be
> cancelled, and no task manager were lost. So we can start the job right
> away. In the new version 1.13.x, it will kill the task managers where those
> stuck sub tasks were running on, then takes another 4-5 minutes for the
> task manager to rejoin.  Can you point me the code that manages the job
> cancellation routine? Want to understand the logic there.
> >
> > Thanks,
> > Ivan
> >
> > > On Jul 26, 2021, at 7:22 PM, Yangze Guo  wrote:
> > >
> > > Hi, Ivan
> > >
> > > My gut feeling is that it is related to FLINK-22535. Could @Yun Gao
> > > take another look? If that is the case, you can upgrade to 1.13.1.
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Jul 27, 2021 at 9:41 AM Ivan Yang 
> wrote:
> > >>
> > >> Dear Flink experts,
> > >>
> > >> We recently ran into an issue during a job cancellation after
> upgraded to 1.13. After we issue a cancel (from Flink console or flink
> cancel {jobid}), a few subtasks stuck in cancelling state. Once it gets to
> that situation, the behavior is consistent. Those “cancelling tasks will
> never become canceled. After 3 minutes, The job stopped, as a result,
> number of task manages were lost. It will take about another 5 minute for
> the those lost task manager to rejoin the Job manager. Then we can restart
> the job from the previous checkpoint. Found an exception from the hanging
> (cancelling) Task Manager.
> > >> ==
> > >>sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> java.lang.Thread.run(Thread.java:748)
> > >> ===
> > >>
> > >> Here are some background information about our job and setup.
> > >> 1) The job is relatively large, we have 500+ parallelism and 2000+
> subtasks. It’s mainly reading from a Kinesis stream and perform some
> transformation and fanout to multiple output s3 buckets. It’s a stateless
> ETL job.
> > >> 2) The same code and setup running on smaller environments don’t seem
> to have this cancel failure problem.
> > >> 3) We have been using Flink 1.11.0 for the same job, and never seen
> this cancel failure and killing Task Manager problem.
> > >> 4) With upgrading to 1.13, we also added Kubernetes HA
> (zookeeperless). Pervious we don’t not use HA.
> > >>
> > >> The cancel and restart from previous checkpoint is our regular
> procedure to support daily operation. With this 10 minutes TM restart
> cycle, it basically slowed down our throughput. I try to understand what
> leads into this situation. Hoping maybe some configuration change wil

Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

2021-08-04 Thread Piotr Nowojski
Thanks for the detailed explanation, Yun Tang, and for all of the effort
you have clearly put into it. Based on what was described here, I would also
vote for going forward with the upgrade.

It's a pity that this regression wasn't caught in the RocksDB community. I
have two questions/ideas:
1. Can we push for better benchmark coverage in the RocksDB project in the
future?
2. Can we try to catch this kind of problem with RocksDB earlier? For
example, with more frequent RocksDB upgrades, or by building test Flink builds
with the most recent RocksDB version to run our benchmarks and validate
newer RocksDB versions?

Best,
Piotrek

Wed, 4 Aug 2021 at 19:59 Yuval Itzchakov  wrote:

> Hi Yun,
> Thank you for the elaborate explanation and even more so for the super
> hard work that you're doing digging into RocksDB and chasing after
> hundreds of commits in order to fix them so we can all benefit.
>
> I can say for myself that optimizing towards memory is more important
> ATM for us, and I'm totally +1 for this.
>
> On Wed, Aug 4, 2021 at 8:50 PM Yun Tang  wrote:
>
>> Hi Yuval,
>>
>> Upgrading RocksDB version is a long story since Flink-1.10.
>> When we first plan to introduce write buffer manager to help control the
>> memory usage of RocksDB, we actually wanted to bump up to RocksDB-5.18 from
>> current RocksDB-5.17. However, we found performance regression in our micro
>> benchmark on state operations [1] if bumped to RocksDB-5.18. We did not
>> figure the root cause at that time and decide to cherry pick the commits of
>> write buffer manager to our own FRocksDB [2]. And we finally released our
>> own frocksdbjni-5.17.2-artisans-2.0 at that time.
>>
>> As time goes on, more and more bugs or missing features have been reported
>> in the old RocksDB version. Such as:
>>
>>    1. Cannot support ARM platform [3]
>>    2. Does not have a stable deleteRange API, which is useful for Flink
>>    scale out [4]
>>    3. Cannot support strict block cache [5]
>>    4. Checkpoint might get stuck if using the UNIVERSAL compaction strategy [6]
>>    5. Uncontrolled log size made us disable the RocksDB internal LOG [7]
>>    6. RocksDB's optimizeForPointLookup option might cause data loss [8]
>>    7. The current dummy entry used for memory control in RocksDB-5.17 is too
>>    large, leading to performance problems [9]
>>    8. Cannot support alpine-based images.
>>    9. ...
>>
>> Some of the bugs are worked around, and some are still open.
>>
>> And we decide to make some changes from Flink-1.12. First of all, we
>> reported the performance regression problem compared with RocksDB-5.18 and
>> RocksDB-5.17 to RocksDB community [10]. However, as RocksDB-5.x versions
>> are a bit older for the community, and RocksJava usage might not be the
>> core part for facebook guys, we did not get useful replies. Thus, we decide
>> to figure out the root cause of performance regression by ourself.
>> Fortunately, we find the cause via binary search the commits among
>> RocksDB-5.18 and RocksDB-5.17, and updated in the original thread [10]. To
>> be short, the performance regression is due to different implementation of
>> `__thread` and `thread_local` in gcc and would have more impact on dynamic
>> loading [11], which is also what current RocksJava jar package does. With
>> my patch [12], the performance regression would disappear if comparing
>> RocksDB-5.18 with RocksDB-5.17.
>>
>> Unfortunately, RocksDB-5.18 still has many bugs and we want to bump to
>> RocksDB-6.x. However, another performance regression appeared even with my
>> patch [12]. With previous knowledge, we know that we must verify the built
>> .so files with our java-based benchmark instead of using RocksDB built-in
>> db-bench. I started to search the 1340+ commits from RocksDB-5.18 to
>> RocksDB-6.11 to find the performance problem. However, I did not figure out
>> the root cause after spending several weeks this time. The performance
>> behaves up and down in those commits and I cannot get *the commit *which
>> lead the performance regression. Take this commit of integrating block
>> cache tracer in block-based table reader [13] for example, I noticed that
>> this commit would cause a bit performance regression and that might be the
>> useless usage accounting in operations, however, the problematic code was
>> changed in later commits. Thus, after several weeks digging, I have to give
>> up for the endless searching in the thousand commits temporarily. As
>> RocksDB community seems not make the project management system public,
>> unlike Apache's open JIRA systems, we do not know what benchmark they
>> actually run before releasing each version to guarantee the performance.
>>
>> With my patch [10] on latest RocksDB-6.20.3, we could get the results on
>> nexmark in the original thread sent by Stephan, and we can see the
>> performance behaves closely in many real-world cases. And we also hope new
>> features, such as direct buffer supporting [14] in RocksJava could help
>> impro

Re: Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

2021-08-14 Thread Piotr Nowojski
Hi,

FYI, the performance regression after upgrading RocksDB was clearly visible
in all of our RocksDB related benchmarks, like for example:

http://codespeed.dak8s.net:8000/timeline/?ben=stateBackends.ROCKS&env=2
http://codespeed.dak8s.net:8000/timeline/?ben=stateBackends.ROCKS_INC&env=2
(and many more in the State Backends executable)

It's 6% to 12% across the board.

Best,
Piotrek


Wed, 11 Aug 2021 at 13:42 张蛟  wrote:

> Hi Nico and Yun:
>    Thanks for your great work and detailed description of the RocksDB
> version upgrade and its performance. About 800 jobs are using the RocksDB
> state backend in our production environment, and we plan to upgrade more,
> aiming to solve the GC problems caused by large state. Because of the
> non-strict memory control in RocksDB, we have to spend a lot of time solving
> the problem of memory usage going beyond the physical memory. With support
> for a strict block cache, things will become much easier. Also, the delete
> range API is useful for us too, so we prefer to upgrade RocksDB to the new
> release version, and +1 (non-binding). best,
> zlzhang0122
>
> At 2021-08-05 01:50:07, "Yun Tang"  wrote:
> >Hi Yuval,
> >
> >Upgrading RocksDB version is a long story since Flink-1.10.
> >When we first plan to introduce write buffer manager to help control the
> memory usage of RocksDB, we actually wanted to bump up to RocksDB-5.18 from
> current RocksDB-5.17. However, we found performance regression in our micro
> benchmark on state operations [1] if bumped to RocksDB-5.18. We did not
> figure the root cause at that time and decide to cherry pick the commits of
> write buffer manager to our own FRocksDB [2]. And we finally released our
> own frocksdbjni-5.17.2-artisans-2.0 at that time.
> >
> >As time goes on, more and more bugs or missing features have been reported
> in the old RocksDB version. Such as:
> >
> >  1.  Cannot support ARM platform [3]
> >  2.  Does not have a stable deleteRange API, which is useful for Flink
> scale out [4]
> >  3.  Cannot support strict block cache [5]
> >  4.  Checkpoint might get stuck if using the UNIVERSAL compaction strategy [6]
> >  5.  Uncontrolled log size made us disable the RocksDB internal LOG [7]
> >  6.  RocksDB's optimizeForPointLookup option might cause data loss [8]
> >  7.  The current dummy entry used for memory control in RocksDB-5.17 is too
> large, leading to performance problems [9]
> >  8.  Cannot support alpine-based images.
> >  9.  ...
> >
> >Some of the bugs are worked around, and some are still open.
> >
> >And we decide to make some changes from Flink-1.12. First of all, we
> reported the performance regression problem compared with RocksDB-5.18 and
> RocksDB-5.17 to RocksDB community [10]. However, as RocksDB-5.x versions
> are a bit older for the community, and RocksJava usage might not be the
> core part for facebook guys, we did not get useful replies. Thus, we decide
> to figure out the root cause of performance regression by ourself.
> >Fortunately, we find the cause via binary search the commits among
> RocksDB-5.18 and RocksDB-5.17, and updated in the original thread [10]. To
> be short, the performance regression is due to different implementation of
> `__thread` and `thread_local` in gcc and would have more impact on dynamic
> loading [11], which is also what current RocksJava jar package does. With
> my patch [12], the performance regression would disappear if comparing
> RocksDB-5.18 with RocksDB-5.17.
> >
> >Unfortunately, RocksDB-5.18 still has many bugs and we want to bump to
> RocksDB-6.x. However, another performance regression appeared even with my
> patch [12]. With previous knowledge, we know that we must verify the built
> .so files with our java-based benchmark instead of using RocksDB built-in
> db-bench. I started to search the 1340+ commits from RocksDB-5.18 to
> RocksDB-6.11 to find the performance problem. However, I did not figure out
> the root cause after spending several weeks this time. The performance
> behaves up and down in those commits and I cannot get the commit which lead
> the performance regression. Take this commit of integrating block cache
> tracer in block-based table reader [13] for example, I noticed that this
> commit would cause a bit performance regression and that might be the
> useless usage accounting in operations, however, the problematic code was
> changed in later commits. Thus, after several weeks digging, I have to give
> up for the endless searching in the thousand commits temporarily. As
> RocksDB community seems not make the project management system public,
> unlike Apache's open JIRA systems, we do not know what benchmark they
> actually run before releasing each version to guarantee the performance.
> >
> >With my patch [10] on the latest RocksDB-6.20.3, we could get the results on
> nexmark shown in the original thread sent by Stephan, and we can see that the
> performance is close in many real-world cases. And we also hope new
> features, such as direct buffer supporting [14] in Ro

Re: Duplicate copies of job in Flink UI/API

2021-09-08 Thread Piotr Nowojski
Hi Peter,

Can you provide the relevant JobManager logs? And can you write down what steps
you took before the failure happened? Did this failure occur while
upgrading Flink, or after the upgrade, etc.?

Best,
Piotrek

śr., 8 wrz 2021 o 16:11 Peter Westermann 
napisał(a):

> We recently upgraded from Flink 1.12.4 to 1.12.5 and are seeing some weird
> behavior after a change in jobmanager leadership: We’re seeing two copies
> of the same job, one of those is in SUSPENDED state and has a start time of
> zero. Here’s the output from the /jobs/overview endpoint:
>
> {
>
>   "jobs": [{
>
> "jid": "2db4ee6397151a1109d1ca05188a4cbb",
>
> "name": "analytics-flink-v1",
>
> "state": "RUNNING",
>
> "start-time": 1631106146284,
>
> "end-time": -1,
>
> "duration": 2954642,
>
> "last-modification": 1631106152322,
>
> "tasks": {
>
>   "total": 112,
>
>   "created": 0,
>
>   "scheduled": 0,
>
>   "deploying": 0,
>
>   "running": 112,
>
>   "finished": 0,
>
>   "canceling": 0,
>
>   "canceled": 0,
>
>   "failed": 0,
>
>   "reconciling": 0
>
> }
>
>   }, {
>
> "jid": "2db4ee6397151a1109d1ca05188a4cbb",
>
> "name": "analytics-flink-v1",
>
> "state": "SUSPENDED",
>
> "start-time": 0,
>
> "end-time": -1,
>
> "duration": 1631105900760,
>
> "last-modification": 0,
>
> "tasks": {
>
>   "total": 0,
>
>   "created": 0,
>
>   "scheduled": 0,
>
>   "deploying": 0,
>
>   "running": 0,
>
>   "finished": 0,
>
>   "canceling": 0,
>
>   "canceled": 0,
>
>   "failed": 0,
>
>   "reconciling": 0
>
> }
>
>   }]
>
> }
>
>
>
> Has anyone seen this behavior before?
>
>
>
> Thanks,
>
> Peter
>


Re: State processor API very slow reading a keyed state with RocksDB

2021-09-08 Thread Piotr Nowojski
Hi David,

I can confirm that I'm able to reproduce this behaviour. I've tried
profiling/flame graphs and I was not able to make much sense out of those
results. There are no IO/memory bottlenecks that I could notice; it looks
indeed like the job is stuck inside RocksDB itself. This might be an issue
with, for example, the memory configuration. Streaming jobs and the State Processor
API run in very different environments, as the latter one uses the
DataSet API under the hood, so maybe that can explain this? However, I'm not an
expert in either the DataSet API or RocksDB, so it's hard for me to make
progress here.

Maybe someone else can help here?

Piotrek


śr., 8 wrz 2021 o 14:45 David Causse  napisał(a):

> Hi,
>
> I'm investigating why a job we use to inspect a flink state is a lot
> slower than the bootstrap job used to generate it.
>
> I use RocksdbDB with a simple keyed value state mapping a string key to a
> long value. Generating the bootstrap state from a CSV file with 100M
> entries takes a couple minutes over 12 slots spread over 3 TM (4Gb
> allowed). But another job that does the opposite (converts this state into
> a CSV file) takes several hours. I would have expected these two job
> runtimes to be in the same ballpark.
>
> I wrote a simple test case[1] to reproduce the problem. This program has 3
> jobs:
> - CreateState: generate a keyed state (string->long) using the state
> processor api
> - StreamJob: reads all the keys using a StreamingExecutionEnvironment
> - ReadState: reads all the keys using the state processor api
>
> Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
> StreamJob are done in less than a minute.
> ReadState is much slower (> 30minutes) on my system. The RocksDB state
> appears to be restored relatively quickly but after that the slots are
> performing at very different speeds. Some slots finish quickly but some
> others struggle to advance.
> Looking at the thread dumps I always see threads in
> org.rocksdb.RocksDB.get:
>
> "DataSource (at readKeyedState(ExistingSavepoint.java:314)
> (org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371
> RUNNABLE
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:2084)
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)
>
> It seems suspiciously slow to me and I'm wondering if I'm missing
> something in the way the state processor api works.
>
> Thanks for your help!
>
> David.
>
> 1: https://github.com/nomoa/rocksdb-state-processor-test
>


Re: Event is taking a lot of time between the operators

2021-09-28 Thread Piotr Nowojski
Hi,

With Flink 1.8.0 I'm not sure how reliable the backpressure status in the
WebUI is when it comes to the Async operators. If I remember correctly,
until around Flink 1.10 (+/- 2 versions) backpressure monitoring worked by
sampling thread dumps for threads stuck requesting Flink's network memory
buffers. If in your job the AsyncFunction is the source of backpressure, it
would be skipped and not reported. For analysing backpressure I would
highly recommend upgrading to Flink 1.13.x, as it has greatly improved
tooling for that [1]. And in that version AsyncFunctions are definitely
handled correctly. Since Flink 1.10 I believe you can use the
`isBackPressured` metric. In previous versions you would have to rely on
buffer usage metrics as described here [2].


[1] https://flink.apache.org/2021/07/07/backpressure.html
[2]
https://flink.apache.org/2019/07/23/flink-network-stack-2.html#network-metrics

Apart from the backpressure, part of the problem might simply be how long
it takes for the `Async1` function to return the result. Have you checked
that? Isn't it taking a couple of seconds?

Best,
Piotrek

wt., 28 wrz 2021 o 15:55 Sanket Agrawal 
napisał(a):

> Hi All,
>
>
>
> I am new to Flink. While developing a Flink application We observed that
> our message is taking around 10 seconds between the two Async operators.
> Below are the details.
>
>
>
>- *Flink Flow*: Kinesis Source -> Process -> Async1 -> Async2 ->
>Process -> Kinesis Sink
>- *Environment*: Amazon KDA. 1 Kinesis Processing Unit (1vCore & 4GB
>ram), and 1 parallelism.
>- *Flink Version*: 1.8.0
>- *Backpressure*: Flink dashboard shows that backpressure is *OK.*
>- *Input rate: *60 messages per second.
>
>
>
> Any kind of pointers/help will be very useful.
>
>
>
> Thanks,
>
> Sanket Agrawal
>
>
>


Re: Event is taking a lot of time between the operators

2021-09-29 Thread Piotr Nowojski
Hi Sanket,

As I mentioned in the previous email, it's most likely still an issue of
backpressure, and you can check it as I described in that message. Your
records are either stuck in the network buffers on the way to the async
operators (I) (if there is a network exchange), and/or inside the
`AsyncWaitOperator`'s internal queue (II). If this is causing you problems:

I. For the former problem (network buffers) you can:
a) get rid of the network exchange, via removing keyBy/shuffle/rebalance
(might not be feasible, depending on your business logic)
b) reduce the amount of in-flight data. In Flink 1.14 we are adding an
automatic buffer debloating mechanism; in Flink 1.8 you cannot use it, but
you can manually tweak both the number and the size of the buffers. You can
read about it here [1], just ignore the automatic buffer debloating
mechanism.
II. You can change the size of the internal queue by adjusting the
`capacity` parameter [2] (a minimal sketch follows below).

The more buffered in-flight data you have between operators, the longer the
delay between processing the same record by two different operators.
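
For reference, a minimal sketch of where that `capacity` parameter is set when
wiring up the async operator. The function name, element types, timeout and
capacity values below are just illustrative placeholders, not taken from your job:

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

// `input` is the upstream DataStream and MyAsyncFunction is your existing AsyncFunction.
DataStream<Result> results = AsyncDataStream.unorderedWait(
        input,
        new MyAsyncFunction(),
        30, TimeUnit.SECONDS,   // timeout per async request
        100);                   // capacity: max number of in-flight requests per subtask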

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/network_mem_tuning/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api



śr., 29 wrz 2021 o 08:20 Sanket Agrawal 
napisał(a):

> Hi Ragini,
>
>
>
> For measuring time in an async we have put a logger as the first and the
> last statement in asyncInvoke and for measuring time between the asyncs
> we are simply subtracting the message2’s start time and message1’s end
> time. Also, we are using 1 as the parallelism.
>
>
>
> Please let me know if you need any other information or if you have any
> recommendations on improving the approach.
>
>
>
> Thanks,
>
> Sanket Agrawal
>
>
>
> *From:* Ragini Manjaiah 
> *Sent:* Wednesday, September 29, 2021 11:17 AM
> *To:* Sanket Agrawal 
> *Cc:* Piotr Nowojski ; user@flink.apache.org
> *Subject:* Re: Event is taking a lot of time between the operators
>
>
>
> [**EXTERNAL EMAIL**]
>
> Hi Sanket,
>
>  I have a similar use case. how are you measuring the time for Async1`
> function to return the result and external api call
>
>
>
> On Wed, Sep 29, 2021 at 10:47 AM Sanket Agrawal <
> sanket.agra...@infosys.com> wrote:
>
> Hi @Piotr Nowojski ,
>
>
>
> Thank you for replying back. Yes, first async is taking between 1300-1500
> milliseconds but that is called on a CompletableFuture.*supplyAsync *and
> the Async Capacity is set to 1000.
>
>
>
> *Async Code Structure*: Inside asyncInvoke we are calling
> CompletableFuture.*supplyAsync *and inside* supplyAsync *we are calling
> an external API which is taking around 1005ms to 1040ms. Rest of the code
> for request creation/response validation is also inside the* supplyAsync *and
> is taking around 250ms.
>
>
>
> This way we tried that the main Async thread(as the async does not uses
> multiple threads directly) is available for the next message as soon as it
> calls CompletableFuture.supplyAsync on the current message.
>
>
>
> Thanks,
>
> Sanket Agrawal
>
>
>
> *From:* Piotr Nowojski 
> *Sent:* Tuesday, September 28, 2021 8:02 PM
> *To:* Sanket Agrawal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Event is taking a lot of time between the operators
>
>
>
> [**EXTERNAL EMAIL**]
>
> Hi,
>
>
>
> With Flink 1.8.0 I'm not sure how reliable the backpressure status is in
> the WebUI when it comes to the Async operators. If I remember correctly
> until around Flink 1.10 (+/- 2 version) backpressure monitoring was
> checking for thread dumps stuck in requesting Flink's network memory
> buffers. If in your job AsyncFunction is the source of a backpressure, it
> would be skipped and not reported. For analysing backpressure I would
> highly recommend upgrading to Flink 1.13.x as it has greatly improved
> tooling for that [1]. And in that version AsynFunctions are definitely
> handled correctly. Since Flink 1.10 I believe you can use the
> `isBackPressured` metric. In previous versions you would have to rely on
> buffer usage metrics as described here [2].
>
>
>
>
>
> [1] https://flink.apache.org/2021/07/07/backpressure.html
> <https://apc01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fflink.apache.org%2F2021%2F07%2F07%2Fbackpressure.html&data=04%7C01%7Csanket.agrawal%40infosys.com%7C3c523d23c78d85b908d9830caf3e%7C63ce7d592f3e42cda8ccbe764cff5eb6%7C0%7C0%7C637684912775947321%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=%2BzEolZuvsAgudziPqGzqFuDGRZbHR2hu9D%2F9rERLwk8%3D&reserved=0>
>
> [2]
> h

Re: How to ugrade JobManagerCommunicationUtils from FLink 1.4 to Flink 1.5?

2021-10-08 Thread Piotr Nowojski
Hi,

`JobManagerCommunicationUtils` was never part of Flink's API. It was an
internal class, for our internal unit tests. Note that Flink's public API
is annotated with `@Public`, `@PublicEvolving` or `@Experimental`. Anything
else by default is internal (sometimes to avoid confusion we are annotating
internal classes with `@Internal`).

JobManagerCommunicationUtils seems to have been replaced with
`MiniClusterResource` [1] as part of [2]. Note that MiniClusterResource is
also not a public API, so it's subject to change or to be completely
removed without warning. You can read about how to test your applications
here [3], and about `MiniClusterResource` in particular here [4].
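
For reference, a minimal sketch of how such a test cluster is typically set up in
recent Flink versions, where the JUnit rule is called MiniClusterWithClientResource.
The class name of the test, and the TaskManager and slot counts, are just example
values:

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;

public class MyJobITCase {

    // Shared mini cluster for all tests in this class; jobs submitted from a
    // StreamExecutionEnvironment inside the tests will run against it.
    @ClassRule
    public static final MiniClusterWithClientResource FLINK_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());
}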

Piotrek

PS Flink 1.5 has also not been supported for something like 2 years now.
The currently officially supported Flink versions are 1.13 and 1.14, so I would
encourage you to upgrade to one of those.

[1]
https://github.com/apache/flink/commits/release-1.5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
[2] https://issues.apache.org/jira/browse/FLINK-8703
[3]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/testing/
[4]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/testing/#testing-flink-jobs

pt., 8 paź 2021 o 10:11 Felipe Gutierrez 
napisał(a):

> Hello there,
>
> what is the replacement from Flink 1.4 to Flink 1.5 of the class
> JobManagerCommunicationUtils.java [1] below?
>
> JobManagerCommunicationUtils.cancelCurrentJob
> JobManagerCommunicationUtils.waitUntilNoJobIsRunning
>
> I want to upgrade Flink from 1.4 to 1.5 but I cant find this class in the
> docs of the previous version [2] neither on the next version [3]. My plan
> is also to upgrade Flink to the lates version. But if I cannot find a way
> to the next version 1.4 -> 1.5, I suppose that for a greater it will be
> even more difficult.
>
> [1]
> https://github.com/a0x8o/flink/blob/master/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java#L38-L60
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/package-summary.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/connectors/kafka/package-summary.html
>
> Thanks in advance,
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
>


Re: Empty Kafka topics and watermarks

2021-10-08 Thread Piotr Nowojski
Hi James,

I believe you have encountered a bug that we have already fixed [1]. The
small problem is that in order to fix this bug, we had to change some
`@PublicEvolving` interfaces and thus we were not able to backport this fix
to earlier minor releases. As such, this is only fixed starting from 1.14.x.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18934

pt., 8 paź 2021 o 11:55 James Sandys-Lumsdaine 
napisał(a):

> Hi everyone,
>
> I'm putting together a Flink workflow that needs to merge historic data
> from a custom JDBC source with a Kafka flow (for the realtime data). I have
> successfully written the custom JDBC source that emits a watermark for the
> last event time after all the DB data has been emitted but now I face a
> problem when joining with data from the Kafka stream.
>
> I register a timer in my KeyedCoProcessFunction joining the DB stream
> with live Kafka stream so I can emit all the "batch" data from the DB in
> one go when completely read up to the watermark but the timer never fires
> as the Kafka stream is empty and therefore doesn't emit a watermark. My
> Kafka stream will allowed to be empty since all the data will have been
> retrieved from the DB call so I only expect new events to appear over
> Kafka. Note that if I replace the Kafka input with a simple
> env.fromCollection(...) empty list then the timer triggers fine as Flink
> seems to detect it doesn't need to wait for any input from stream 2. So it
> seems to be something related to the Kafka stream status that is preventing
> the watermark from advancing in the KeyedCoProcessFunction.
>
> I have tried configuring the Kafka stream timestamp and watermark
> strategies to so the source is marked as idle after 10 seconds but still it
> seems the watermark in the join operator combining these 2 streams is not
> advancing. (See example code below).
>
> Maybe this is my bad understanding but I thought if an input stream into a
> KeyedCoProcessFunction is idle then it wouldn't be considered by the
> operator for forwarding the watermark i.e. it would forward the non-idle
> input stream's watermark and not do a min(stream1WM, stream2WM). With the
> below example I never see the onTimer fire and the only effect the
> withIdleness() strategy has is to stop the print statements in
> onPeriodicEmit() happening after 5 seconds (env periodic emit is set to the
> default 200ms so I see 25 rows before it stops).
>
> The only way I can get my KeyedCoProcessFunction timer to fire is to force
> an emit of the watermark I want in the onPeriodicEmit() after x numbers of
> attempts to advance an initial watermark i.e. if onPeriodicEmit() is called
> 100 times and the "latestWatermark" is still Long.MIN_VALUE then I emit the
> watermark I want so the join can progress. This seems like a nasty hack to
> me but perhaps something like this is actually necessary?
>
> I am currently using Flink 1.12.3, a Confluent Kafka client 6.1.1. Any
> pointers would be appreciated.
>
> Thanks in advance,
>
> James.
>
> FlinkKafkaConsumer positionsFlinkKafkaConsumer = new
> FlinkKafkaConsumer<>("poc.positions",
> ConfluentRegistryAvroDeserializationSchema.forSpecific(Position.class,
> SchemaRegistryURL), kafkaConsumerProperties);
>
> positionsFlinkKafkaConsumer.setStartFromEarliest();
>
> positionsFlinkKafkaConsumer.assignTimestampsAndWatermarks(
>
>new WatermarkStrategy() {
>
>   @Override
>
>   public TimestampAssigner
> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
>
> return (event, recordTimestamp) -> {
>
> return event.getPhysicalFrom();
>
> };
>
> }
>
>
>
>   @Override
>
>   public WatermarkGenerator
> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>
> return new WatermarkGenerator() {
>
> public long latestWatermark = Long.MIN_VALUE;
>
>
>
> @Override
>
> public void onEvent(Position event, long
> timestamp, WatermarkOutput output) {
>
> long eventWatermark =
> event.getPhysicalFrom();
>
> if (eventWatermark > latestWatermark)
>
> latestWatermark = eventWatermark;
>
> }
>
>
>
> @Override
>
> public void onPeriodicEmit(WatermarkOutput
> output) {
>
> System.out.printf("Emitting watermark
> %d\n", latestWatermark);
>
> output.emitWatermark(new
> Watermark(latestWatermark));
>
> }
>
> };
>
> }
>
> }.withIdleness(Duration.ofSeconds(5)));
>
>
>
> DataStream positionKafkaInputStream =
> env.addSource(positionsFli

Re: Empty Kafka topics and watermarks

2021-10-11 Thread Piotr Nowojski
Great, thanks!

pon., 11 paź 2021 o 17:24 James Sandys-Lumsdaine 
napisał(a):

> Ah thanks for the feedback. I can work around for now but will upgrade as
> soon as I can to the latest version.
>
> Thanks very much,
>
> James.
> --
> *From:* Piotr Nowojski 
> *Sent:* 08 October 2021 13:17
> *To:* James Sandys-Lumsdaine 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Empty Kafka topics and watermarks
>
> Hi James,
>
> I believe you have encountered a bug that we have already fixed [1]. The
> small problem is that in order to fix this bug, we had to change some
> `@PublicEvolving` interfaces and thus we were not able to backport this fix
> to earlier minor releases. As such, this is only fixed starting from 1.14.x.
>
> Best,
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-18934
>
> pt., 8 paź 2021 o 11:55 James Sandys-Lumsdaine 
> napisał(a):
>
> Hi everyone,
>
> I'm putting together a Flink workflow that needs to merge historic data
> from a custom JDBC source with a Kafka flow (for the realtime data). I have
> successfully written the custom JDBC source that emits a watermark for the
> last event time after all the DB data has been emitted but now I face a
> problem when joining with data from the Kafka stream.
>
> I register a timer in my KeyedCoProcessFunction joining the DB stream
> with live Kafka stream so I can emit all the "batch" data from the DB in
> one go when completely read up to the watermark but the timer never fires
> as the Kafka stream is empty and therefore doesn't emit a watermark. My
> Kafka stream will allowed to be empty since all the data will have been
> retrieved from the DB call so I only expect new events to appear over
> Kafka. Note that if I replace the Kafka input with a simple
> env.fromCollection(...) empty list then the timer triggers fine as Flink
> seems to detect it doesn't need to wait for any input from stream 2. So it
> seems to be something related to the Kafka stream status that is preventing
> the watermark from advancing in the KeyedCoProcessFunction.
>
> I have tried configuring the Kafka stream timestamp and watermark
> strategies to so the source is marked as idle after 10 seconds but still it
> seems the watermark in the join operator combining these 2 streams is not
> advancing. (See example code below).
>
> Maybe this is my bad understanding but I thought if an input stream into a
> KeyedCoProcessFunction is idle then it wouldn't be considered by the
> operator for forwarding the watermark i.e. it would forward the non-idle
> input stream's watermark and not do a min(stream1WM, stream2WM). With the
> below example I never see the onTimer fire and the only effect the
> withIdleness() strategy has is to stop the print statements in
> onPeriodicEmit() happening after 5 seconds (env periodic emit is set to the
> default 200ms so I see 25 rows before it stops).
>
> The only way I can get my KeyedCoProcessFunction timer to fire is to force
> an emit of the watermark I want in the onPeriodicEmit() after x numbers of
> attempts to advance an initial watermark i.e. if onPeriodicEmit() is called
> 100 times and the "latestWatermark" is still Long.MIN_VALUE then I emit the
> watermark I want so the join can progress. This seems like a nasty hack to
> me but perhaps something like this is actually necessary?
>
> I am currently using Flink 1.12.3, a Confluent Kafka client 6.1.1. Any
> pointers would be appreciated.
>
> Thanks in advance,
>
> James.
>
> FlinkKafkaConsumer positionsFlinkKafkaConsumer = new
> FlinkKafkaConsumer<>("poc.positions",
> ConfluentRegistryAvroDeserializationSchema.forSpecific(Position.class,
> SchemaRegistryURL), kafkaConsumerProperties);
>
> positionsFlinkKafkaConsumer.setStartFromEarliest();
>
> positionsFlinkKafkaConsumer.assignTimestampsAndWatermarks(
>
>new WatermarkStrategy() {
>
>   @Override
>
>   public TimestampAssigner
> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
>
> return (event, recordTimestamp) -> {
>
> return event.getPhysicalFrom();
>
> };
>
> }
>
>
>
>   @Override
>
>   public WatermarkGenerator
> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>
> return new WatermarkGenerator() {
>
> public long latestWatermark = Long.MIN_VALUE;
>
>
>
> @Override
>
> public void onEvent(Posi

Re: Troubleshooting checkpoint timeout

2021-10-25 Thread Piotr Nowojski
Hi Alexis,

You can read about those metrics in the documentation [1]. Long alignment
duration and start delay almost always come together. High values indicate
long checkpoint barrier propagation times through the job graph; that's
always (at least so far I haven't seen a different reason) caused by the
same thing: backpressure. Which brings me to:

> There is no backpressure in any operator.

Why do you think so?

For analysing backpressure I would highly recommend upgrading to Flink 1.13.x
as it has greatly improved tooling for that [2]. Since Flink 1.10 I believe
you can use the `isBackPressured` metric. In previous versions you would
have to rely on buffer usage metrics as described here [3].

If this is indeed a backpressure problem, there are three things you
could do to improve the checkpointing time:
a) Reduce the backpressure, either by optimising your job/code or by scaling
up.
b) Reduce the amount of in-flight data. Since Flink 1.14.x, Flink can do this
automatically when buffer debloating is enabled, but the same
principle can be used to manually and statically configure the cluster to
have less in-flight data. You can read about this here [4].
c) Enable unaligned checkpoints [5] (a minimal sketch follows below).
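
A minimal sketch of enabling (c) programmatically. The interval and timeout values
below are only example numbers, not a recommendation for your job:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);                              // example: checkpoint every 60s
env.getCheckpointConfig().enableUnalignedCheckpoints();       // let barriers overtake in-flight data
env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000);  // example: 10 minute timeout
// the same can be set in flink-conf.yaml via execution.checkpointing.unaligned: true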

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/
[2] https://flink.apache.org/2021/07/07/backpressure.html
[3] https://flink.apache.org/2019/07/23/flink-network-stack-2.html
#network-metrics
[4]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/network_mem_tuning/#the-buffer-debloating-mechanism
[5]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints

Best,
Piotrek

czw., 21 paź 2021 o 19:00 Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> napisał(a):

> I would really appreciate more fine-grained information regarding the
> factors that can affect a checkpoint’s:
>
>
>
>- Sync duration
>- Async duration
>- Alignment duration
>- Start delay
>
>
>
> Otherwise those metrics don’t really help me know in which areas to look
> for issues.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Mittwoch, 20. Oktober 2021 09:43
> *To:* Parag Somani ; Caizhi Weng <
> tsreape...@gmail.com>
> *Cc:* Flink ML 
> *Subject:* RE: Troubleshooting checkpoint timeout
>
>
>
> Currently the windows are 10 minutes in size with a 1-minute slide time.
> The approximate 500 event/minute throughput is already rather high for my
> use case, so I don’t expect it to be higher, but I would imagine that’s
> still pretty low.
>
>
>
> I did have some issues with storage space, and I wouldn’t be surprised if
> there is an IO bottleneck in my dev environment, but then my main question
> would be: if IO is being throttled, could that result in the high “start
> delay” times I observe? That seems to be the main slowdown, so I just want
> to be sure I’m looking in the right direction.
>
>
>
> I’d like to mention another thing about my pipeline’s structure in case
> it’s relevant, although it may be completely unrelated. I said that I
> specify the windowing properties once (windowedStream in my 1st e-mail)
> and use it twice, but it’s actually used 3 times. In addition to the 2
> ProcessWindowFunctions that end in sinks, the stream is also joined with a
> side output:
>
>
>
> openedEventsTimestamped = openedEvents
>
> .getSideOutput(…)
>
> .keyBy(keySelector)
>
> .assignTimestampsAndWatermarks(watermarkStrategy)
>
>
>
> windowedStream
>
> .process(ProcessWindowFunction3())
>
> .keyBy(keySelector)
>
>
> .connect(DataStreamUtils.reinterpretAsKeyedStream(openedEventsTimestamped,
> keySelector))
>
> .process(...)
>
>
>
> Could this lead to delays or alignment issues?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Parag Somani 
> *Sent:* Mittwoch, 20. Oktober 2021 09:22
> *To:* Caizhi Weng 
> *Cc:* Alexis Sarda-Espinosa ; Flink
> ML 
> *Subject:* Re: Troubleshooting checkpoint timeout
>
>
>
> I had similar problem, where i have concurrent two checkpoints were
> configured. Also, i used to save it in S3(using minio) on k8s 1.18 env.
>
>
>
> Flink service were getting restarted and timeout was happening. It got
> resolved:
>
> 1. As minio ran out of disk space, caused failure of checkpoints(this was
> the main cause).
>
> 2. Added duration/interval of checkpoint parameter to address it
>
> execution.checkpointing.max-concurrent-checkpoints and
> execution.checkpointing.min-pause
>
> Details of same at:
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#checkpointing
>
>
>
>
>
> On Wed, Oct 20, 2021 at 7:50 AM Caizhi Weng  wrote:
>
> Hi!
>
>
>
> I see you're using sliding event time windows. What's the exact value of
> windowLengthMinutes and windowSlideTimeMinutes? If windowLengthMinutes is
> large and windowSlideTimeMinutes is small then each record may be assigned
> to a large number of wind

Re: Troubleshooting checkpoint timeout

2021-10-25 Thread Piotr Nowojski
s, and I
> can see the timer timestamps lagging clock time by up to 1 hour. Since the
> logs don’t indicate the operator’s logic takes a significant amount of time
> and CPU is far below the available limit (the single TM barely uses more
> than 1 CPU out of 4), I’d guess the lag could be related to checkpoint
> alignment, which takes me to my questions:
>
>
>
>1. The documentation states “Operators that receive more than one
>input stream need to align the input streams on the snapshot barriers”. If
>an operator has parallelism > 1, does that count as more than one stream?
>Or is there a single output barrier for all subtask outputs that gets
>“copied” to all downstream subtask inputs?
>2. Similarly, alignment duration is said to be “The time between
>processing the first and the last checkpoint barrier”. What exactly is the
>interpretation of “first” and “last” here? Do they relate to a checkpoint
>“n” where “first” would be the barrier for n-1 and “last” the one for n?
>3. Start delay also refers to the “first checkpoint barrier to reach
>this subtask”. As before, what is “first” in this context?
>4. Maybe this will be answered by the previous questions, but what
>happens to barriers if a downstream operator has lower parallelism?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Piotr Nowojski 
> *Sent:* Montag, 25. Oktober 2021 09:59
> *To:* Alexis Sarda-Espinosa 
> *Cc:* Parag Somani ; Caizhi Weng <
> tsreape...@gmail.com>; Flink ML 
> *Subject:* Re: Troubleshooting checkpoint timeout
>
>
>
> Hi Alexis,
>
>
>
> You can read about those metrics in the documentation [1]. Long alignment
> duration and start delay almost always come together. High values indicate
> long checkpoint barrier propagation times through the job graph, that's
> always (at least so far I haven't seen a different reason) caused by the
> same thing: backpressure. Which brings me to
>
>
>
> > There is no backpressure in any operator.
>
>
>
> Why do you think so?
>
>
>
> For analysing backpressure I would highly recommend upgrading to Flink 1.13.x
> as it has greatly improved tooling for that [2]. Since Flink 1.10 I
> believe you can use the `isBackPressured` metric. In previous versions you
> would have to rely on buffer usage metrics as described here [3].
>
>
>
> If this is indeed a problem with a backpressure, there are three things
> you could do to improve checkpointing time:
>
> a) Reduce the backpressure, either by optimising your job/code or scaling
> up.
>
> b) Reduce the amount of in-flight data. Since Flink 1.14.x, Flink can do
> it automatically when buffer debloating is enabled, but the same
> principle could be used to manually and statically configure cluster to
> have less in-flight data. You can read about this here [4].
>
> c) Enabled unaligned checkpoints [5].
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/
>
> [2] https://flink.apache.org/2021/07/07/backpressure.html
>
> [3] https://flink.apache.org/2019/07/23/flink-network-stack-2.html
> #network-metrics
>
> [4]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/network_mem_tuning/#the-buffer-debloating-mechanism
>
> [5]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints
>
>
>
> Best,
>
> Piotrek
>
>
>
> czw., 21 paź 2021 o 19:00 Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> napisał(a):
>
> I would really appreciate more fine-grained information regarding the
> factors that can affect a checkpoint’s:
>
>
>
>- Sync duration
>- Async duration
>- Alignment duration
>- Start delay
>
>
>
> Otherwise those metrics don’t really help me know in which areas to look
> for issues.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Mittwoch, 20. Oktober 2021 09:43
> *To:* Parag Somani ; Caizhi Weng <
> tsreape...@gmail.com>
> *Cc:* Flink ML 
> *Subject:* RE: Troubleshooting checkpoint timeout
>
>
>
> Currently the windows are 10 minutes in size with a 1-minute slide time.
> The approximate 500 event/minute throughput is already rather high for my
> use case, so I don’t expect it to be higher, but I would imagine that’s
> still pretty low.
>
>
>
> I did have some issues with storage space, and I wouldn’t be surprised if
> there is an IO bottleneck in my dev environment, but then my main question
> would be: if IO is being throttled, could that resul

Re: Troubleshooting checkpoint timeout

2021-10-25 Thread Piotr Nowojski
Hi Alexis,

>  Should I understand these metrics as a property of an operator and not
of each subtask (at least for aligned checkpoints)? Then “first” and “last”
would make sense to me: first/last across all subtasks/channels for a given
operator.

Those are properties of a subtask. A subtask is a parallel instance of a
task, and a task is a chain of one or more operators. If you have a simple job like
`source.keyBy(...).window(...).process(...)`, with a parallelism of 10, you
will have two tasks. Each task will have 10 subtasks. Each subtask will
have only a single-element operator chain, with a single operator (either
the source operator for the source task/subtasks, or the window/process function
for the second task). If you add a sink to your job,
`source.keyBy(...).window(...).process(...).addSink(...)`, this sink will
be chained with the window/process operator. You will still end up with two
tasks:

1. Source
2. Window -> Sink

again, each will have 10 subtasks, with parallel instances of the
respective operators.
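
To make that concrete, a minimal sketch of such a job shape. All class names here
(MySource, MyProcessWindowFunction, MySink) are illustrative placeholders, not taken
from your job; the comments mark which operators end up in which task:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);

env.addSource(new MySource())                               // task 1: "Source", 10 subtasks
   .keyBy(event -> event.getKey())                          // network exchange between the two tasks
   .window(TumblingEventTimeWindows.of(Time.minutes(10)))
   .process(new MyProcessWindowFunction())                  // task 2: "Window -> Sink", 10 subtasks
   .addSink(new MySink());                                  // chained with the window operator

env.execute();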

So if you look at the "alignment duration" of a subtask from the "2. Window ->
Sink" task, that will be the difference between receiving the first
checkpoint barrier from any of the "1. Source" subtasks and the last
checkpoint barrier from those "1. Source" subtasks.

> Naturally, for unaligned checkpoints, alignment duration isn’t
applicable, but what about Start Delay? I imagine that might indeed be a
property of the subtask and not the operator.

As per the docs that I've already linked [1]

Alignment Duration: The time between processing the first and the last
checkpoint barrier. For aligned checkpoints, during the alignment, the
channels that have already received checkpoint barriers are blocked from
processing more data.

This number is defined the same way for unaligned checkpoints.
Even with unaligned checkpoints a subtask needs to wait to receive all
of the checkpoint barriers before completing the checkpoint. However, as
a subtask can broadcast the checkpoint barrier downstream immediately upon
receiving the first checkpoint barrier, AND those checkpoint barriers are
able to overtake in-flight data, the propagation happens very quickly
for the most part. Hence alignment duration and start delay in this case
should be very small, unless you have deeper problems like long GC pauses.

> If I’m understanding the aligned checkpoint mechanism correctly, after
the first failure the job restarts and tries to read, let’s say, the last 5
minutes of data. Then it fails again because the checkpoint times out and,
after restarting, would it try to read, for example, 15 minutes of data? If
there was no backpressure in the source, it could be that the new
checkpoint barriers created after the first restart are behind more data
than before it restarted, no?

I'm not sure if I understand. But yes. It's a valid scenario that:

1. timestamp t1, checkpoint 42 completes
2. failure happens at timestamp t1 + 10 minutes.
3. timestamp t2, job is recovered to checkpoint 42.
4. timestamp t2 + 5 minutes, checkpoint 43 is triggered.

Between 1. and 2., your job could have processed more records than between
3. and 4.

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/

pon., 25 paź 2021 o 15:02 Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> napisał(a):

> Hi again,
>
>
>
> Thanks a lot for taking the time to clarify this. I think that the main
> thing that is confusing me is that the UI shows Alignment Duration and
> other checkpoint metrics for each subtask, and the resources you’ve sent
> always discuss a single barrier per subtask channel. Should I understand
> these metrics as a property of an operator and not of each subtask (at
> least for aligned checkpoints)? Then “first” and “last” would make sense to
> me: first/last across all subtasks/channels for a given operator.
>
>
>
> Naturally, for unaligned checkpoints, alignment duration isn’t applicable,
> but what about Start Delay? I imagine that might indeed be a property of
> the subtask and not the operator.
>
>
>
> With respect to my problem, I can also add that my job reads data from
> Pulsar, so some of it is buffered in the message bus. If I’m understanding
> the aligned checkpoint mechanism correctly, after the first failure the job
> restarts and tries to read, let’s say, the last 5 minutes of data. Then it
> fails again because the checkpoint times out and, after restarting, would
> it try to read, for example, 15 minutes of data? If there was no
> backpressure in the source, it could be that the new checkpoint barriers
> created after the first restart are behind more data than before it
> restarted, no?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Piotr Nowojski 
> *Sent:* Montag, 25. O

Re: Troubleshooting checkpoint timeout

2021-10-26 Thread Piotr Nowojski
I'm glad that I could help :)

Piotrek

pon., 25 paź 2021 o 16:04 Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> napisał(a):

> Oh, I got it. I should’ve made the connection earlier after you said “Once
> an operator decides to send/broadcast a checkpoint barrier downstream, it
> just broadcasts it to all output channels”.
>
>
>
> I’ll see what I can do about upgrading the Flink version and do some more
> tests with unaligned checkpoints. Thanks again for all the info.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Piotr Nowojski 
> *Sent:* Montag, 25. Oktober 2021 15:51
> *To:* Alexis Sarda-Espinosa 
> *Cc:* Parag Somani ; Caizhi Weng <
> tsreape...@gmail.com>; Flink ML 
> *Subject:* Re: Troubleshooting checkpoint timeout
>
>
>
> Hi Alexis,
>
>
>
> >  Should I understand these metrics as a property of an operator and not
> of each subtask (at least for aligned checkpoints)? Then “first” and “last”
> would make sense to me: first/last across all subtasks/channels for a given
> operator.
>
>
>
> Those are properties of a subtask. Subtasks are a collection of chained
> parallel instances of operators. If you have a simple job like
> `source.keyBy(...).window(...).process(...)`, with parallelism of 10, you
> will have two tasks. Each task will have 10 subtasks. Each subtask will
> have only a single element operator chain, with a single operator (either
> source operator for the source task/subtasks, or window/process function
> for the second task). If you add a sink to your job
> `source.keyBy(...).window(...).process(...).addSink(...)`, this sink will
> be chained with the window/process operator. You will still end up with two
> tasks:
>
>
>
> 1. Source
> 2. Window -> Sink
>
>
>
> again, each will have 10 subtasks, with parallel instances of the
> respective operators.
>
>
>
> So if you look at the "alignment duration" of a subtask from "2. Window ->
> Sink" task, that will be the difference between receiving a first
> checkpoint barrier from any of the "1. Source" subtasks and the last
> checkpoint barrier from those "1. Source" subtasks.
>
>
>
> > Naturally, for unaligned checkpoints, alignment duration isn’t
> applicable, but what about Start Delay? I imagine that might indeed be a
> property of the subtask and not the operator.
>
> As per the docs that I've already linked [1]
>
>
> Alignment Duration: The time between processing the first and the last
> checkpoint barrier. For aligned checkpoints, during the alignment, the
> channels that have already received checkpoint barriers are blocked from
> processing more data.
>
>
>
> This number is also defined the same way for the unaligned checkpoints.
> Even with unaligned checkpoints a subtask needs to wait for receiving all
> of the checkpoint barriers before completing the checkpoint. However, as
> subtask can broadcast the checkpoint barrier downstream immediately upon
> receiving the first checkpoint barrier AND those checkpoint barriers are
> able to overtake in-flight data, the propagation happens very very quickly
> for the most part. Hence alignment duration and start delay in this case
> should be very small, unless you have deeper problems like long GC pauses.
>
> > If I’m understanding the aligned checkpoint mechanism correctly, after
> the first failure the job restarts and tries to read, let’s say, the last 5
> minutes of data. Then it fails again because the checkpoint times out and,
> after restarting, would it try to read, for example, 15 minutes of data? If
> there was no backpressure in the source, it could be that the new
> checkpoint barriers created after the first restart are behind more data
> than before it restarted, no?
>
>
>
> I'm not sure if I understand. But yes. It's a valid scenario that:
>
> 1. timestamp t1, checkpoint 42 completes
> 2. failure happens at timestamp t1 + 10 minutes.
> 3. timestamp t2, job is recovered to checkpoint 42.
>
> 4. timestamp t2 + 5 minutes, checkpoint 43 is triggered.
>
>
>
> Between 1. and 2., your job could have processed more records than between
> 3. and 4.
>
>
>
> Piotrek
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/
>
>
>
> pon., 25 paź 2021 o 15:02 Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> napisał(a):
>
> Hi again,
>
>
>
> Thanks a lot for taking the time to clarify this. I think that the main
> thing that is confusing me is that the UI shows Alignment Duration and
> other checkpoint metrics for each s

Re: Beginner: guidance on long term event stream persistence and replaying

2021-11-09 Thread Piotr Nowojski
Hi Simon,

Off the top of my head I do not see a reason why this shouldn't work in
Flink. I'm not sure what your question is here.

For reading both from files and from Kafka at the same time you might
want to take a look at the Hybrid Source [1]. Apart from that, there are the
FileSource/FileSink and KafkaSource connectors, which I presume you have already found :)
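
A minimal sketch of how the replay could be wired up with a Hybrid Source. Everything
below is illustrative: the path, topic, broker address and switch timestamp are
placeholders, for your Avro container files you would plug in your own StreamFormat
instead of TextLineFormat, and exact class names can differ slightly between Flink
versions:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

long switchTimestamp = 1636502400000L; // example: where the S3 archive ends

FileSource<String> archiveSource =
        FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("s3://my-bucket/archive/"))
                .build();

KafkaSource<String> liveSource =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("events")
                .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

HybridSource<String> hybridSource =
        HybridSource.builder(archiveSource)
                .addSource(liveSource)
                .build();

// `env` is your StreamExecutionEnvironment
env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "archive-then-kafka");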

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/

pon., 8 lis 2021 o 22:22 Simon Paradis  napisał(a):

> Hi,
>
> We have an event processing pipeline that populates various reports from
> different Kafka topics and would like to centralize processing in Flink. My
> team is new to Flink but we did some prototyping using Kinesis.
>
> To enable new reporting based on past events, we'd like the ability to
> replay those Kafka events when creating new reports; a capability we don't
> have today.
>
> We ingest the same topics from many Kafka clusters in different
> datacenters and it is not practical to have enough retention on these Kafka
> topics for technical reasons and also practical issues around GDPR
> compliance and Kafka's immutability (it's not an issue today because our
> Kafka retention is short).
>
> So we'd like to archive events into files that we push to AWS S3 along
> with some metadata to help implement GDPR more efficiently. I've looked
> into Avro object container files and it seems like it would work for us.
>
> I was thinking of having a dedicated Flink job reading and archiving to S3
> and somehow plug these S3 files back into a FileSource when a replay is
> needed to backfill new reporting views. S3 would contain Avro container
> files with a pattern like
>
> sourceDC__topicName__MMDDHHMM__NN.data
>
> where files are rolled over every hour or so and "rekeyed" into NN slots
> as per the event key to retain logical order while having reasonable file
> sizes.
>
> I presume someone has already done something similar. Any pointer would be
> great!
>
> --
> Simon Paradis
> paradissi...@gmail.com
>


Re: A savepoint was created but the corresponding job didn't terminate successfully.

2021-11-09 Thread Piotr Nowojski
Hi Dongwon,

Thanks for reporting the issue. I've created a ticket for it [1] and we
will analyse and try to fix it soon. In the meantime it should be safe for
you to ignore this problem. If this failure happens only rarely, you can
always retry the stop-with-savepoint command and there should be no visible
side effects for you.

Piotrek


[1] https://issues.apache.org/jira/browse/FLINK-24846

wt., 9 lis 2021 o 03:55 Dongwon Kim  napisał(a):

> Hi community,
>
> I failed to stop a job with savepoint with the following message:
>
>> Inconsistent execution state after stopping with savepoint. At least one
>> execution is still in one of the following states: FAILED, CANCELED. A
>> global fail-over is triggered to recover the job
>> 452594f3ec5797f399e07f95c884a44b.
>>
>
> The job manager said
>
>>  A savepoint was created at
>> hdfs://mobdata-flink-hdfs/driving-habits/svpts/savepoint-452594-f60305755d0e
>> but the corresponding job 452594f3ec5797f399e07f95c884a44b didn't terminate
>> successfully.
>
> while complaining about
>
>> Mailbox is in state QUIESCED, but is required to be in state OPEN for put
>> operations.
>>
>
> Is it okay to ignore this kind of error?
>
> Please see the attached files for the detailed context.
>
> FYI,
> - I used the latest 1.14.0
> - I started the job with "$FLINK_HOME"/bin/flink run --target yarn-per-job
> - I couldn't reproduce the exception using the same jar so I might not
> able to provide DUBUG messages
>
> Best,
>
> Dongwon
>
>


Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Piotr Nowojski
Hi All,

To me it looks like something deadlocked, maybe due to this OOM error from
Kafka, preventing a Task from making any progress. To confirm, Dongwan, you
could collect stack traces while the job is in such a blocked state.
A deadlocked Kafka client could easily explain those symptoms and it would be
visible as extreme backpressure. Another thing to look at would be whether
the job is making any progress at all (via, for example, the
numRecordsIn/numRecordsOut metrics [1]).
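
Regarding exposing the number of in-flight async requests (the question in the
subject): until something is built into Flink for this, a sketch of a
do-it-yourself approach is to register a custom gauge inside your own
RichAsyncFunction. Everything below (class, metric name, and callExternalService)
is a hypothetical placeholder, not an existing Flink feature:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class InstrumentedAsyncFunction extends RichAsyncFunction<String, String> {

    private transient AtomicInteger inFlight;

    @Override
    public void open(Configuration parameters) {
        inFlight = new AtomicInteger();
        // expose the current number of in-flight requests as a user-defined gauge
        getRuntimeContext().getMetricGroup().gauge("inFlightAsyncRequests", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return inFlight.get();
            }
        });
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        inFlight.incrementAndGet();
        CompletableFuture
                .supplyAsync(() -> callExternalService(input)) // placeholder for your external call
                .whenComplete((result, error) -> {
                    inFlight.decrementAndGet();
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(Collections.singleton(result));
                    }
                });
    }

    private String callExternalService(String input) {
        return input; // placeholder
    }
}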

A couple of clarifications.

> What I suspect is the capacity of the asynchronous operation because
limiting the value can cause back-pressure once the capacity is exhausted
[1].
> Although I could increase the value (...)

If you want to decrease the impact of backpressure, you should decrease
the capacity, not increase it. The more in-flight records in the system,
the more records need to be processed/persisted during aligned/unaligned
checkpoints.

> As far as I can tell from looking at the code, the async operator is able
to checkpoint even if the work-queue is exhausted.

Yes and no. If the work-queue is full, `AsyncWaitOperator` can be snapshotted,
but it cannot be blocked inside the `AsyncWaitOperator#processElement`
method. For a checkpoint to be executed, `AsyncWaitOperator` must finish
processing the current record and return execution to the task thread. If
the work-queue is full, `AsyncWaitOperator` will block inside the
`AsyncWaitOperator#addToWorkQueue` method until the work-queue has
capacity to accept the new element. If what I suspect is happening here is
true, and the job is deadlocked via this Kafka issue, `AsyncWaitOperator`
will be blocked indefinitely in this method.

Best,
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.14/docs/ops/metrics/#io



wt., 9 lis 2021 o 11:55 Fabian Paul  napisał(a):

> Hi Dongwan,
>
> Can you maybe share more about the setup and how you use the AsyncFunction
> with
> the Kafka client?
>
> As David already pointed out it could be indeed a Kafka bug but it could
> also
> mean that your defined async function leaks direct memory by not freeing
> some
> resources.
>
> We can definitely improve the metrics for the AsyncFunction and expose the
> current queue size as a followup.
>
> Best,
> Fabian


Re: Providing files while application mode deployment

2021-11-09 Thread Piotr Nowojski
Hi Vasily,

Unfortunately no, I don't think there is such an option in your case. With
per-job mode, you could try to use the Distributed Cache; it should work in
streaming as well [1]. However, this doesn't work in application mode, as in
that case no code is executed on the JobMaster [2].

Two workarounds that I could propose, which I know are not perfect, are to:
- bundle the configuration file in the jar
- pass the entire configuration as a parameter to the job, e.g. as some JSON
or a base64-encoded parameter (a minimal sketch follows below).
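
For the second option, a minimal sketch of what the job's main() could do. The
parameter name "config-base64" and the use of ParameterTool are just illustrative
assumptions:

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.flink.api.java.utils.ParameterTool;

// inside main(String[] args) of the job:
ParameterTool params = ParameterTool.fromArgs(args);
// submitted e.g. with: --config-base64 <base64 of the JSON config file>
String configJson = new String(
        Base64.getDecoder().decode(params.getRequired("config-base64")),
        StandardCharsets.UTF_8);
// parse configJson with your favourite JSON library from here on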

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/dataset/overview/#distributed-cache
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/#overview-and-reference-architecture

wt., 9 lis 2021 o 14:14 Vasily Melnik 
napisał(a):

> Hi all.
>
> While running Flink jobs in application mode on YARN and Kuber, we need to
> provide some configuration files to main class. Is there any option on
> Flink CLI  to copy local files on cluster without manually copying on DFS
> or in docker image, something like *--files* option in spark-submit?
>
>
>


Re: Input Selectable & Checkpointing

2021-11-24 Thread Piotr Nowojski
Hi Shazia,

FLIP-182 [1] might be a thing that will let you address issues like this in
the future. With it, maybe you could do some magic with assigning
watermarks to make sure that one stream doesn't run too far ahead into the
future, which would effectively prioritise the other stream. But that's
currently aimed for Flink 1.15 (subject to change), which is still a couple
of months away.

For the time being, a workaround that I know some people have been using is to
implement some manual throttling of the sources, either via a throttling
operator/mapping function chained directly after the sources, or
implemented inside your custom source. One issue that complicates this
solution is that most likely you would need to use an external system
(an external database? maybe some file?) to control how much and when to
throttle whom. To decide whom to throttle you could use Flink metrics [2],
especially something around the number of bytes/records processed by an
operator/subtask. Also, be cautious when sleeping: blocking calls inside
your code will, for example, also block checkpointing. And let me stress
this one more time: throttling should be chained directly after the sources.
If there is a network exchange between the source and the throttling
function, you would capture a lot of in-flight records between the two,
causing potentially crippling backpressure that would especially affect
aligned checkpointing [3].
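
A minimal sketch of such a throttling function, chained directly after the source.
The use of Guava's RateLimiter and the rate value are illustrative assumptions, not
something Flink ships for this purpose:

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ThrottlingMap<T> extends RichMapFunction<T, T> {

    private final double maxRecordsPerSecondPerSubtask;
    private transient RateLimiter rateLimiter;

    public ThrottlingMap(double maxRecordsPerSecondPerSubtask) {
        this.maxRecordsPerSecondPerSubtask = maxRecordsPerSecondPerSubtask;
    }

    @Override
    public void open(Configuration parameters) {
        rateLimiter = RateLimiter.create(maxRecordsPerSecondPerSubtask);
    }

    @Override
    public T map(T value) {
        // Blocks the task thread until a permit is available. Since blocking here also
        // blocks checkpointing of this subtask, keep this operator chained directly
        // after the source, with no network exchange in between.
        rateLimiter.acquire();
        return value;
    }
}

// usage, chained directly after the low-priority source (the rate is an example value):
// env.addSource(new LowPrioritySource()).map(new ThrottlingMap<>(100.0))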

Best,
Piotrek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/

wt., 23 lis 2021 o 15:52 Shazia Kayani  napisał(a):

> Hi Martijn,
>
> Its a continuous requirement so always read from one input source over
> another, but its does not require a super strict guarantee, so it doesn't
> matter if on occasion a message is read from the wrong topic. It's mainly
> due to there consistently being significantly more messages on one source
> than another which causes issues when we there are too many messages on the
> stream.
>
> Thanks
>
> Shazia
>
>
> - Original message -
> From: "Martijn Visser" 
> To: "Shazia Kayani" 
> Cc: "User" 
> Subject: [EXTERNAL] Re: Input Selectable & Checkpointing
> Date: Tue, Nov 23, 2021 2:45 PM
>
> Hi,
>
> Do you have a requirement to continuously prioritise one input source over
> another (like always read topic X from Kafka before topic Y from Kafka) or
> is it a one-time effort, because you might need to bootstrap some state, so
> first read all data from file source A before switching over to topic B
> from Kafka?). If it's the latter, you could look into the HybridSource.
>
> Best regards,
>
> Martijn
>
> On Tue, 23 Nov 2021 at 15:34, Shazia Kayani  wrote:
>
> Hi All,
>
> Hope you are well!
>
> I am working on something which has a requirement from flink to prioritise
> one input datastream over another, to do this I'm currently implemented an
> operator which extends InputSelectable to do this.
> However, because of using input selectable checkpointing is disabled as it
> is currently not supported.
>
> I was just wondering if anyone has done something similar to
> this previously? and if so were you able to implement changes which
> resulted in successful checkpointing?
> If anyone has any other tips around the topic that too would also be
> helpful!
>
> Thanks
>
> Shazia
>
> Unless stated otherwise above:
>
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598.
>
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
>
>
>
>
>
>
>
>


Re: Input Selectable & Checkpointing

2021-11-25 Thread Piotr Nowojski
You're welcome!

Piotrek

śr., 24 lis 2021 o 17:48 Shazia Kayani  napisał(a):

> Hi Piotrek,
>
> Thanks for you message!
>
> Ok that does sound interesting and is a approach I had not considered
> before, will take a look into and further investigate
>
>
> Thank you!
>
> Best wishes,
>
> Shazia
>
>
> - Original message -
> From: "Piotr Nowojski" 
> To: "Shazia Kayani" 
> Cc: mart...@ververica.com, "user" 
> Subject: [EXTERNAL] Re: Input Selectable & Checkpointing
> Date: Wed, Nov 24, 2021 11:08 AM
>
> Hi Shazia,
>
> FLIP-182 [1] might be a thing that will let you address issues like this
> in the future. With it, maybe you could do some magic with assigning
> watermarks to make sure that one stream doesn't run too much into the
> future which would effectively prioritise the other stream. But that's
> currently aimed for Flink 1.15 (subject to change), which is still a couple
> of months away.
>
> For the time being, a workaround that I know some people were using is to
> implement some manual throttling of the sources. Either via a throttling
> operator/mapping function chained directly after the sources, or
> implemented inside your custom source. One issue that complicates this
> solution is that most likely you would need to use an external system
> (external database?, maybe some file?) to control how much and when to
> throttle whom. To decide whom to throttle you could use Flink metrics [2],
> especially something around the amount of bytes/records processed by an
> operator/subtask. Also note that be cautious when doing sleeps, as when you
> are blocking calls inside your code, you will block checkpointing for
> example. And let me stress this one more time, throttling should be chained
> directly after the sources. If there is a network exchange between source
> and throttling function, you would capture a lot of in-flight records
> between the two, causing potentially crippling back pressure that would
> especially affect aligned checkpointing [3].
>
> Best,
> Piotrek
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
>
> wt., 23 lis 2021 o 15:52 Shazia Kayani  napisał(a):
>
> Hi Martijn,
>
> Its a continuous requirement so always read from one input source over
> another, but its does not require a super strict guarantee, so it doesn't
> matter if on occasion a message is read from the wrong topic. It's mainly
> due to there consistently being significantly more messages on one source
> than another which causes issues when we there are too many messages on the
> stream.
>
> Thanks
>
> Shazia
>
>
> - Original message -
> From: "Martijn Visser" 
> To: "Shazia Kayani" 
> Cc: "User" 
> Subject: [EXTERNAL] Re: Input Selectable & Checkpointing
> Date: Tue, Nov 23, 2021 2:45 PM
>
> Hi,
>
> Do you have a requirement to continuously prioritise one input source over
> another (like always read topic X from Kafka before topic Y from Kafka) or
> is it a one-time effort, because you might need to bootstrap some state, so
> first read all data from file source A before switching over to topic B
> from Kafka?). If it's the latter, you could look into the HybridSource.
>
> Best regards,
>
> Martijn
>
> On Tue, 23 Nov 2021 at 15:34, Shazia Kayani  wrote:
>
> Hi All,
>
> Hope you are well!
>
> I am working on something which has a requirement from Flink to prioritise
> one input datastream over another. To do this I have currently implemented an
> operator which extends InputSelectable.
> However, because of using InputSelectable, checkpointing is disabled as it
> is currently not supported.
>
> I was just wondering if anyone has done something similar to
> this previously? and if so were you able to implement changes which
> resulted in successful checkpointing?
> If anyone has any other tips around the topic that too would also be
> helpful!
>
> Thanks
>
> Shazia
>
> Unless stated otherwise 

Re: Buffering when connecting streams

2021-12-05 Thread Piotr Nowojski
Hi Alexis and David,

This actually can not happen. There are mechanisms in the code to make sure
none of the input is starved IF there is some data to be read.

The only time when input can be blocked is during the alignment phase of
aligned checkpoints under back pressure. If there was back pressure in
your job, it could have easily happened that checkpoint barriers would flow
through the job graph to the KeyedCoProcessFunction much more quickly on one
of the paths than on the other, causing this faster path to be blocked
until the other side caught up. But that would happen only during the
alignment phase of the checkpoint, so without back pressure only for a very
short period of time.

Piotrek

On Thu, Dec 2, 2021 at 18:23 David Morávek wrote:

> I think this could happen, but I have a very limited knowledge about how
> the input gates work internally. @Piotr could definitely provide some more
> insight here.
>
> D.
>
> On Thu, Dec 2, 2021 at 5:54 PM Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> wrote:
>
>> I do have some logic with timers today, but it’s indeed not ideal. I
>> guess I’ll have a look at TwoInputStreamOperator, but I do have related
>> questions. You mentioned a sample scenario of "processing backlog" where
>> windows fire very quickly; could it happen that, in such a situation, the
>> framework calls the operator’s processElement1 continuously (even for
>> several minutes) before calling processElement2 a single time? How does the
>> framework decide when to switch the stream processing when the streams are
>> connected?
>>
>>
>>
>> Regards,
>>
>> Alexis.
>>
>>
>>
>> *From:* David Morávek 
>> *Sent:* Donnerstag, 2. Dezember 2021 17:18
>> *To:* Alexis Sarda-Espinosa 
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Buffering when connecting streams
>>
>>
>>
>> Even with the TwoInputStreamOperator you can not "halt" the processing.
>> You need to buffer these elements for example in the ListState for later
>> processing. At the time the watermark of the second stream arrives, you can
>> process all buffered elements that satisfy the condition.
>>
>>
>>
>> You could probably also implement a similar (less optimized) solution
>> with KeyedCoProcessFunction using event time timers.
>>
>>
>>
>> Best,
>>
>> D.
>>
>>
>>
>> On Thu, Dec 2, 2021 at 5:12 PM Alexis Sarda-Espinosa <
>> alexis.sarda-espin...@microfocus.com> wrote:
>>
>> Yes, that sounds right, but with my current KeyedCoProcessFunction I
>> can’t tell Flink to "halt" processElement1 and switch to the other stream
>> depending on watermarks. I could look into TwoInputStreamOperator if you
>> think that’s the best approach.
>>
>>
>>
>> Regards,
>>
>> Alexis.
>>
>>
>>
>> *From:* David Morávek 
>> *Sent:* Donnerstag, 2. Dezember 2021 16:59
>> *To:* Alexis Sarda-Espinosa 
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Buffering when connecting streams
>>
>>
>>
>> I think this would require using lower level API and implementing a
>> custom `TwoInputStreamOperator`. Then you can hook to
>> `processWatemark{1,2}` methods.
>>
>>
>>
>> Let's also make sure we're on the same page on what the watermark is. You
>> can think of the watermark as event time clock. It basically gives you an
>> information, that *no more events with timestamp lower than the
>> watermark should appear in your stream*.
>>
>>
>>
>> You simply delay emitting of the window result from your "connect"
>> operator, until watermark from the second (side output) stream passes the
>> window's max timestamp (maximum timestamp that is included in the window).
>>
>>
>>
>> Does that make sense?
>>
>>
>>
>> Best,
>>
>> D.
>>
>>
>>
>> On Thu, Dec 2, 2021 at 4:25 PM Alexis Sarda-Espinosa <
>> alexis.sarda-espin...@microfocus.com> wrote:
>>
>> Could you elaborate on what you mean with synchronize? Buffering in the
>> state would be fine, but I haven’t been able to come up with a good way of
>> ensuring that all data from the side stream for a given minute is processed
>> by processElement2 before all data for the same (windowed) minute reaches
>> processElement1, even when considering watermarks.
>>
>>
>>
>> Regards,
>>
>> Alexis.
>>
>>
>>
>> *From:* David Morávek 
>> *Sent:* Donnerstag, 2. Dezember 2021 15:45
>> *To:* Alexis Sarda-Espinosa 
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Buffering when connecting streams
>>
>>
>>
>> You can not rely on order of the two streams that easily. In case you are
>> for example processing backlog and the windows fire quickly, it can happen
>> that it's actually faster than the second branch which has less work to do.
>> This will make the pipeline non-deterministic.
>>
>>
>>
>> What you can do is to "synchronize" watermarks of both streams in your
>> "connect" operator, but that of course involves buffering events in the
>> state.
>>
>>
>>
>> Best,
>>
>> D.
>>
>>
>>
>> On Thu, Dec 2, 2021 at 3:02 PM Alexis Sarda-Espinosa <
>> alexis.sarda-espin...@microfocus.com> wrote:
>>
>> Hi David,
>>
>>
>>
>> A watermark step simply

Re: unaligned checkpoint for job with large start delay

2021-12-16 Thread Piotr Nowojski
Hi Mason,

In Flink 1.14 we have also changed the timeout behavior from checking
against the alignment duration to simply checking how old the checkpoint
barrier is (so it would also account for the start delay) [1]. It was done
in order to solve problems like the one you are describing. Unfortunately we
can not backport this change to 1.13.x as it's a breaking change.

Anyway, from our experience I would recommend going all in with the
unaligned checkpoints, so setting the timeout back to the default value of
0ms. With timeouts you are gaining very little (a tiny bit smaller state
size if there is no backpressure - tiny bit because without backpressure,
even with the timeout set to 0ms, the amount of captured inflight data is
basically insignificant), while in practice you slow down the propagation of
the checkpoint barriers by quite a lot.
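
For reference, going fully unaligned boils down to something like this in
flink-conf.yaml (a sketch; if I remember the option names correctly, in 1.13
the second key is execution.checkpointing.alignment-timeout, while 1.14
renamed it to execution.checkpointing.aligned-checkpoint-timeout):

execution.checkpointing.unaligned: true
# 0 means barriers are handled unaligned right away, instead of first waiting
# for an aligned checkpoint to time out
execution.checkpointing.alignment-timeout: 0 s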

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-23041

On Tue, Dec 14, 2021 at 22:04 Mason Chen wrote:

> Hi all,
>
> I'm using Flink 1.13 and my job is experiencing high start delay, more so
> than high alignment time. (our flip 27 kafka source is heavily
> backpressured). Since our alignment timeout is set to 1s, the unaligned
> checkpoint never triggers since alignment delay is always below the
> threshold.
>
> It seems there is only a configuration for alignment timeout but should
> there also be one for start delay timeout:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
>
> I'm interested to know the reasoning why there isn't a timeout for start
> delay as well--was it because it was deemed too complex for the user to
> configure two parameters for unaligned checkpoints?
>
> I'm aware of buffer debloating in 1.14 that could help but I'm trying to
> see how far unaligned checkpointing can take me.
>
> Best,
> Mason
>


Re: Confusion about rebalance bytes sent metric in Flink UI

2021-12-16 Thread Piotr Nowojski
Hi Tao,

Could you prepare a minimalistic example that would reproduce this issue?
Also what Flink version are you using?

Best,
Piotrek

On Thu, Dec 16, 2021 at 09:44 tao xiao wrote:

> >Your upstream is not inflating the record size?
> No, this is a simply dedup function
>
> On Thu, Dec 16, 2021 at 2:49 PM Arvid Heise  wrote:
>
>> Ah yes I see it now as well. Yes you are right, each record should be
>> replicated 9 times to send to one of the instances each. Your upstream is
>> not inflating the record size? The number of records seems to work
>> decently. @pnowojski  FYI.
>>
>> On Thu, Dec 16, 2021 at 2:20 AM tao xiao  wrote:
>>
>>> Hi Arvid
>>>
>>> The second picture shows the metrics of the upstream operator. The
>>> upstream has 150 parallelisms as you can see in the first picture. I expect
>>> the bytes sent is about 9 * bytes received as we have 9 downstream
>>> operators connecting.
>>>
>>> Hi Caizhi,
>>> Let me create a minimal reproducible DAG and update here
>>>
>>> On Thu, Dec 16, 2021 at 4:03 AM Arvid Heise  wrote:
>>>
 Hi,

 Could you please clarify which operator we see in the second picture?

 If you are showing the upstream operator, then this has only
 parallelism 1, so there shouldn't be multiple subtasks.
 If you are showing the downstream operator, then the metric would refer
 to the HASH and not REBALANCE.

 On Tue, Dec 14, 2021 at 2:55 AM Caizhi Weng 
 wrote:

> Hi!
>
> This doesn't seem to be the expected behavior. Rebalance shuffle
> should send records to one of the parallelism, not all.
>
> If possible could you please explain what your Flink job is doing and
> preferably share your user code so that others can look into this case?
>
> On Sat, Dec 11, 2021 at 01:11 tao xiao wrote:
>
>> Hi team,
>>
>> I have one operator that is connected to another 9 downstream
>> operators using rebalance. Each operator has 150 parallelisms[1]. I 
>> assume
>> each message in the upstream operation is sent to one of the parallel
>> instances of the 9 receiving operators so the total bytes sent should be
>> roughly 9 times of bytes received in the upstream operator metric. 
>> However
>> the Flink UI shows the bytes sent is much higher than 9 times. It is 
>> about
>> 150 * 9 * bytes received[2]. This looks to me like every message is
>> duplicated to each parallel instance of all receiving operators like what
>> broadcast does.  Is this correct?
>>
>>
>>
>> [1] https://imgur.com/cGyb0QO
>> [2] https://imgur.com/SFqPiJA
>> --
>> Regards,
>> Tao
>>
>
>>>
>>> --
>>> Regards,
>>> Tao
>>>
>>
>
> --
> Regards,
> Tao
>


Re: Read parquet data from S3 with Flink 1.12

2021-12-17 Thread Piotr Nowojski
Hi,

Reading from Parquet files in the DataStream API (that's what I'm assuming
you are doing) is officially supported and documented only since 1.14 [1].
Before that it was only supported for the Table API. As far as I can tell,
the basic classes (`FileSource` and `ParquetColumnarRowInputFormat`) have
already been in the code base since 1.12.x. I don't know how stable it was
and how well it was working. I would suggest upgrading to Flink 1.14.1. As
a last resort you can try using at the very least the latest version of the
1.12.x branch, following the 1.14 documentation, but I can not guarantee
that it will work.
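
For reference, the 1.14-style DataStream wiring looks roughly like this (just
a sketch based on [1], assuming a StreamExecutionEnvironment `env`; the
schema, field names, batch size and S3 path are made-up placeholders and
imports are omitted):

final LogicalType[] fieldTypes =
    new LogicalType[] {new VarCharType(), new IntType()};
final ParquetColumnarRowInputFormat<FileSourceSplit> format =
    new ParquetColumnarRowInputFormat<>(
        new Configuration(),                                   // hadoop conf
        RowType.of(fieldTypes, new String[] {"name", "count"}),
        500,                                                   // batch size
        false,                                                 // UTC timestamps
        true);                                                 // case sensitive
final FileSource<RowData> source =
    FileSource.forBulkFileFormat(format, new Path("s3://my-bucket/my-prefix/"))
        .build();
final DataStream<RowData> stream =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-source");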

Regarding the S3 issue, have you followed the documentation? [2][3]

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/filesystems/s3.html


On Fri, Dec 17, 2021 at 10:10 Alexandre Montecucco <
alexandre.montecu...@grabtaxi.com> wrote:

> Hello everyone,
> I am struggling to read S3 parquet files from S3 with Flink Streaming
> 1.12.2
> I had some difficulty simply reading from local parquet files. I finally
> managed that part, though the solution feels dirty:
> - I use the readFile function + ParquetInputFormat abstract class (that is
> protected) (as I could not find a way to use the public
> ParquetRowInputFormat).
> - the open function, in ParquetInputFormat is
> using org.apache.hadoop.conf.Configuration. I am not sure which import to
> add. It seems the flink-parquet library is importing the dependency from
> hadoop-common but the dep is marked as provided. The doc only shows usage
> of flink-parquet from Flink SQL. So I am under the impression that this
> might not work in the streaming case without extra code. I 'solved' this by
> adding a dependency to hadoop-common. We did something similar to write
> parquet data to S3.
>
> Now, when trying to run the application to read from S3, I get an
> exception with root cause:
> ```
> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
> FileSystem for scheme "s3"
> ```
> I guess there are some issues with hadoop-common not knowing about the
> flink-s3-hadoop plugin setup. But I ran out of ideas on how to solve this.
>
>
> I also noticed there were some changes with flink-parquet in Flink 1.14,
> but I had some issues with simply reading data (but I did not investigate
> so deeply for that version).
>
> Many thanks for any help.
> --
>
>
> Alexandre Montecucco / Grab, Software Developer
> alexandre.montecu...@grab.com  / 8782 0937
>
> Grab
> 138 Cecil Street, Cecil Court #01-01Singapore 069538
> https://www.grab.com/ 
>
>
> By communicating with Grab Inc and/or its subsidiaries, associate
> companies and jointly controlled entities (“Grab Group”), you are deemed to
> have consented to the processing of your personal data as set out in the
> Privacy Notice which can be viewed at https://grab.com/privacy/
>
> This email contains confidential information and is only for the intended
> recipient(s). If you are not the intended recipient(s), please do not
> disseminate, distribute or copy this email Please notify Grab Group
> immediately if you have received this by mistake and delete this email from
> your system. Email transmission cannot be guaranteed to be secure or
> error-free as any information therein could be intercepted, corrupted,
> lost, destroyed, delayed or incomplete, or contain viruses. Grab Group do
> not accept liability for any errors or omissions in the contents of this
> email arises as a result of email transmission. All intellectual property
> rights in this email and attachments therein shall remain vested in Grab
> Group, unless otherwise provided by law.
>


Re: fraud detection example fails

2021-12-17 Thread Piotr Nowojski
Hi,

It might be simply because the binary artifacts are not yet
published/visible. The blog post [1] mentions that it should be visible
within 24h (from yesterday), so please try again later/tomorrow. This is
also mentioned in the dev mailing list thread [2].

Best,
Piotrek

[1] https://flink.apache.org/news/2021/12/16/log4j-patch-releases.html
[2]
https://mail-archives.apache.org/mod_mbox/flink-dev/202112.mbox/%3C7ce89912-69fb-cf99-f815-10b87cece03b%40apache.org%3E

On Fri, Dec 17, 2021 at 13:04 HG wrote:

> Hello all
>
> I am a flink newbie and trying to do the fraud detection example.
> The maven command however fails for version 1.14.2 since it cannot find 
> flink-walkthrough-datastream-java
> for that version
>
> mvn archetype:generate -DarchetypeGroupId=org.apache.flink
> -DarchetypeArtifactId=flink-walkthrough-datastream-java
> -DarchetypeVersion=1.14.2 -DgroupId=frauddetection
> -DartifactId=frauddetection -Dversion=0.1 -Dpackage=spendreport
> -DinteractiveMode=false
>
> this succeeds however.
>
> mvn archetype:generate -DarchetypeGroupId=org.apache.flink
> -DarchetypeArtifactId=flink-walkthrough-datastream-java
> -DarchetypeVersion=1.14.1 -DgroupId=frauddetection
> -DartifactId=frauddetection -Dversion=0.1 -Dpackage=spendreport
> -DinteractiveMode=false
>
> Perhaps an omission caused by the need to fix the log4j issues?
>
> Can it be solved?
>
> Regards Hans-Peter
>


Re: Prometheus labels in metrics / counters

2021-12-17 Thread Piotr Nowojski
Hi,

In principle you can register metrics/metric groups dynamically and it
should work just fine. However, your code probably won't work, because for
every record you are creating a new group and a new counter, which most
likely will collide with an old one. So every time you define a new group or
a new counter, you should remember it in some field and reuse it.
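
For example, a minimal sketch of your MyMapper with the counters cached per
"kind" value (getKind/getLatencyMicros are your own helpers from the snippet
above, imports are omitted; no mutex is needed, because map() is called by a
single task thread per parallel subtask):

public class MyMapper extends RichMapFunction<String, String> {
  private transient Map<String, Counter> latencySum;
  private transient Map<String, Counter> latencyCount;

  @Override
  public void open(Configuration config) {
    latencySum = new HashMap<>();
    latencyCount = new HashMap<>();
  }

  @Override
  public String map(String value) throws Exception {
    String kind = getKind(value);
    if (!latencySum.containsKey(kind)) {
      // register the group and its counters only once per label value
      MetricGroup group = getRuntimeContext().getMetricGroup().addGroup("kind", kind);
      latencySum.put(kind, group.counter("latency_sum"));
      latencyCount.put(kind, group.counter("latency_count"));
    }
    latencySum.get(kind).inc(getLatencyMicros(value));
    latencyCount.get(kind).inc();
    return value;
  }
}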

Best,
Piotrek

On Fri, Dec 17, 2021 at 14:03 Witold Baryluk wrote:

> Hi,
>
> I want to track and increment some monitoring counters from `map`,
> but have them broken down by dynamically defined values of a label.
> The set of values is unknown at creation time, but it is bounded
> (less than 100 different values during a 30 day period, usually ~5).
>
> There is no easy way of doing this, compared to other Prometheus
> native systems (i.e. in Go, or C++), but it looks like it might be possible
> using some workarounds:
>
>
> public class MyMapper extends RichMapFunction<String, String> {
>   private transient MetricGroup metric_group;
>
>   @Override
>   public void open(Configuration config) {
> this.metric_group = getRuntimeContext().getMetricGroup();
>   }
>
>   @Override
>   public String map(String value) throws Exception {
>   Group group = this.metric_group.addGroup("kind", getKind(value));
>   group.counter("latency_sum").inc(getLatencyMicros(value));
>   group.counter("latency_count").inc();
>   return value;
>   }
> }
>
> Will this work? Is there a better way?
>
> But, this does not look nice at all compared to how
> other projects handle Prometheus labels. Flink metrics are not well
> mapped into Prometheus metrics here.
>
> Second question, is it ok to call addGroup and counter, dynamically like
> this,
> or should it be cached? Do I need any such cache (which would be map
> of string to Counter), be protected by some mutex when I lookup or add
> to it?
>
> Cheers,
> Witold
>


Re: unaligned checkpoint for job with large start delay

2021-12-20 Thread Piotr Nowojski
Hi Mason,

Have you already observed those checkpoint timeouts (30 minutes) with the
alignment timeout set to 0ms, or were you previously running with the 1s
alignment timeout?

If the latter, it might be because unaligned checkpoints are failing to
kick in in the first place. Setting the timeout to 0ms should solve the
problem.

If the former, have you checked why the checkpoints are timing out? What
part of the checkpointing process is taking a long time? For example, can
you post a screenshot from the WebUI of checkpoint stats for each task? The
only explanation I could think of is this sleep time that you added. 25ms
per record is really a lot. I mean really a lot. 30 minutes / 25 ms/record
= 72 000 records. One of the limitations of unaligned checkpoints is that
Flink can not snapshot the state of an operator in the middle of processing a
record. In your particular case, Flink will not be able to snapshot the
state of the session window operator in the middle of the windows being
fired. If your window operator is firing a lot of windows at the same time,
or a single window is producing 72k records (which would be an
unusual but not unimaginable amount), this could block checkpointing of the
window operator for 30 minutes due to this 25ms sleep down the stream.

Piotrek

On Fri, Dec 17, 2021 at 19:19 Mason Chen wrote:

> Hi Piotr,
>
> Thanks for the link to the JIRA ticket, we actually don’t see much state
> size overhead between checkpoints in aligned vs unaligned, so we will go
> with your recommendation of using unaligned checkpoints with 0s alignment
> timeout.
>
> For context, we are testing unaligned checkpoints with our application
> with these tasks: [kafka source, map, filter] -> keyby -> [session window]
> -> [various kafka sinks]. The first task has parallelism 40 and the rest of
> the tasks have parallelism 240. This is the FLIP 27 Kafka source.
>
> We added an artificial sleep (25 ms per invocation of in process function)
> the session window task to simulate backpressure; however, we still see
> checkpoints failing due to task acknowledgement doesn’t complete within our
> checkpoint timeout (30 minutes).
>
> I am able to correlate that the input buffers from *window* and output
> buffers from *source* being 100% usage corresponds to the checkpoint
> failures. When they are not full (input can drop to as low as 60% usage and
> output can drop to as low as 55% usage), the checkpoints succeed within
> less than 2 ms. In all cases, it is the session window task or source task
> failing to 100% acknowledge the barriers within timeout. I do see the
> *source* task acknowledgement taking long in some of the failures (e.g.
> 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 hours) and source is idle and
> not busy at this time.
>
> All other input buffers are low usage (mostly 0). For output buffer, the
> usage is around 50% for window--everything else is near 0% all the time
> except the source mentioned before (makes sense since rest are just sinks).
>
> We are also running a parallel Flink job with the same configurations,
> except with unaligned checkpoints disabled. Here we observe the same
> behavior except now some of the checkpoints are failing due to the source
> task not acknowledging everything within timeout—however, most failures are
> still due to session window acknowledgement.
>
> All the data seems to points an issue with the source? Now, I don’t know
> how to explain this behavior since unaligned checkpoints should overtake
> records in the buffers (once seen at the input buffer, forward immediately
> downstream to output buffer).
>
> Just to confirm, this is our checkpoint configuration:
> ```
> Option
> Value
> Checkpointing Mode Exactly Once
> Checkpoint Storage FileSystemCheckpointStorage
> State Backend EmbeddedRocksDBStateBackend
> Interval 5m 0s
> Timeout 30m 0s
> Minimum Pause Between Checkpoints 2m 0s
> Maximum Concurrent Checkpoints 1
> Unaligned Checkpoints Enabled
> Persist Checkpoints Externally Enabled (retain on cancellation)
> Tolerable Failed Checkpoints 10
> ```
>
> Are there other metrics I should look at—why else should tasks fail
> acknowledgement in unaligned mode? Is it something about the implementation
> details of window function that I am not considering? My main hunch is
> something to do with the source.
>
> Best,
> Mason
>
> On Dec 16, 2021, at 12:25 AM, Piotr Nowojski  wrote:
>
> Hi Mason,
>
> In Flink 1.14 we have also changed the timeout behavior from checking
> against the alignment duration, to simply checking how old is the
> checkpoint barrier (so it would also account for the start delay) [1]. It
> was done in order to solve problems as you are describing. Unfortunately we
> can not backport this change to 

Re: unaligned checkpoint for job with large start delay

2022-01-10 Thread Piotr Nowojski
Hi Mason,

Sorry for a late reply, but I was OoO.

I think you could confirm it with more custom metrics. Counting how many
windows have been registered/fired and plotting that over time.
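
For the "fired" side, a minimal sketch would be a trigger that wraps whatever
trigger you are using today (the class name and the "windowFires" metric name
are just illustrative, imports are omitted):

public class FireCountingTrigger<T, W extends Window> extends Trigger<T, W> {
  private final Trigger<T, W> delegate; // e.g. EventTimeTrigger.create()
  private transient Counter fires;      // lazily registered, one per subtask

  public FireCountingTrigger(Trigger<T, W> delegate) {
    this.delegate = delegate;
  }

  @Override
  public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
    return count(delegate.onElement(element, timestamp, window, ctx), ctx);
  }

  @Override
  public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    return count(delegate.onProcessingTime(time, window, ctx), ctx);
  }

  @Override
  public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
    return count(delegate.onEventTime(time, window, ctx), ctx);
  }

  @Override
  public boolean canMerge() {
    return delegate.canMerge(); // needed for session windows
  }

  @Override
  public void onMerge(W window, OnMergeContext ctx) throws Exception {
    delegate.onMerge(window, ctx);
  }

  @Override
  public void clear(W window, TriggerContext ctx) throws Exception {
    delegate.clear(window, ctx);
  }

  private TriggerResult count(TriggerResult result, TriggerContext ctx) {
    if (result.isFire()) {
      if (fires == null) {
        fires = ctx.getMetricGroup().counter("windowFires");
      }
      fires.inc();
    }
    return result;
  }
}

It would be plugged in via
.window(...).trigger(new FireCountingTrigger<>(EventTimeTrigger.create())).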

I think it would be more helpful in this case to check how long a task has
been blocked being "busy" processing, for example, timers. FLINK-25414 covers
only time blocked on hard/soft backpressure. Unfortunately, at the moment I
don't know how to implement such a metric without affecting performance on
the critical path, so I don't see this happening soon :(

Best,
Piotrek

On Tue, Jan 4, 2022 at 18:02 Mason Chen wrote:

> Hi Piotrek,
>
> In other words, something (presumably a watermark) has fired more than 151
> 200 windows at once, which is taking ~1h 10minutes to process and during
> this time the checkpoint can not make any progress. Is this number of
> triggered windows plausible in your scenario?
>
>
> It seems plausible—there are potentially many keys (and many windows). Is
> there a way to confirm with metrics? We can add a window fire counter to
> the window operator that only gets incremented at the end of windows
> evaluation, in order to see the huge jumps in window fires. I can this
> benefiting other users who troubleshoot the problem of large number of
> window firing.
>
> Best,
> Mason
>
> On Dec 29, 2021, at 2:56 AM, Piotr Nowojski  wrote:
>
> Hi Mason,
>
> > and it has to finish processing this output before checkpoint can
> begin—is this right?
>
> Yes. Checkpoint will be only executed once all triggered windows will be
> fully processed.
>
> But from what you have posted it looks like all of that delay is
> coming from hundreds of thousands of windows firing all at the same time.
> Between 20:30 and ~21:40 there must have been a bit more than 36 triggers/s
> * 60s/min * 70min = 151 200triggers fired at once (or in a very short
> interval). In other words, something (presumably a watermark) has fired
> more than 151 200 windows at once, which is taking ~1h 10minutes to process
> and during this time the checkpoint can not make any progress. Is this
> number of triggered windows plausible in your scenario?
>
> Best,
> Piotrek
>
>
> On Thu, Dec 23, 2021 at 12:12 Mason Chen wrote:
>
>> Hi Piotr,
>>
>> Thanks for the thorough response and the PR—will review later.
>>
>> Clarifications:
>> 1. The flat map you refer to produces at most 1 record.
>> 2. The session window operator’s *window process function* emits at
>> least 1 record.
>> 3. The 25 ms sleep is at the beginning of the window process function.
>>
>> Your explanation about how records being bigger than the buffer size can
>> cause blockage makes sense to me. However, my average record size is around 
>> 770
>> bytes coming out of the source and 960 bytes coming out of the window.
>> Also, we don’t override the default `taskmanager.memory.segment-size`. My
>> Flink job memory config is as follows:
>>
>> ```
>> taskmanager.memory.jvm-metaspace.size: 512 mb
>> taskmanager.memory.jvm-overhead.max: 2Gb
>> taskmanager.memory.jvm-overhead.min: 512Mb
>> taskmanager.memory.managed.fraction: '0.4'
>> taskmanager.memory.network.fraction: '0.2'
>> taskmanager.memory.network.max: 2Gb
>> taskmanager.memory.network.min: 200Mb
>> taskmanager.memory.process.size: 16Gb
>> taskmanager.numberOfTaskSlots: '4'
>> ```
>>
>>  Are you sure your job is making any progress? Are records being
>> processed? Hasn't your job simply deadlocked on something?
>>
>>
>> To distinguish task blockage vs graceful backpressure, I have checked the
>> operator throughput metrics and have confirmed that during window *task*
>> buffer blockage, the window *operator* DOES emit records. Tasks look
>> like they aren’t doing anything but the window is emitting records.
>>
>> 
>>
>>
>> Furthermore, I created a custom trigger to wrap a metric counter for
>> FIRED counts to get a estimation of how many windows are fired at the same
>> time. I ran a separate job with the same configs—the results look as
>> follows:
>> 
>>
>> On average, when the buffers are blocked, there are 36 FIREs per second.
>> Since each of these fires invokes the window process function, 25 ms * 36 =
>> 900 ms means we sleep almost a second cumulatively, per second—which is
>> pretty severe. Combined with the fact that the window process function can
>> emit many records, the task takes even longer to checkpoint since the
>> flatmap/kafka sink is chained with the window operator—and it has to finish
>> processing this o

Re: Job stuck in savePoint - entire topic replayed on restart.

2022-01-10 Thread Piotr Nowojski
Hi Basil,

1. What do you mean by:
> The only way we could stop these stuck jobs was to patch the finalizers.
?
2. Do you mean that your job is stuck when doing stop-with-savepoint?
3. What Flink version are you using? Have you tried upgrading to the most
recent version, or at least the most recent minor release? There have been
some bugs in the past with stop-with-savepoint, that have been fixed over
time. For example [1], [2] or [3]. Note that some of them might not be
related to your use case (Kinesis consumer or FLIP-27 sources).
4. If upgrading won't help, can you post stack traces of task managers that
contain the stuck operators/tasks?
5. If you are working on a version that has fixed all of those bugs, are
you using some custom operators/sources/sinks? If your code is either
capturing interrupts, or doing some blocking calls, it might be prone to
bugs similar to [2] (please check the discussion in the ticket for more
information).

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-21028
[2] https://issues.apache.org/jira/browse/FLINK-17170
[3] https://issues.apache.org/jira/browse/FLINK-21133

On Thu, Jan 6, 2022 at 16:23 Basil Bibi wrote:

> Hi,
> We experienced a problem in production during a release.
> Our application is deployed to kubernetes using argocd and uses the Lyft
> flink operator.
> We tried to do a release and found that on deleting the application some
> of the jobs became stuck in "savepointing" phase.
> The only way we could stop these stuck jobs was to patch the finalizers.
> We deployed the new release and on startup our application had lost it's
> offsets so all of the messages in kafka were replayed.
> Has anyone got any ideas how and why this happened and how we avoid it in
> the future?
> Sincerely Basil Bibi
>
>
>
> Authorised and regulated by the Financial Conduct Authority
>  (FCA) number 923700. *Humn.ai Ltd*, 12
> Hammersmith Grove, London, W6 7AP is a registered company number 11032616
> incorporated in the United Kingdom. Registered with the information
> commissioner’s office (ICO) number ZA504331.
>
> This message contains confidential information and is intended only for
> the individual(s) addressed in the message. If you aren't the named
> addressee, you should not disseminate, distribute, or copy this e-mail.
>


Re: RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-10 Thread Piotr Nowojski
Hi Sid,

I don't see on the Stack Overflow page an explanation of what you are trying
to do here (no mention of MapFunction or a tuple).

If you want to create a `DataStream<String>` from a pre-existing/static
Tuple of Strings, the easiest thing would be to convert the tuple to a
collection/iterator and use
`StreamExecutionEnvironment#fromCollection(...)`.
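
For example (a minimal sketch, assuming a Tuple2 of strings; imports omitted):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Tuple2<String, String> tuple = Tuple2.of("first", "second");
// turn the tuple's fields into a collection and read it as a stream
DataStream<String> stream = env.fromCollection(Arrays.asList(tuple.f0, tuple.f1));
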
If you already have a `DataStream<Tuple>` (for example your source
produces a tuple) and you want to flatten it to `DataStream<String>`, then
you need a simple `FlatMapFunction<Tuple, String>` (or
`RichFlatMapFunction<Tuple, String>`) that would do the flattening
via:

public void flatMap(Tuple value, Collector<String> out) throws Exception {
  out.collect(value.f0);
  out.collect(value.f1);
  ...;
  out.collect(value.fN);
}

Best,
Piotrek

On Fri, Jan 7, 2022 at 07:05 Siddhesh Kalgaonkar wrote:

> Hi Francis,
>
> What I am trying to do is you can see over here
> https://stackoverflow.com/questions/70592174/richsinkfunction-for-cassandra-in-flink/70593375?noredirect=1#comment124796734_70593375
>
>
> On Fri, Jan 7, 2022 at 5:07 AM Francis Conroy <
> francis.con...@switchdin.com> wrote:
>
>> Hi Siddhesh,
>>
>> How are you getting this tuple of strings into the system? I think this
>> is the important question, you can create a DataStream in many ways, from a
>> collection, from a source, etc but all of these rely on the
>> ExecutionEnvironment you're using.
>> A RichMapFunction doesn't produce a datastream directly, it's used in the
>> context of the StreamExecutionEnvironment to create a stream i.e.
>> DataStream.map([YourRichMapFunction]) this implies that you already need a
>> datastream to transform a datastream using a mapFunction
>> (MapFunction/RichMapFunction)
>> Francis
>>
>> On Fri, 7 Jan 2022 at 01:48, Siddhesh Kalgaonkar <
>> kalgaonkarsiddh...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> As I am new and I am facing one issue so I came across RichMapFunction.
>>> How can I use RichMapFunction to convert a tuple of strings to datastream?
>>> If not how can I do it apart from using StreamExecutionEnvironment?
>>>
>>> Thanks,
>>> Sid
>>>
>>
>> This email and any attachments are proprietary and confidential and are
>> intended solely for the use of the individual to whom it is addressed. Any
>> views or opinions expressed are solely those of the author and do not
>> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
>> received this email in error, please let us know immediately by reply email
>> and delete it from your system. You may not use, disseminate, distribute or
>> copy this message nor disclose its contents to anyone.
>> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
>> Australia
>>
>


Re: Custom Kafka Keystore on Amazon Kinesis Analytics

2022-01-10 Thread Piotr Nowojski
Hi Clayton,

I think in principle this example should still be valid, however instead of
providing a `CustomFlinkKafkaConsumer` and overriding its `open` method,
you would probably need to override
`org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#start`.
So you would most likely need at the very least both a custom
`KafkaSourceReader` and a custom `KafkaSource` to instantiate that
`KafkaSourceReader`. But I'm not sure if anyone has ever tried this so far.

Best,
Piotrek

On Fri, Jan 7, 2022 at 21:18 Clayton Wohl wrote:

> If I want to migrate from FlinkKafkaConsumer to KafkaSource, does the
> latter support this:
>
>
> https://docs.aws.amazon.com/kinesisanalytics/latest/java/example-keystore.html
>
> Basically, I'm running my Flink app in Amazon's Kinesis Analytics hosted
> Flink environment. I don't have reliable access to the local file system.
> At the documentation link above, Amazon recommends adding a hook to copy
> the keystore files from the classpath to a /tmp directory at runtime. Can
> KafkaSource do something similar?
>


Re: Uploading jar to s3 for persistence

2022-01-10 Thread Piotr Nowojski
Hi Puneet,

Have you seen this thread before? [1]. It looks like the same issue and
especially this part might be the key:

> Be aware that the filesystem used by the FileUploadHandler
> is java.nio.file.FileSystem and not
> Flink's org.apache.flink.core.fs.FileSystem for which we provide different
> FileSystem implementations.
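
In practice that means `web.upload.dir` has to point to a directory on the
local filesystem of the JobManagers, for example (the path below is just an
example):

web.upload.dir: /opt/flink/web-uploads

If all three JobManagers need to see the same uploaded jars, that local path
would have to be backed by a shared mount (e.g. NFS) rather than by the S3
filesystem plugin.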

Best,
Piotrek

[1] https://www.mail-archive.com/user@flink.apache.org/msg38043.html



On Mon, Jan 10, 2022 at 08:19 Puneet Duggal wrote:

> Hi,
>
> Currently i am working with flink HA cluster with 3 job managers and 3
> zookeeper nodes. Also i am persisting my checkpoints to s3 and hence
> already configured required flink-s3 jars during flink job manager and task
> manager process startup. Now i have configured a variable
>
> web.upload.dir: s3p://d11-flink-job-manager-load/jars
>
> Expectation is that jar upload via rest apis will be uploaded to this
> location and hence is accessible to all 3 job managers (which eventually
> will help in job submission as all 3 job managers will have record of
> uploaded jar to this location). But while uploading the jar, I am facing
> following Illegal Argument Exception which i am not sure why. Also above
> provided s3 location was created before job manager process was even
> started.
>
> *2022-01-09 18:12:46,790 WARN
>  org.apache.flink.runtime.rest.FileUploadHandler  [] - File
> upload failed.*
> *java.lang.IllegalArgumentException: UploadDirectory is not absolute.*
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rest.handler.FileUploads.(FileUploads.java:59)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:186)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>
>
>
>


Re: Is there a way to know how long a Flink app takes to finish resuming from Savepoint?

2022-01-10 Thread Piotr Nowojski
Hi,

Unfortunately there is no such metric. Regarding the logs, I'm not sure
what Flink version you are using, but since Flink 1.13.0 [1][2] you could
rely on the task/subtask switch from `INITIALIZING` to `RUNNING` to
check when the task/subtask has finished recovering its state.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-17012
[2] https://issues.apache.org/jira/browse/FLINK-22215

On Mon, Jan 10, 2022 at 09:34 Chen-Che Huang wrote:

> Hi all,
>
> I'm trying to speed up the process of resuming from a savepoint by
> adjusting some configuration.
> I wonder whether there exists a way to know how much time our Flink app
> spends resuming from a savepoint?
> From the logs, I can see only the starting time of the resuming (as shown
> below) but couldn't find the end time of the resuming.
> If there exists some metrics or information about the resuming time, it'd
> be very helpful for the tuning.
> Any comment is appreciated.
>
> timestamp-1: Starting job  from savepoint
> timestamp-2: Restoring job  from Savepoint
>
> Best wishes,
> Chen-Che Huang
>


Re: Custom Kafka Keystore on Amazon Kinesis Analytics

2022-01-10 Thread Piotr Nowojski
Ah, I see. Pity. You could always use reflection if you really had to, but
that's of course not a long-term solution.

I will raise this issue with the KafkaSource/AWS contributors.

Best,
Piotr Nowojski

On Mon, Jan 10, 2022 at 16:55 Clayton Wohl wrote:

> Custom code can create subclasses of FlinkKafkaConsumer, because the
> constructors are public. Custom code can't create subclasses of KafkaSource
> because the constructors are package private. So the same solution of
> creating code subclasses won't work for KafkaSource.
>
> Thank you for the response :)
>
>
> On Mon, Jan 10, 2022 at 6:22 AM Piotr Nowojski 
> wrote:
>
>> Hi Clayton,
>>
>> I think in principle this example should be still valid, however instead
>> of providing a `CustomFlinkKafkaConsumer` and overriding it's `open`
>> method, you would probably need to override
>> `org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#start`.
>> So you would most likely need both at the very least a custom
>> `KafkaSourceReader` and `KafkaSource` to instantiate your custom
>> `KafkaSourceReader`. But I'm not sure if anyone has ever tried this so far.
>>
>> Best,
>> Piotrek
>>
>> On Fri, Jan 7, 2022 at 21:18 Clayton Wohl wrote:
>>
>>> If I want to migrate from FlinkKafkaConsumer to KafkaSource, does the
>>> latter support this:
>>>
>>>
>>> https://docs.aws.amazon.com/kinesisanalytics/latest/java/example-keystore.html
>>>
>>> Basically, I'm running my Flink app in Amazon's Kinesis Analytics hosted
>>> Flink environment. I don't have reliable access to the local file system.
>>> At the documentation link above, Amazon recommends adding a hook to copy
>>> the keystore files from the classpath to a /tmp directory at runtime. Can
>>> KafkaSource do something similar?
>>>
>>


Re: RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-10 Thread Piotr Nowojski
Glad to hear it.

Best,
Piotrek

On Mon, Jan 10, 2022 at 20:08 Siddhesh Kalgaonkar wrote:

> Hi Piotr,
>
> Thanks for the reply. I was looking for how to create a DataStream under a
> process function since using that I had to call something else but I came
> across one of Fabian's posts where he mentioned that this way of creating
> DS is not "encouraged and tested". So, I figured out an alternate way of
> using side output and now I can do what I was aiming for.
>
> Thanks,
> Sid.
>
> On Mon, Jan 10, 2022 at 5:29 PM Piotr Nowojski 
> wrote:
>
>> Hi Sid,
>>
>> I don't see on the stackoverflow explanation of what are you trying to do
>> here (no mentions of MapFunction or a tuple).
>>
>> If you want to create a `DataStream` from some a pre
>> existing/static Tuple of Strings, the easiest thing would be to convert the
>> tuple to a collection/iterator and use
>> `StreamExecutionEnvironment#fromCollection(...)`.
>> If you already have a `DataStream>` (for example your
>> source produces a tuple) and you want to flatten it to
>> `DataStream`, then you need a simple
>> `FlatMapFunction, String>` (or
>> `RichFlatMapFunction, String>`), that would do the flattening
>> via:
>>
>> public void flatMap(Tuple value, Collector out) throws
>> Exception {
>>   out.collect(value.f0);
>>   out.collect(value.f1);
>>   ...;
>>   out.collect(value.fN);
>> }
>>
>> Best,
>> Piotrek
>>
>> On Fri, Jan 7, 2022 at 07:05 Siddhesh Kalgaonkar wrote:
>>
>>> Hi Francis,
>>>
>>> What I am trying to do is you can see over here
>>> https://stackoverflow.com/questions/70592174/richsinkfunction-for-cassandra-in-flink/70593375?noredirect=1#comment124796734_70593375
>>>
>>>
>>> On Fri, Jan 7, 2022 at 5:07 AM Francis Conroy <
>>> francis.con...@switchdin.com> wrote:
>>>
>>>> Hi Siddhesh,
>>>>
>>>> How are you getting this tuple of strings into the system? I think this
>>>> is the important question, you can create a DataStream in many ways, from a
>>>> collection, from a source, etc but all of these rely on the
>>>> ExecutionEnvironment you're using.
>>>> A RichMapFunction doesn't produce a datastream directly, it's used in
>>>> the context of the StreamExecutionEnvironment to create a stream i.e.
>>>> DataStream.map([YourRichMapFunction]) this implies that you already need a
>>>> datastream to transform a datastream using a mapFunction
>>>> (MapFunction/RichMapFunction)
>>>> Francis
>>>>
>>>> On Fri, 7 Jan 2022 at 01:48, Siddhesh Kalgaonkar <
>>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> As I am new and I am facing one issue so I came
>>>>> across RichMapFunction. How can I use RichMapFunction to convert a tuple 
>>>>> of
>>>>> strings to datastream? If not how can I do it apart from using
>>>>> StreamExecutionEnvironment?
>>>>>
>>>>> Thanks,
>>>>> Sid
>>>>>
>>>>
>>>> This email and any attachments are proprietary and confidential and are
>>>> intended solely for the use of the individual to whom it is addressed. Any
>>>> views or opinions expressed are solely those of the author and do not
>>>> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
>>>> received this email in error, please let us know immediately by reply email
>>>> and delete it from your system. You may not use, disseminate, distribute or
>>>> copy this message nor disclose its contents to anyone.
>>>> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
>>>> Australia
>>>>
>>>


Re: unaligned checkpoint for job with large start delay

2022-01-11 Thread Piotr Nowojski
Hi Thias and Mason,

> state-backend-rocksdb-metrics-estimate-num-keys

Indeed that can be a good indicator. However keep in mind that, depending
on your logic, there might be many existing windows for each key.
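
For reference, in flink-conf.yaml that switch is the dotted form of the key:

state.backend.rocksdb.metrics.estimate-num-keys: true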

>  However, it’s not so clear how to count the windows that have been
registered since the window assigner does not expose the run time
context—is this even the right place to count?

Yes, I think you are unfortunately right. I've looked at the code, and it
wouldn't even be that easy to add such a metric. Sorry for misleading you.
But a spike in triggered windows is a strong indication that they were
triggered all at once.

> Perhaps, it can be an opt-in feature? I do see it being really useful
since most users aren’t really familiar with windows and these metrics can
help easily identify the common problem of too many windows firing.
> The additional metrics certainly help in diagnosing some of the symptoms
of the root problem.

I will think about how to solve it. I would be against an opt in metric, as
it would complicate code and configuration for the users while barely
anyone would use it.

Note that huge checkpoint start delay with unaligned checkpoints already
confirms that the system has been blocked by something. As I mentioned
before, there are a number of reasons why: record size larger than buffer
size, flatMap functions/operators multiplying number of records, large
number of timers fired at once. Summing up everything that you have
reported so far, we ruled out the former two options, and spike in the
number of triggered windows almost confirms that this is the issue at hand.

Best,
Piotrek

On Wed, Jan 12, 2022 at 08:32 Schwalbe Matthias wrote:

> Hi Mason,
>
>
>
> Since you are using RocksDB, you could enable this metric [1]
> state-backend-rocksdb-metrics-estimate-num-keys which gives (afaik) good
> indication of the number of active windows.
>
> I’ve never seen (despite the warning) negative effect on the runtime.
>
>
>
> Hope this help …
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-estimate-num-keys
>
>
>
> *From:* Mason Chen 
> *Sent:* Dienstag, 11. Januar 2022 19:20
> *To:* Piotr Nowojski 
> *Cc:* Mason Chen ; user 
> *Subject:* Re: unaligned checkpoint for job with large start delay
>
>
>
> Hi Piotrek,
>
>
>
> No worries—I hope you had a good break.
>
>
>
> Counting how many windows have been registered/fired and plotting that
> over time.
>
> It’s straightforward to count windows that are fired (the trigger exposes
> the run time context and we can collect the information in that code path).
> However, it’s not so clear how to count the windows that have been
> registered since the window assigner does not expose the run time
> context—is this even the right place to count? It’s not necessarily the
> case that an assignment results in a new window registered. Am I missing
> anything else relevant from the user facing interface perspective?
>
>
>
>  Unfortunately at the moment I don't know how to implement such a metric
> without affecting performance on the critical path, so I don't see this
> happening soon :(
>
> Perhaps, it can be an opt-in feature? I do see it being really useful
> since most users aren’t really familiar with windows and these metrics can
> help easily identify the common problem of too many windows firing.
>
>
>
> The additional metrics certainly help in diagnosing some of the symptoms
> of the root problem.
>
>
>
> Best,
>
> Mason
>
>
>
> On Jan 10, 2022, at 1:00 AM, Piotr Nowojski  wrote:
>
>
>
> Hi Mason,
>
>
>
> Sorry for a late reply, but I was OoO.
>
>
>
> I think you could confirm it with more custom metrics. Counting how many
> windows have been registered/fired and plotting that over time.
>
>
>
> I think it would be more helpful in this case to check how long a task has
> been blocked being "busy" processing for example timers. FLINK-25414 shows
> only blocked on being hard/soft backpressure. Unfortunately at the moment I
> don't know how to implement such a metric without affecting performance on
> the critical path, so I don't see this happening soon :(
>
>
>
> Best,
>
> Piotrek
>
>
>
> On Tue, Jan 4, 2022 at 18:02 Mason Chen wrote:
>
> Hi Piotrek,
>
>
>
> In other words, something (presumably a watermark) has fired more than 151
> 200 windows at once, which is taking ~1h 10minutes to process and during
> this time the checkpoint can not make any progress. Is this number of
> triggered windows plausible in your scenar

Re: Buffering when connecting streams

2022-01-18 Thread Piotr Nowojski
Hi Alexis,

I believe you should be able to use the `ConnectedStreams#transform()`
method.
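
A rough sketch of what that could look like (just a sketch: the element types
String/Long, the operator name and the class name are made up, the actual
buffering/flushing logic is left out, and imports are omitted):

public class BufferingJoinOperator
    extends AbstractStreamOperator<String>
    implements TwoInputStreamOperator<String, Long, String> {

  @Override
  public void processElement1(StreamRecord<String> element) throws Exception {
    // main stream: buffer the element (e.g. in state) until the side stream catches up
  }

  @Override
  public void processElement2(StreamRecord<Long> element) throws Exception {
    // side stream: update whatever the join needs
  }

  @Override
  public void processWatermark2(Watermark mark) throws Exception {
    super.processWatermark2(mark);
    // here you can flush buffered main-stream elements up to mark.getTimestamp(),
    // emitting them via output.collect(new StreamRecord<>(...))
  }
}

// wiring it in (key both streams with the same key type first if the
// operator needs keyed state):
DataStream<String> result =
    mainStream
        .connect(sideStream)
        .transform("buffering-join", Types.STRING, new BufferingJoinOperator());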

Best, Piotrek

On Tue, Jan 18, 2022 at 14:20 Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hi again everyone,
>
>
>
> It’s been a while, so first of all happy new year :)
>
>
>
> I was revisiting this discussion and started looking at the code. However,
> it seems that all of the overloads of ConnectedStreams#process expect a
> CoProcessFunction or the Keyed counterpart, so I don’t think I can inject a
> custom TwoInputStreamOperator.
>
>
>
> After a quick glance at the joining documentation, I wonder if I could
> accomplish what I want with a window/interval join of streams. If so, I
> might be able to avoid using state in the join function, but if I can’t
> avoid it, is it possible to use managed state in a (Process)JoinFunction?
> The join needs keys, but I don’t know if the resulting stream counts as
> keyed from the state’s point of view.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Piotr Nowojski 
> *Sent:* Montag, 6. Dezember 2021 08:43
> *To:* David Morávek 
> *Cc:* Alexis Sarda-Espinosa ;
> user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
>
>
> Hi Alexis and David,
>
>
>
> This actually can not happen. There are mechanisms in the code to make
> sure none of the input is starved IF there is some data to be read.
>
>
>
> The only time when input can be blocked is during the alignment phase of
> aligned checkpoints under back pressure. If there was a back pressure in
> your job it could have easily happened that checkpoint barriers would flow
> through the job graph to the CoProcessKeyedCoProcessFunction on one of the
> paths much quicker then the other, causing this faster path to be blocked
> until the other side catched up. But that would happen only during the
> alignment phase of the checkpoint, so without a backpressure for a very
> short period of time.
>
>
>
> Piotrek
>
>
>
> On Thu, Dec 2, 2021 at 18:23 David Morávek wrote:
>
> I think this could happen, but I have a very limited knowledge about how
> the input gates work internally. @Piotr could definitely provide some more
> insight here.
>
>
>
> D.
>
>
>
> On Thu, Dec 2, 2021 at 5:54 PM Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> wrote:
>
> I do have some logic with timers today, but it’s indeed not ideal. I guess
> I’ll have a look at TwoInputStreamOperator, but I do have related
> questions. You mentioned a sample scenario of "processing backlog" where
> windows fire very quickly; could it happen that, in such a situation, the
> framework calls the operator’s processElement1 continuously (even for
> several minutes) before calling processElement2 a single time? How does the
> framework decide when to switch the stream processing when the streams are
> connected?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* David Morávek 
> *Sent:* Donnerstag, 2. Dezember 2021 17:18
> *To:* Alexis Sarda-Espinosa 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
>
>
> Even with the TwoInputStreamOperator you can not "halt" the processing.
> You need to buffer these elements for example in the ListState for later
> processing. At the time the watermark of the second stream arrives, you can
> process all buffered elements that satisfy the condition.
>
>
>
> You could probably also implement a similar (less optimized) solution with
> KeyedCoProcessFunction using event time timers.
>
>
>
> Best,
>
> D.
>
>
>
> On Thu, Dec 2, 2021 at 5:12 PM Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> wrote:
>
> Yes, that sounds right, but with my current KeyedCoProcessFunction I can’t
> tell Flink to "halt" processElement1 and switch to the other stream
> depending on watermarks. I could look into TwoInputStreamOperator if you
> think that’s the best approach.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* David Morávek 
> *Sent:* Donnerstag, 2. Dezember 2021 16:59
> *To:* Alexis Sarda-Espinosa 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
>
>
> I think this would require using lower level API and implementing a custom
> `TwoInputStreamOperator`. Then you can hook to `processWatemark{1,2}`
> methods.
>
>
>
> Let's also make sure we're on the same page on what the watermark is. You
> can think of the watermark as event time clock. It basically gives you an
> information, that *no more events with timestamp lo

Re: Performance Issues in Source Operator while migrating to Flink-1.14 from 1.9

2022-02-16 Thread Piotr Nowojski
Hi,

Unfortunately the new KafkaSource was contributed without good benchmarks,
and so far you are the first one to notice and report this issue.
Without a more direct comparison (as Martijn suggested), it's hard for us to
help right away. It would be a tremendous help for us if you could, for
example, provide us with steps to reproduce this exact issue. Another thing
that you could do is to attach a code profiler to both the Flink 1.9 and 1.14
versions and compare the results of the source task threads from both (Flink
task threads are named after the task name, so they are easy to
distinguish).

Also, have you observed some degradation in the metrics reported by Flink,
like the record processing rate, between those two versions?

Best,
Piotrek

On Wed, Feb 16, 2022 at 13:24 Arujit Pradhan wrote:

> Hey Martijn,
>
> Thanks a lot for getting back to us. To give you a little bit more
> context, we do maintain an open-source project around flink dagger
>  which is a wrapper for proto processing.
> As part of the upgrade to the latest version, we did some refactoring and
> moved to KafkaSource since the older FlinkKafkaConsumer was getting
> deprecated.
>
> So we currently do not have any set up to test the hypothesis. Also just
> increasing the resources by a bit fixes it and it does happen with a small
> set of jobs during high traffic.
>
> We would love to get some input from the community as it might cause
> errors in some of the jobs in production.
>
> Thanks and regards,
> //arujit
>
> On Tue, Feb 15, 2022 at 8:48 PM Martijn Visser 
> wrote:
>
>> Hi Arujit,
>>
>> I'm also looping in some contributors from the connector and runtime
>> perspective in this thread. Did you also test the upgrade first by only
>> upgrading to Flink 1.14 and keeping the FlinkKafkaConsumer? That would
>> offer a better way to determine if a regression is caused by the upgrade of
>> Flink or because of the change in connector.
>>
>> Best regards,
>>
>> Martijn Visser
>> https://twitter.com/MartijnVisser82
>>
>>
>> On Tue, 15 Feb 2022 at 13:07, Arujit Pradhan 
>> wrote:
>>
>>> Hey team,
>>>
>>> We are migrating our Flink codebase and a bunch of jobs from Flink-1.9
>>> to Flink-1.14. To ensure uniformity in performance we ran a bunch of jobs
>>> for a week both in 1.9 and 1.14 simultaneously with the same resources and
>>> configurations and monitored them.
>>>
>>> Though most of the jobs are running fine, we have significant
>>> performance degradation in some of the high throughput jobs during peak
>>> hours. As a result, we can see high lag and data drops while processing
>>> messages from Kafka in some of the jobs in 1.14 while in 1.9 they are
>>> working just fine.
>>> Now we are debugging and trying to understand the potential reason for
>>> it.
>>>
>>> One of the hypotheses that we can think of is the change in the sequence
>>> of processing in the source-operator. To explain this, adding screenshots
>>> for the problematic tasks below.
>>> The first one is for 1.14 and the second is for 1.9. Upon inspection, it
>>> can be seen the sequence of processing 1.14 is -
>>>
>>> data_streams_0 -> Timestamps/Watermarks -> Filter -> Select.
>>>
>>> While in 1.9 it was,
>>>
>>> data_streams_0 -> Filter -> Timestamps/Watermarks -> Select.
>>>
>>> In 1.14 we are using KafkaSource API while in the older version it was
>>> FlinkKafkaConsumer API. Wanted to understand if it can cause potential
>>> performance decline as all other configurations/resources for both of the
>>> jobs are identical and if so then how to avoid it. Also, we can not see any
>>> unusual behaviour for the CPU/Memory while monitoring the affected jobs.
>>>
>>> Source Operator in 1.14 :
>>> [image: image.png]
>>> Source Operator in 1.9 :
>>> [image: image.png]
>>> Thanks in advance,
>>> //arujit
>>>
>>>
>>>
>>>
>>>
>>>
>>>


Re: Job manager slots are in bad state.

2022-02-16 Thread Piotr Nowojski
Hi Josson,

Would you be able to reproduce this issue on a more recent version of
Flink? I'm afraid that we won't be able to help with this issue, as it
affects a Flink version that has not been supported for quite some time and,
moreover, `SlotSharingManager` has been completely removed in Flink 1.13.

Can you upgrade to a more recent Flink version and try it out? I would
assume the bug should be gone in 1.13.x or 1.14.x branches. If not, you can
also try out Flink 1.11.4, as maybe it has fixed this issue as well.

Best,
Piotrek

Wed, 16 Feb 2022 at 08:16 Josson Paul  wrote:

> We are using Flink version 1.11.2.
> At times if task managers are restarted for some reason, the job managers
> throw the exception that I attached here. It is an illegal state exception.
> We never had this issue with Flink 1.8. It started happening after
> upgrading to Flink 1.11.2.
>
> Why are the slots not released if they are in a bad state? The issue doesn't
> get resolved even if I restart all the task managers. It only gets resolved
> if I restart the job manager.
>
> java.util.concurrent.CompletionException: java.util.concurrent.
> CompletionException: java.lang.IllegalStateException
> at org.apache.flink.runtime.jobmaster.slotpool.
> SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:433)
> at java.base/java.util.concurrent.CompletableFuture.uniHandle(
> CompletableFuture.java:930)
> at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(
> CompletableFuture.java:907)
> at java.base/java.util.concurrent.CompletableFuture.postComplete(
> CompletableFuture.java:506)
> at java.base/java.util.concurrent.CompletableFuture
> .completeExceptionally(CompletableFuture.java:2088)
> at org.apache.flink.runtime.concurrent.FutureUtils
> .lambda$forwardTo$21(FutureUtils.java:1132)
> at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(
> CompletableFuture.java:859)
> at java.base/java.util.concurrent.CompletableFuture
> .uniWhenCompleteStage(CompletableFuture.java:883)
> at java.base/java.util.concurrent.CompletableFuture.whenComplete(
> CompletableFuture.java:2251)
> at org.apache.flink.runtime.concurrent.FutureUtils.forward(FutureUtils
> .java:1100)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager
> .createRootSlot(SlotSharingManager.java:155)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .allocateMultiTaskSlot(SchedulerImpl.java:477)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .allocateSharedSlot(SchedulerImpl.java:311)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .internalAllocateSlot(SchedulerImpl.java:160)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .allocateSlotInternal(SchedulerImpl.java:143)
> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
> .allocateSlot(SchedulerImpl.java:113)
> at org.apache.flink.runtime.executiongraph.
> SlotProviderStrategy$NormalSlotProviderStrategy.allocateSlot(
> SlotProviderStrategy.java:115)
> at org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator
> .lambda$allocateSlotsFor$0(DefaultExecutionSlotAllocator.java:104)
> at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(
> CompletableFuture.java:1106)
> at java.base/java.util.concurrent.CompletableFuture.thenCompose(
> CompletableFuture.java:2235)
> at org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator
> .allocateSlotsFor(DefaultExecutionSlotAllocator.java:102)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.allocateSlots(
> DefaultScheduler.java:339)
> at org.apache.flink.runtime.scheduler.DefaultScheduler
> .allocateSlotsAndDeploy(DefaultScheduler.java:312)
> at org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy
> .allocateSlotsAndDeploy(EagerSchedulingStrategy.java:76)
> at org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy
> .restartTasks(EagerSchedulingStrategy.java:57)
> at org.apache.flink.runtime.scheduler.DefaultScheduler
> .lambda$restartTasks$2(DefaultScheduler.java:265)
> at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(
> CompletableFuture.java:783)
> at java.base/java.util.concurrent.CompletableFuture$Completion.run(
> CompletableFuture.java:478)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(
> AkkaRpcActor.java:402)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
> AkkaRpcActor.java:195)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
> .handleRpcMessage(FencedAkkaRpcActor.java:74)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
> AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.ja

Re: Python Function for Datastream Transformation in Flink Java Job

2022-02-16 Thread Piotr Nowojski
Hi,

As far as I can tell the answer is unfortunately no. With the Table API (SQL)
things are much simpler, as there is only a restricted set of column types
that needs to be supported and you don't need to support arbitrary
Java classes as the records.

I'm shooting blindly here, but maybe you can use your Python UDF in Table
API and then convert a Table to DataStream? [1]
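
To sketch what I mean (names are made up: `my_udfs.my_transform` would be your
Python function, you would still need to ship the Python file to the cluster,
e.g. via the `python.files` configuration, and I haven't verified this end to end):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class PythonUdfFromJavaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register the Python UDF via SQL DDL.
        tEnv.executeSql(
                "CREATE TEMPORARY FUNCTION my_transform AS 'my_udfs.my_transform' LANGUAGE PYTHON");

        // Assumes a table `input_table` has been registered elsewhere,
        // e.g. with tEnv.createTemporaryView(...) on top of a DataStream.
        Table result = tEnv.sqlQuery(
                "SELECT id, my_transform(payload) AS transformed FROM input_table");

        // Switch back to the Java DataStream API for the rest of the pipeline.
        DataStream<Row> transformed = tEnv.toDataStream(result);
        transformed.print();

        env.execute("python-udf-from-java");
    }
}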

Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/

Wed, 16 Feb 2022 at 09:46 Jesry Pandawa 
wrote:

> Hello,
>
> Currently, Flink already supports adding a Python UDF and using it in a
> Flink Java job. It can be used with the Table API. Can we do the same for
> creating a custom Python function for a DataStream transformation and use that
> in a Flink Java job?
>
> Regards,
>
> Jesry
>


Re: getting "original" ingestion timestamp after using a TimestampAssigner

2022-02-16 Thread Piotr Nowojski
Hi Frank,

I'm not sure exactly what you are trying to accomplish, but yes. In
the TimestampAssigner you can only return what should be the new timestamp
for the given record.

If you want to use "ingestion time" - "true event time" as some kind of
delay metric, you will indeed need to have both of them calculated
somewhere. You could:
1. As you described, first use an ingestion time assigner, then a mapper function
to extract it to a separate field, re-assign the true event time, and
calculate the delay
2. Or you could simply assign the correct event time and in a simple single
mapper, chained directly to the source, use for example
`System.currentTimeMillis() - eventTime` to calculate this delay in a
single step. After all, that's more or less what Flink is doing to
calculate the ingestion time [1]
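
For example, a minimal sketch of option 2 (`MyEvent` and its accessors are made-up
placeholders, and the delay is only approximate since the wall clock is read when
the record reaches this mapper):

import org.apache.flink.api.common.functions.MapFunction;

public class DeliveryDelayMapper implements MapFunction<MyEvent, MyEvent> {

    @Override
    public MyEvent map(MyEvent event) {
        // Wall-clock time now, minus the event's own (true) event time.
        long deliveryDelayMs = System.currentTimeMillis() - event.getEventTimestamp();
        event.setDeliveryDelayMs(deliveryDelayMs);
        return event;
    }
}

// Usage - chain it right after the source, so the wall-clock read happens close to ingestion:
// env.fromSource(kafkaSource, watermarkStrategy, "kafka-source")
//    .map(new DeliveryDelayMapper())
//    ...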

Best, Piotrek

[1]
https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-core/src/main/java/org/apache/flink/api/common/eventtime/IngestionTimeAssigner.java

Wed, 16 Feb 2022 at 09:46 Frank Dekervel  wrote:

> Hello,
>
> I'm getting messages from a kafka stream. The messages are JSON records
> with a "timestamp" key in the json. This timestamp key contains the time
> at which the message was generated. Now i'd like if these messages had a
> delivery delay (eg delay between message generation and arrival in
> kafka). So i don't want to have the "full" delay (eg difference between
> generation time and processing time), just de delivery delay.
>
> In my timestamp assigner i get a "long" with the original timestamp as
> an argument, but i cannot yield an updated record from the timestamp
> assigner (eg with an extra field "deliveryDelay" or so).
>
> So i guess my only option is to not specify the timestamp/watermark
> extractor in the env.fromSource, then first mapping the stream to add a
> lateness field and only after that reassign timestamps/watermarks ... is
> that right ?
>
> Thanks!
>
> Greetings,
> Frank
>
>
>
>


Re: Basic questions about resuming stateful Flink jobs

2022-02-16 Thread Piotr Nowojski
Hi James,

Sure! The basic idea of checkpoints is that they are fully owned by the
running job and used for failure recovery. Thus, by default, if you stop
the job, the checkpoints are removed. If you want to stop a job and then
later resume working from the same point at which it previously stopped,
you most likely want to use savepoints [1]. You can stop the job with a
savepoint and later restart another job from that savepoint.

Regarding the externalised checkpoints: technically you could use them in
a similar way, but there is no command like "take a checkpoint and stop
the job". Nevertheless you might consider enabling them, as this allows you
to manually cancel the job if it enters an endless recovery/failure
loop, fix the underlying issue, and restart the job from the externalised
checkpoint.
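
For reference, a rough sketch of enabling retained (externalised) checkpoints in
code - the interval and directory below are arbitrary examples, and on restart you
would pass the retained checkpoint path the same way as a savepoint path (e.g. via
`bin/flink run -s <path>` or the `execution.savepoint.path` option):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 30 seconds into an explicit, durable location.
env.enableCheckpointing(30_000);
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

// Keep the latest checkpoint around when the job is cancelled, so that it can be
// used later as a restore point.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);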

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/savepoints/

Wed, 16 Feb 2022 at 16:44 James Sandys-Lumsdaine 
wrote:

> Hi all,
>
> I have a 1.14 Flink streaming workflow with many stateful functions that
> has an FsStateBackend and checkpointing enabled, although I haven't set a
> location for the checkpointed state.
>
> I've really struggled to understand how I can stop my Flink job and
> restart it and ensure it carries on exactly where it left off by using the
> state or checkpoints or savepoints. This is not clearly explained in the
> book or the web documentation.
>
> Since I have no control over my Flink job id, I assume I cannot force
> Flink to pick up the state recorded under the jobId directory for the
> FsStateBackend. Therefore I *think* Flink should read back in the last
> checkpointed data, but I don't understand how to force my program to read
> this in. Do I use retained checkpoints or not? How can I force my program
> either to use the last checkpointed state (e.g. when running from my IDE,
> starting and stopping the program) or maybe force it *not* to read in the
> state and start completely fresh?
>
> The web documentation talks about bin/flink, but I am running from my IDE,
> so I want my Java code to control this process using the Flink API in Java.
>
> Can anyone give me some basic pointers as I'm obviously missing something
> fundamental on how to allow my program to be stopped and started without
> losing all the state.
>
> Many thanks,
>
> James.
>
>


Re: Basic questions about resuming stateful Flink jobs

2022-02-17 Thread Piotr Nowojski
t adding to the Main method args parameter or adding them to the
> PipelineOptions once you build that object from args. I've never used the
> Flink libs, just the runner, but from [1] and [3] it looks like you can
> configure things in code if you prefer that.
>
> Hope it helps,
> Cristian
>
> [1] https://beam.apache.org/documentation/runners/flink/
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/task_failure_recovery/
> [3]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/savepoints/#configuration
>
>
> On Wed, Feb 16, 2022 at 12:28 PM Sandys-Lumsdaine, James <
> james.sandys-lumsda...@systematica.com> wrote:
>
>> Thanks for your reply, Piotr.
>>
>>
>>
>> Some follow on questions:
>>
>> >". Nevertheless you might consider enabling them as this allows you to
>> manually cancel the job if it enters an endless recovery/failure loop, fix
>> the underlying issue, and restart the job from the externalised checkpoint.
>>
>>
>>
>> How is this done? Are you saying the retained checkpoint (i.e. the last
>> checkpoint that isn’t deleted) can somehow be used when restarting the
>> Flink application? If I am running in my IDE and just using the local
>> streaming environment, how can I test my recovery code either with a
>> retained checkpoint? All my attempts so far just say “No checkpoint found
>> during restore.” Do I copy the checkpoint into a savepoint directory and
>> treat it like a savepoint?
>>
>>
>>
>> On the topic of savepoints, that web page [1] says I need to use
>> “bin/flink savepoint” or “bin/flink stop --savepointPath” – but again, if
>> I’m currently not running in a real cluster, how else can I create and
>> recover from the savepoints?
>>
>>
>>
>> From what I’ve read there are state, checkpoints and savepoints – all of
>> them hold state - and currently I can’t get any of these to restore when
>> developing in an IDE and the program builds up all state from scratch. So
>> what else do I need to do in my Java code to tell Flink to load a savepoint?
>>
>>
>>
>> Thanks,
>>
>>
>>
>> James.
>>
>>
>>
>>
>>
>> *From:* Piotr Nowojski 
>> *Sent:* 16 February 2022 16:36
>> *To:* James Sandys-Lumsdaine 
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Basic questions about resuming stateful Flink jobs
>>
>>
>>
>> *CAUTION: External email. The email originated outside of our company *
>>
>> Hi James,
>>
>>
>>
>> Sure! The basic idea of checkpoints is that they are fully owned by the
>> running job and used for failure recovery. Thus by default if you stopped
>> the job, checkpoints are being removed. If you want to stop a job and then
>> later resume working from the same point that it has previously stopped,
>> you most likely want to use savepoints [1]. You can stop the job with a
>> savepoint and later you can restart another job from that savepoint.
>>
>>
>>
>> Regarding the externalised checkpoints. Technically you could use them in
>> the similar way, but there is no command like "take a checkpoint and stop
>> the job". Nevertheless you might consider enabling them as this allows you
>> to manually cancel the job if it enters an endless recovery/failure
>> loop, fix the underlying issue, and restart the job from the externalised
>> checkpoint.
>>
>>
>>
>> Best,
>>
>> Piotrek
>>
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/savepoints/
>>
>>
>>
>> Wed, 16 Feb 2022 at 16:44 James Sandys-Lumsdaine 
>> wrote:
>>
>> Hi all,
>>
>>
>>
>> I have a 1.14 Flink streaming workflow with many stateful functions that
>> has a FsStateBackend and checkpointed enabled, although I haven't set a
>> location for the checkpointed state.
>>
>>
>>
>> I've really struggled to understand how I can stop my Flink job and
>> restart it and ensure it carries off exactly where is left off by using the
>> state or checkpoints or savepoints. This is not clearly explained in the
>> book or the web documentation.
>>
>>
>>
>> Since I have no control over my Flink job id I assume I can not force
>> Flink to pick up the state recorded under the jobId directory for the
>> FsStateBackend. Therefore I *think*​ Flink should read back in the last
>> checkpointed data but I don't underst

Re: Low Watermark

2022-02-25 Thread Piotr Nowojski
Hi,

It's the minimum watermark among all 10 parallel instances of that task.

Using the currentInputWatermark metric [1] you can access the watermark of
each of those 10 subtasks individually.

Best,
Piotrek

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/

Fri, 25 Feb 2022 at 14:10 Isidoros Ioannou  wrote:

> Hello, could someone please explain what the Low Watermark indicates in
> the Flink UI in the attached image?
> I have event time enabled with a boundOutOfOrdernessStrategy of 3s for the
> incoming events and I use CEP with a within window of 5 minutes.
>


Re: Dulicated messages in kafka sink topic using flink cancel-with-savepoint operation

2018-12-03 Thread Piotr Nowojski
Good to hear that :)

Duplicated “uncommitted” messages are normal and to be expected. After all, 
that’s what `read_uncommitted` is for - to be able to read the messages without 
waiting until they are committed, and thus even if their transactions were later 
aborted.
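
For reference, this is roughly what setting it looks like for a downstream Flink
consumer (topic and server names are placeholders; the `isolation.level` property
works the same way for a plain Kafka consumer):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "downstream-consumer");
// Only read messages whose transactions were committed by the exactly-once producer.
props.setProperty("isolation.level", "read_committed");

FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>("sink-topic", new SimpleStringSchema(), props);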

Piotrek

> On 1 Dec 2018, at 14:44, Nastaran Motavali  wrote:
> 
> Thanks for your helpful response,
> Setting the consumer's 'isolation.level' property to 'read_committed' solved 
> the problem!
> In fact, there are still some duplicated messages in the sink topic, but they 
> are uncommitted, and if a Kafka consumer reads the messages from this sink, 
> the duplicated messages are not read, so everything is OK.
> 
> 
> 
> Kind regards,
> Nastaran Motavalli
> 
> 
> From: Piotr Nowojski 
> Sent: Thursday, November 29, 2018 3:38:38 PM
> To: Nastaran Motavali
> Cc: user@flink.apache.org
> Subject: Re: Dulicated messages in kafka sink topic using flink 
> cancel-with-savepoint operation
>  
> Hi Nastaran,
> 
> When you are checking for duplicated messages, are you reading from Kafka 
> using `read_committed` mode (this is not the default value)?
> 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer-partitioning-scheme
>  
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer-partitioning-scheme>
> 
> > Semantic.EXACTLY_ONCE: uses Kafka transactions to provide exactly-once 
> > semantic. Whenever you write to Kafka using
> > transactions, do not forget about setting desired isolation.level 
> > (read_committed or read_uncommitted - the latter one is the
> > default value) for any application consuming records from Kafka.
> 
> Does the problem happens always?
> 
> Piotrek
> 
>> On 28 Nov 2018, at 08:56, Nastaran Motavali > <mailto:n.motav...@son.ir>> wrote:
>> 
>> Hi,
>> I have a Flink streaming job implemented in Java which reads some messages 
>> from a Kafka topic, transforms them and finally sends them to another Kafka 
>> topic.
>> The version of Flink is 1.6.2 and the Kafka version is 0.11. I pass the 
>> Semantic.EXACTLY_ONCE parameter to the producer. The problem is that when I 
>> cancel the job with a savepoint and then restart it using the saved savepoint, 
>> I have duplicated messages in the sink.
>> Am I missing some Kafka/Flink configuration to avoid duplication?
>> 
>> 
>> Kind regards,
>> Nastaran Motavalli



Re: runtime.resourcemanager

2018-12-07 Thread Piotr Nowojski
Hi,

Please investigate the logs/standard output/error from the task manager that has 
failed (the logs that you showed are from the job manager). Probably there is some 
obvious error/exception explaining why it has failed. The most common reasons are:
- out of memory
- long GC pause
- seg fault or other error from some native library
- task manager killed via for example SIGKILL

Piotrek

> On 6 Dec 2018, at 17:34, Alieh  wrote:
> 
> Hello all,
> 
> I have an algorithm x() which contains several joins and uses Gelly 
> ConnectedComponents three times. The problem is that if I call x() inside a script 
> more than three times, I receive the messages listed below in the log and the 
> program is somehow stopped. It happens even if I run it with a toy example of 
> a graph with less than 10 vertices. Do you have any clue what the problem is?
> 
> Cheers,
> 
> Alieh
> 
> 
> 129149 [flink-akka.actor.default-dispatcher-20] DEBUG 
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger 
> heartbeat request.
> 129149 [flink-akka.actor.default-dispatcher-20] DEBUG 
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger 
> heartbeat request.
> 129150 [flink-akka.actor.default-dispatcher-20] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received heartbeat 
> request from e80ec35f3d0a04a68000ecbdc555f98b.
> 129150 [flink-akka.actor.default-dispatcher-22] DEBUG 
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Received 
> heartbeat from 78cdd7a4-0c00-4912-992f-a2990a5d46db.
> 129151 [flink-akka.actor.default-dispatcher-22] DEBUG 
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Received 
> new slot report from TaskManager 78cdd7a4-0c00-4912-992f-a2990a5d46db.
> 129151 [flink-akka.actor.default-dispatcher-22] DEBUG 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Received 
> slot report from instance 4c3e3654c11b09fbbf8e993a08a4c2da.
> 129200 [flink-akka.actor.default-dispatcher-15] DEBUG 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Release 
> TaskExecutor 4c3e3654c11b09fbbf8e993a08a4c2da because it exceeded the idle 
> timeout.
> 129200 [flink-akka.actor.default-dispatcher-15] DEBUG 
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Worker 
> 78cdd7a4-0c00-4912-992f-a2990a5d46db could not be stopped.
> 



Re: A question on the Flink "rolling" FoldFunction

2018-12-07 Thread Piotr Nowojski
Hi Min,

Please feel welcome in the Flink community. One small remark: the dev mailing list 
is for developers of Flink and all of the issues/discussions that arise in the 
process (discussing how to implement new features etc.), so the user mailing list is 
the right one for asking questions about using Flink :)

Answering your question: Flink doesn’t guarantee the order of the elements, 
especially not when multiple parallel operators are involved. The order can 
break whenever there is some network exchange/transfer between operators/tasks, 
for example after using `keyBy` or `rebalance`, or at the boundary of a change in 
parallelism. Locally, within one instance of your Fold function, order is 
preserved. In other words, if you need some element order to process records, 
you have to make sure that it’s there, for example:


env.addSource(….)
.keyBy(….)
.process(new MyElementsSorter())
.fold(…)

However, keep in mind that sorting records in an infinite data stream is quite a 
tricky concept unless it’s only a “best effort”. Usually you would probably 
like to group records into windows and process those windows.
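
If you do go the sorting route, here is a rough sketch of what such a best-effort
`MyElementsSorter` could look like (purely illustrative - the `Event` type, the
`String` key type and the buffering strategy are assumptions, and it only works
once timestamps/watermarks have been assigned upstream):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MyElementsSorter extends KeyedProcessFunction<String, Event, Event> {

    // Buffered elements, grouped by their event-time timestamp.
    private transient MapState<Long, List<Event>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "buffer", Types.LONG, Types.LIST(Types.GENERIC(Event.class))));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        long ts = ctx.timestamp();
        List<Event> sameTimestamp = buffer.get(ts);
        if (sameTimestamp == null) {
            sameTimestamp = new ArrayList<>();
        }
        sameTimestamp.add(value);
        buffer.put(ts, sameTimestamp);
        // Flush this timestamp once the watermark (the event-time clock) has passed it.
        ctx.timerService().registerEventTimeTimer(ts);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        List<Event> elements = buffer.get(timestamp);
        if (elements != null) {
            for (Event element : elements) {
                out.collect(element);
            }
            buffer.remove(timestamp);
        }
    }
}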

Piotrek

> On 7 Dec 2018, at 09:12, min@ubs.com wrote:
> 
> Hi,
>  
> I am new to Flink. I have a question on this "rolling" fold function.
>  
> If its parallelism is larger than one, does the "rolling" order remain the 
> same? I.e. does it always keep the "1-2-3-4-5" in an increasing sequence?
>  
> Regards,
>  
> Min
>  
> --- 
> FoldFunction 
> ---
> A "rolling" fold on a keyed data stream with an initial value. Combines the 
> current element with the last folded value and emits the new value. 
> A fold function that, when applied on the sequence (1,2,3,4,5), emits the 
> sequence "start-1", "start-1-2", "start-1-2-3", ...
> DataStream result =
>   keyedStream.fold("start", new FoldFunction() {
> @Override
> public String fold(String current, Integer value) {
> return current + "-" + value;
> }
>   });
> ---
>  
> From: jincheng sun [mailto:sunjincheng...@gmail.com 
> ] 
> Sent: Friday, 7 December 2018 02:24
> To: rakkukumar2...@gmail.com 
> Cc: user@flink.apache.org ; 
> d...@flink.apache.org 
> Subject: [External] Re: delay one of the datastream when performing join 
> operation on event-time and watermark
>  
> Hi Pakesh Kuma,
> I think you can use the interval-join, e.g.:
> orderStream
> .keyBy()
> .intervalJoin(invoiceStream.keyBy())
> .between(Time.minutes(-5), Time.minutes(5))
> The semantics of interval-join and detailed usage description can refer to 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/joining.html#interval-join
>  
> 
>  
> Hope to help you, and any feedback is welcome!
>  
> Bests,
> Jincheng
>  
>  
> Rakesh Kumar mailto:rakkukumar2...@gmail.com>> 
> wrote on Thursday, 6 December 2018 at 19:10:
> Hi,
> I have two data sources one is  for order data and another one is for invoice 
> data, these two data i am pushing into kafka topic in json form. I wanted to 
> delay order data for 5 mins because invoice data comes only after order data 
> is generated. So, for that i have written a flink program which will take 
> these two data from kafka and apply watermarks and delay order data for 5 
> mins. After applying watermarks on these data, i wanted to join these data 
> based on order_id which is present in both order and invoice data. After 
> Joining i wanted to push it to kafka in different topic.
>  
> But, i am not able to join these data streams with 5 min delay and i am not 
> able to figure it out.
>  
> I am attaching my flink program below and it's dependency.
> 
> Check out our new brand campaign: www.ubs.com/together 
> 
> E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, 
> potential manipulation of contents and/or sender's address, incorrect 
> recipient (misdirection), viruses etc. Based on previous e-mail 
> correspondence with you and/or an agreement reached with you, UBS considers 
> itself authorized to contact you via e-mail. UBS assumes no responsibility 
> for any loss or damage resulting from the use of e-mails. 
> The recipient is aware of and accepts the inherent risks of using e-mails, in 
> particular the risk that the banking relationship and confidential 
> information relating thereto are disclosed to third parties.
> UBS reserves the right to retain and m

Re: Use event time

2018-12-07 Thread Piotr Nowojski
Hi again!

Flink doesn’t order/sort the records according to event time. The prevailing 
idea is:
- records will be arriving out of order, and operators should handle that
- watermarks are used as indicators of the current lower bound of the event 
time “clock”

For example, windowed joins/aggregations assign records to one or more time 
windows, collect all of the data belonging to a window, and when the watermark 
exceeds/overtakes the window, that is when the window is evaluated.
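
To make that concrete, a quick sketch building on the `stream` and `Event` from your 
example below (the key selector and window size are arbitrary; it additionally needs 
imports for org.apache.flink.api.java.functions.KeySelector, 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows and 
org.apache.flink.streaming.api.windowing.time.Time):

stream
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
        @Override
        public long extractAscendingTimestamp(Event element) {
            return element.time1;
        }
    })
    .keyBy(new KeySelector<Event, Integer>() {
        @Override
        public Integer getKey(Event event) {
            return event.id;
        }
    })
    // Collect records into 10 second event-time windows...
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    // ...and evaluate each window once the watermark passes its end;
    // here e.g. keeping the record with the smallest time1 per window.
    .reduce((a, b) -> a.time1 <= b.time1 ? a : b)
    .print();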

Piotrek 

> On 7 Dec 2018, at 09:22, min@ubs.com wrote:
> 
> Hi,
>  
> I am new to Flink. 
>  
> I have the following small code to use the event time. I did not get the 
> result I expected, i.e. that it prints out the events in the order of event time.
>  
> Did I miss something here?
>  
> Regards,
>  
> Min
>  
>  
> --Event time--
>public static void main(String[] args) throws Exception  {
> 
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> long start =System.currentTimeMillis();
> DataStream stream = env.fromElements(new Event(0,start,start),
> new Event(1,start+10,start+10), new 
> Event(2,start+20,start-20),
> new Event(3,start+30,start-30), new 
> Event(4,start+40,start-40));
> 
> stream.map(event -> "RAW order " + event.toString()).print();
> 
> stream.assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor() {
> @Override public long extractAscendingTimestamp(Event element) { 
> return element.time1; } })
> .map(event -> "time1 order:: " + event.toString()).print();
> 
> stream.assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor() {
> @Override public long extractAscendingTimestamp(Event element) { 
> return element.time2; } })
> .map(event -> "time2 order:: " + event.toString()).print();
> 
> env.execute("event time ");
> }
> 
> 
> static public class Event {
> int id;
> long time1;
> long time2;
> 
> Event(int id, long time1, long time2){
> this . id  =id;
> this.time1=time1;
> this.time2=time2;
> }
> 
> public String toString() {
> return "id=" + id + "; time1=" + time1 + "; time2=" + time2;
> }
> }
> }
> --
> 


Re: Flink with Docker: docker-compose and FLINK_JOB_ARGUMENT exception

2018-12-07 Thread Piotr Nowojski
Hi,

I have never used Flink and Docker together, so I’m not sure if I will be able 
to help; however, have you seen this README:
https://github.com/apache/flink/tree/master/flink-container/docker
?
Shouldn’t you be passing your arguments via the `FLINK_JOB_ARGUMENTS` environment 
variable? 

Piotrek

> On 7 Dec 2018, at 10:55, Marke Builder  wrote:
> 
> Hi,
> 
> I'm trying to run Flink with Docker (docker-compose) and the job arguments 
> "config-dev.properties". But it seems that the job arguments are not 
> available:
> 
> docker-compose.yml
> 
> version: '2'
> services:
>   job-cluster:
> image: ${FLINK_DOCKER_IMAGE_NAME:-timeseries-v1}
> ports:
>   - '8081:8081'
> command: job-cluster --job-classname -Djobmanager.rpc.address=job-cluster 
> -Dparallelism.default=1 --config config-dev.properties
> 
>   taskmanager:
> image: ${FLINK_DOCKER_IMAGE_NAME:-timeseries-v1}
> command: task-manager -Djobmanager.rpc.address=job-cluster
> scale: 1
> 
> 
> Excpetion:
>  <>4:32 AMorg.apache.flink.runtime.entrypoint.FlinkParseException: Failed to 
> parse the command line arguments.
> 12/7/2018 10:44:32 AM  at 
> org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:52)
> 12/7/2018 10:44:32 AM  at 
> org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:143)
> 12/7/2018 10:44:32 AMCaused by: 
> org.apache.commons.cli.MissingArgumentException: Missing argument for option: 
> j
> 12/7/2018 10:44:32 AM  at 
> org.apache.commons.cli.DefaultParser.checkRequiredArgs(DefaultParser.java:211)
> 12/7/2018 10:44:32 AM  at 
> org.apache.commons.cli.DefaultParser.handleOption(DefaultParser.java:599)
> 12/7/2018 10:44:32 AM  at 
> org.apache.commons.cli.DefaultParser.handleShortAndLongOption(DefaultParser.java:548)
> 12/7/2018 10:44:32 AM  at 
> org.apache.commons.cli.DefaultParser.handleToken(DefaultParser.java:243)
> 12/7/2018 10:44:32 AM  at 
> org.apache.commons.cli.DefaultParser.parse(DefaultParser.java:120)
> 12/7/2018 10:44:32 AM  at 
> org.apache.commons.cli.DefaultParser.parse(DefaultParser.java:81)
> 12/7/2018 10:44:32 AM  at 
> org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:50)
> 12/7/2018 10:44:32 AM  ... 1 more
> 12/7/2018 10:44:32 AMException in thread "main" java.lang.NoSuchMethodError: 
> org.apache.flink.runtime.entrypoint.parser.CommandLineParser.printHelp()V
> 12/7/2018 10:44:32 AM  at 
> org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:146)



Re: Use event time

2018-12-07 Thread Piotr Nowojski
You are welcome :)

More or less you are correct. Assigning event time doesn’t reorder anything in 
the stream; it’s just meta information about a record that can be used by 
various functions/operators, not only by windowed operations. As I answered in 
the “A question on the Flink "rolling" FoldFunction” topic, one could use event 
time for example for a "best effort" sorting mechanism (which buffers some 
number of elements and tries to smooth the stream out/best-effort sort it), 
for registering event time timers, or for any other custom operation. 

The same applies to watermarks. They are also another kind of “meta data” being 
passed along that can be used, and is for example used by the before-mentioned 
windowed operators to emit the data/clean up the state.

Piotrek 

> On 7 Dec 2018, at 11:19,   wrote:
> 
> Many thanks for sending your email.
>  
> Does this mean that the event time only impacts the event selection for a 
> time window?
>  
> Without the use of a time window, does the event time have no impact on the order of 
> any records/events?
>  
> Is my understanding correct?
>  
> Thank you very much for your help.
>  
> Regards,
>  
> Min
>  
>  
>  
> From: Piotr Nowojski [mailto:pi...@data-artisans.com] 
> Sent: Friday, 7 December 2018 11:11
> To: Tan, Min
> Cc: user
> Subject: [External] Re: Use event time
>  
> Hi again!
>  
> Flink doesn’t order/sort the records according to event time. The preveiling 
> idea is:
> - records will be arriving out of order, operators should handle that
> - watermarks are used for indicators of the current lower bound of the event 
> time “clock”
>  
> For examples windowed joins/aggregations  assign records to one or more time 
> windows, collect all of the data belonging to a window and when watermark 
> exceeds/overtakes the window that when that window is being evaluated.
>  
> Piotrek 
> 
> 
> On 7 Dec 2018, at 09:22, min@ubs.com <mailto:min@ubs.com> wrote:
>  
> Hi,
>  
> I am new to Flink. 
>  
> I have the following small code to use the event time. I did not get the 
> result expected, i.e. it print out events in the order of event time.
>  
> Did I miss something here?
>  
> Regards,
>  
> Min
>  
>  
> --Event time--
>public static void main(String[] args) throws Exception  {
> 
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> long start =System.currentTimeMillis();
> DataStream stream = env.fromElements(new Event(0,start,start),
> new Event(1,start+10,start+10), new 
> Event(2,start+20,start-20),
> new Event(3,start+30,start-30), new 
> Event(4,start+40,start-40));
> 
> stream.map(event -> "RAW order " + event.toString()).print();
> 
> 
> stream.assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor() {
> @Override public long extractAscendingTimestamp(Event element) { 
> return element.time1; } })
> .map(event -> "time1 order:: " + event.toString()).print();
> 
> 
> stream.assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor() {
> @Override public long extractAscendingTimestamp(Event element) { 
> return element.time2; } })
> .map(event -> "time2 order:: " + event.toString()).print();
> 
> env.execute("event time ");
> }
> 
> 
> static public class Event {
> int id;
> long time1;
> long time2;
> 
> Event(int id, long time1, long time2){
> this <http://this.id/>. <http://this.id/>id <http://this.id/> =id;
> this.time1=time1;
> this.time2=time2;
> }
> 
> public String toString() {
> return "id=" + id + "; time1=" + time1 + "; time2=" + time2;
> }
> }
> }
> --
> 

Re: Failed to resume job from checkpoint

2018-12-07 Thread Piotr Nowojski
Hey,

Do you mean that the problem started occurring only after upgrading to Flink 
1.7.0?

Piotrek

> On 7 Dec 2018, at 11:28, Ben Yan  wrote:
> 
> Hi. I am using flink-1.7.0. I am using RocksDB and HDFS as the state backend, but 
> recently I found the following exception when the job resumed from the 
> checkpoint. Task-local state is always considered a secondary copy; the 
> ground truth of the checkpoint state is the primary copy in the distributed 
> store. But it seems that the job did not recover from HDFS, and it failed 
> directly. I hope someone can give me advice or hints about the problem that I 
> encountered.
> 
> 
> 2018-12-06 22:54:04,171 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess 
> (3/138) (5d96a585130f7a21f22f82f79941fb1d) switched from RUNNING to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5_(3/138) from any of the 
> 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>   ... 5 more
> Caused by: java.nio.file.NoSuchFileException: 
> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0004/flink-io-0115e9d6-a816-4b65-8944-1423f0fdae58/job_6e40c9381aa12f69b6ac182c91d993f5_op_KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5__3_138__uuid_1c6a5a11-caaf-4564-b3d0-9c7dadddc390/db/000495.sst
>  -> 
> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0004/flink-io-0115e9d6-a816-4b65-8944-1423f0fdae58/job_6e40c9381aa12f69b6ac182c91d993f5_op_KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5__3_138__uuid_1c6a5a11-caaf-4564-b3d0-9c7dadddc390/5683a26f-cde2-406d-b4cf-3c6c3976f8ba/000495.sst
>   at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>   at 
> sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
>   at java.nio.file.Files.createLink(Files.java:1086)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBKeyedStateBackend.java:1238)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreLocalStateIntoFullInstance(RocksDBKeyedStateBackend.java:1186)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBKeyedStateBackend.java:916)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:864)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:525)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:147)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>   ... 7 more
> 
> Best
> Ben



Re: Failed to resume job from checkpoint

2018-12-07 Thread Piotr Nowojski
Adding back user mailing list.

Andrey, could you take a look at this?

Piotrek

> On 7 Dec 2018, at 12:28, Ben Yan  wrote:
> 
> Yes. This never happened with previous versions.
> 
> Piotr Nowojski mailto:pi...@data-artisans.com>> 
> wrote on Friday, 7 December 2018 at 19:27:
> Hey,
> 
> Do you mean that the problem started occurring only after upgrading to Flink 
> 1.7.0?
> 
> Piotrek
> 
>> On 7 Dec 2018, at 11:28, Ben Yan > <mailto:yan.xiao.bin.m...@gmail.com>> wrote:
>> 
>> hi . I am using flink-1.7.0. I am using RockDB and hdfs as statebackend, but 
>> recently I found the following exception when the job resumed from the 
>> checkpoint. Task-local state is always considered a secondary copy, the 
>> ground truth of the checkpoint state is the primary copy in the distributed 
>> store. But it seems that the job did not recover from hdfs, and it failed 
>> directly.Hope someone can give me advices or hints about the problem that I 
>> encountered.
>> 
>> 
>> 2018-12-06 22:54:04,171 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess 
>> (3/138) (5d96a585130f7a21f22f82f79941fb1d) switched from RUNNING to FAILED.
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>  at 
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
>> state backend for 
>> KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5_(3/138) from any of 
>> the 1 provided restore options.
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>>  ... 5 more
>> Caused by: java.nio.file.NoSuchFileException: 
>> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0004/flink-io-0115e9d6-a816-4b65-8944-1423f0fdae58/job_6e40c9381aa12f69b6ac182c91d993f5_op_KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5__3_138__uuid_1c6a5a11-caaf-4564-b3d0-9c7dadddc390/db/000495.sst
>>  -> 
>> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0004/flink-io-0115e9d6-a816-4b65-8944-1423f0fdae58/job_6e40c9381aa12f69b6ac182c91d993f5_op_KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5__3_138__uuid_1c6a5a11-caaf-4564-b3d0-9c7dadddc390/5683a26f-cde2-406d-b4cf-3c6c3976f8ba/000495.sst
>>  at 
>> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>>  at 
>> sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
>>  at java.nio.file.Files.createLink(Files.java:1086)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBKeyedStateBackend.java:1238)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreLocalStateIntoFullInstance(RocksDBKeyedStateBackend.java:1186)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBKeyedStateBackend.java:916)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:864)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:525)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:147)
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>>  ... 7 more
>> 
>> Best
>> Ben
> 



Re: runtime.resourcemanager

2018-12-10 Thread Piotr Nowojski
Hi,

Have you checked the task managers' logs?

Piotrek

> On 8 Dec 2018, at 12:23, Alieh  wrote:
> 
> Hello Piotrek,
> 
> thank you for your answer. I installed Flink on a local cluster and used 
> the GUI in order to monitor the task managers. It seems the program does not 
> start at all. The whole time just the job manager is struggling... For very 
> small toy examples, after a long time (during this time I see the job manager 
> logs as I mentioned before), the job is started and can be executed in 2 
> seconds.  
> 
> Best,
> 
> Alieh
> 
> 
> On 12/07/2018 10:43 AM, Piotr Nowojski wrote:
>> Hi,
>> 
>> Please investigate logs/standard output/error from the task manager that has 
>> failed (the logs that you showed are from job manager). Probably there is 
>> some obvious error/exception explaining why has it failed. Most common 
>> reasons:
>> - out of memory
>> - long GC pause
>> - seg fault or other error from some native library
>> - task manager killed via for example SIGKILL
>> 
>> Piotrek
>> 
>>> On 6 Dec 2018, at 17:34, Alieh  
>>> <mailto:sae...@informatik.uni-leipzig.de> wrote:
>>> 
>>> Hello all,
>>> 
>>> I have an algorithm x () which contains several joins and usage of 3 times 
>>> of gelly ConnectedComponents. The problem is that if I call x() inside a 
>>> script more than three times, I receive the messages listed below in the 
>>> log and the program is somehow stopped. It happens even if I run it with a 
>>> toy example of a graph with less that 10 vertices. Do you have any clue 
>>> what is the problem?
>>> 
>>> Cheers,
>>> 
>>> Alieh
>>> 
>>> 
>>> 129149 [flink-akka.actor.default-dispatcher-20] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
>>> Trigger heartbeat request.
>>> 129149 [flink-akka.actor.default-dispatcher-20] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
>>> Trigger heartbeat request.
>>> 129150 [flink-akka.actor.default-dispatcher-20] DEBUG 
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received heartbeat 
>>> request from e80ec35f3d0a04a68000ecbdc555f98b.
>>> 129150 [flink-akka.actor.default-dispatcher-22] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
>>> Received heartbeat from 78cdd7a4-0c00-4912-992f-a2990a5d46db.
>>> 129151 [flink-akka.actor.default-dispatcher-22] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
>>> Received new slot report from TaskManager 
>>> 78cdd7a4-0c00-4912-992f-a2990a5d46db.
>>> 129151 [flink-akka.actor.default-dispatcher-22] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Received 
>>> slot report from instance 4c3e3654c11b09fbbf8e993a08a4c2da.
>>> 129200 [flink-akka.actor.default-dispatcher-15] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Release 
>>> TaskExecutor 4c3e3654c11b09fbbf8e993a08a4c2da because it exceeded the idle 
>>> timeout.
>>> 129200 [flink-akka.actor.default-dispatcher-15] DEBUG 
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Worker 
>>> 78cdd7a4-0c00-4912-992f-a2990a5d46db could not be stopped.
>>> 
>> 
> 



Re: runtime.resourcemanager

2018-12-11 Thread Piotr Nowojski
the 
> flink-queryable-state-runtime jar from the opt to the lib folder.
> 2018-12-10 12:20:22,257 INFO  
> org.apache.flink.runtime.io.network.NetworkEnvironment- Starting the 
> network environment and its components.
> 2018-12-10 12:20:22,289 INFO  
> org.apache.flink.runtime.io.network.netty.NettyClient - Successful 
> initialization (took 31 ms).
> 2018-12-10 12:20:22,325 INFO  
> org.apache.flink.runtime.io.network.netty.NettyServer - Successful 
> initialization (took 35 ms). Listening on SocketAddress /127.0.1.1:46127.
> 2018-12-10 12:20:22,326 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting 
> managed memory to 0.7 of the currently free heap space (640 MB), memory will 
> be allocated lazily.
> 2018-12-10 12:20:22,329 INFO  
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager 
> uses directory /tmp/flink-io-4f10dc60-3805-4c50-85a1-497c99dfb20c for spill 
> files.
> 2018-12-10 12:20:22,387 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages 
> have a max timeout of 1 ms
> 2018-12-10 12:20:22,394 INFO  
> org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting RPC 
> endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at 
> akka://flink/user/taskmanager_0 .
> 2018-12-10 12:20:22,406 INFO  
> org.apache.flink.runtime.taskexecutor.JobLeaderService- Start job 
> leader service.
> 2018-12-10 12:20:22,407 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting to 
> ResourceManager 
> akka.tcp://flink@localhost:6123/user/resourcemanager().
> 2018-12-10 12:20:22,409 INFO  org.apache.flink.runtime.filecache.FileCache
>   - User file cache uses directory 
> /tmp/flink-dist-cache-058052c5-36cc-432f-88eb-8acf7dc5f1f1
> 2018-12-10 12:20:22,743 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Resolved 
> ResourceManager address, beginning registration
> 2018-12-10 12:20:22,743 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Registration 
> at ResourceManager attempt 1 (timeout=100ms)
> 2018-12-10 12:20:22,814 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Successful 
> registration at resource manager 
> akka.tcp://flink@localhost:6123/user/resourcemanager under registration id 
> ba9dd638db7ebccde63a3e0df420a990.
> 
> On 12/10/2018 12:14 PM, Piotr Nowojski wrote:
>> Hi,
>> 
>> Have you checked task managers logs?
>> 
>> Piotrek
>> 
>>> On 8 Dec 2018, at 12:23, Alieh >> <mailto:sae...@informatik.uni-leipzig.de>> wrote:
>>> 
>>> Hello Piotrek,
>>> 
>>> thank you for your answer. I installed a Flink on a local cluster and used 
>>> the GUI in order to monitor the task managers. It seems the program does 
>>> not start at all. The whole time just the job manager is struggling... For 
>>> very very toy examples, after a long time (during this time I see the job 
>>> manager logs as I mentioned before),  the job is started and can be 
>>> executed in 2 seconds.  
>>> 
>>> Best,
>>> 
>>> Alieh
>>> 
>>> 
>>> On 12/07/2018 10:43 AM, Piotr Nowojski wrote:
>>>> Hi,
>>>> 
>>>> Please investigate logs/standard output/error from the task manager that 
>>>> has failed (the logs that you showed are from job manager). Probably there 
>>>> is some obvious error/exception explaining why has it failed. Most common 
>>>> reasons:
>>>> - out of memory
>>>> - long GC pause
>>>> - seg fault or other error from some native library
>>>> - task manager killed via for example SIGKILL
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 6 Dec 2018, at 17:34, Alieh  
>>>>> <mailto:sae...@informatik.uni-leipzig.de> wrote:
>>>>> 
>>>>> Hello all,
>>>>> 
>>>>> I have an algorithm x () which contains several joins and usage of 3 
>>>>> times of gelly ConnectedComponents. The problem is that if I call x() 
>>>>> inside a script more than three times, I receive the messages listed 
>>>>> below in the log and the program is somehow stopped. It happens even if I 
>>>>> run it with a toy example of a graph with less that 10 vertices. Do you 
>>>>> have any clue what is the problem?
>>>>> 
>>>>> Cheers,
>>>>> 
>>>>> Alieh
>>>>> 

Re: How to migrate Kafka Producer ?

2019-01-02 Thread Piotr Nowojski
Hi Edward,

Sorry for coming back so late (because of holiday season).

You are unfortunately right. Our FlinkKafkaProducer should have been 
upgradable, but it is not. I have created a bug for this [1]. For the time 
being, until we fix the issue, you should be able to stick to the 0.11 producer 
without noticeable negative effects. Our FlinkKafkaProducer011 has the same 
forward & backward compatibility as the universal FlinkKafkaProducer (the 
biggest change between the two of them was just the naming convention), so 
you can use either of them with the same versions of Kafka brokers (0.10+).

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-11249

> On 18 Dec 2018, at 14:33, Edward Rojas  wrote:
> 
> Hi,
> 
> I'm planning to migrate from the Kafka connector 0.11 to the new universal Kafka
> connector 1.0.0+ but I'm having some trouble.
> 
> The Kafka consumer seems to be compatible, but when trying to migrate the
> Kafka producer I get an incompatibility error for the state migration. 
> It looks like the producer uses a list state of type
> "NextTransactionalIdHint", but this class is specific to each producer
> (FlinkKafkaProducer011.NextTransactionalIdHint vs
> FlinkKafkaProducer.NextTransactionalIdHint) and therefore the states are not
> compatible.
> 
> 
> I would like to know what is the recommended way to perform this kind of
> migration without losing the state ?
> 
> Thanks in advance,
> Edward
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: NPE when using spring bean in custom input format

2019-01-21 Thread Piotr Nowojski
Hi,

You have to use the `open()` method to handle initialisation of the things required 
by your code/operators. By the nature of the LocalEnvironment, the life cycle 
of the operators is different there compared to what happens when submitting a 
job to a real cluster. With remote environments your classes will be 
serialised, sent over the network, deserialised and only then will the `open()` 
methods be called. In such a setup, if you need to initialise some shared 
static resource, you also have to keep in mind that, depending on the 
parallelism, the number of tasks & the number of task managers, you will have to 
make sure that your static resource is initialised only once. You should also 
take care of de-initialisation of this resource & take into account what 
will happen if your code crashes with an exception. In that case your job 
might be resubmitted without restarting the TaskManagers.
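
To illustrate the "initialise only once per JVM" part, a rough sketch based on your 
CustomInputFormat (the record type, package name, `ServiceX` and the Spring bootstrap 
are placeholders, and proper de-initialisation/reference counting is left out for 
brevity):

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class CustomInputFormat extends GenericInputFormat<Map<String, Object>> {

    // One Spring context per task manager JVM, shared by all parallel subtasks.
    private static final Object LOCK = new Object();
    private static volatile ConfigurableApplicationContext sharedContext;

    private transient Iterator<Map<String, Object>> recordsIterator;

    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);
        synchronized (LOCK) {
            if (sharedContext == null) {
                // Scan only the service-related packages to keep start-up cheap.
                sharedContext = new AnnotationConfigApplicationContext("com.example.services");
            }
        }
        ServiceX serviceX = sharedContext.getBean(ServiceX.class);
        recordsIterator = serviceX.getRecords();
    }

    @Override
    public boolean reachedEnd() {
        return !recordsIterator.hasNext();
    }

    @Override
    public Map<String, Object> nextRecord(Map<String, Object> reuse) {
        return recordsIterator.next();
    }
}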

Piotrek

> On 18 Jan 2019, at 11:48, madan  wrote:
> 
> Suggestions please.
> 
> Thinking of options:
> 1. Initializing the Spring application context in the 'open' method. Instead of 
> loading the entire context, move service-related beans to one/multiple packages 
> and scan only those packages. Requires code refactoring.
> 2. Direct database query - a direct query cannot be used since business logic 
> is involved while fetching the records.
> 3. Write initially to CSV and do the transformation on the CSV. Last possible option.
> 
> Please share your thoughts.
> 
> Thank you.
> 
> On Wed, Jan 16, 2019 at 2:50 PM madan  > wrote:
> Hi,
> 
> Need help in the below scenario,
> 
> I have CustomInputFormat which loads the records using a bean,
> 
> public class CustomInputFormat extends GenericInputFormat {
>     private Iterator> recordsIterator;
>
>     @Override
>     public void open(GenericInputSplit split) throws IOException {
>         ServiceX serviceX = SpringBeanFinder.getBean(ServiceX.class);
>         recordsIterator = serviceX.getRecords(..);
>     }
> }
> The above input format works fine when using Flink LocalEnvironment in spring 
> application. Problem is when running flink in a cluster mode and trying to 
> connect to it using RemoveEnvironment. Since Spring applicaiton context will 
> not be initialized, NPE is thrown. Please suggest what could be the solution 
> in this scenario.
> 
> 
> 
> -- 
> Thank you,
> Madan.
> 
> 
> -- 
> Thank you,
> Madan.



Re: Query on retract stream

2019-01-21 Thread Piotr Nowojski
Hi,

At the moment there is a missing feature in the Flink Table API/SQL: supporting retraction 
streams as the input (or conversions from an append stream to a retraction stream). 
With that, your problem would simplify to one simple `SELECT uid, 
count(*) FROM Changelog GROUP BY uid`. There is ongoing related 
work [1], so this might be supported in the next couple of months.

There might be a workaround at the moment that could work. I think you would need 
to write your own custom `LAST_ROW(x)` aggregation function, which would just 
return the value of the most recently aggregated row. With that you could write a 
query like this:

SELECT
    uid, count(*)
FROM (
    SELECT
        *
    FROM (
        SELECT
            uid, LAST_ROW(status) AS status
        FROM
            changelog
        GROUP BY
            uid, oid)
    WHERE status = 'pending')
GROUP BY
    uid

Where `changelog` is an append only stream with the following content:

> user, order, status, event_time
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, pending, t3
> u1, o3, success, t4
> u2, o4, pending, t5
> u2, o4, pending, t6
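
For completeness, a rough sketch of what such a custom `LAST_ROW` aggregate could
look like (illustrative only - note that "last" here means the last row in
arrival/processing order, not in event time):

import org.apache.flink.table.functions.AggregateFunction;

public class LastRow extends AggregateFunction<String, LastRow.Accumulator> {

    public static class Accumulator {
        public String value;
    }

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    // Called for every input row; simply remembers the most recently seen value.
    public void accumulate(Accumulator acc, String status) {
        acc.value = status;
    }

    @Override
    public String getValue(Accumulator acc) {
        return acc.value;
    }
}

// Registration, so that it can be used as LAST_ROW(status) in the query above:
// tableEnv.registerFunction("LAST_ROW", new LastRow());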


Besides that, you could also write your own relatively simple DataStream 
application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

> On 18 Jan 2019, at 18:30, Gagan Agrawal  wrote:
> 
> Hi,
> I have a requirement and need to understand if the same can be achieved with 
> a Flink retract stream. Let's say we have a stream with 4 attributes: userId, 
> orderId, status, event_time, where orderId is unique and hence any change in 
> the same orderId updates the previous value, as below.
> 
> Changelog Event Stream
> 
> user, order, status, event_time
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, pending, t3
> u1, o3, success, t4
> u2, o4, pending, t5
> u2, o4, pending, t6
> 
> Snapshot view at time t6 (as viewed in mysql)
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u4, o4, pending, t6
> (Here rows at time t3 and t5 are deleted as they have been updated for 
> respective order ids)
> 
> What I need is to maintain count of "Pending" orders against a user and if 
> they go beyond configured threshold, then push that user and pending count to 
> Kafka. Here there can be multiple updates to order status e.g Pending -> 
> Success or Pending -> Failed. Also in some cases there may not be any change 
> in status but we may still get a row (may be due to some other attribute 
> update which we are not concerned about). So is it possible to have running 
> count in flink as below at respective event times. Here Pending count is 
> decreased from 2 to 1 for user u1 at t4 since one of it's order status was 
> changed from Pending to Success. Similarly for user u2, at time t6, there was 
> no change in running count as there was no change in status for order o4
> 
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is 
> decreased for u1)
> t5 -> u1 : 1, u2 : 1
> t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)
> 
> As I understand may be retract stream can achieve this. However I am not sure 
> how. Any samples around this would be of great help.
> 
> Gagan
> 



Re: Query on retract stream

2019-01-21 Thread Piotr Nowojski
@Jeff: It depends on whether the user can define a time window for the condition. As Gagan 
described his problem, it was about a “global” threshold of pending orders.



I have just thought about another solution that should work without any custom 
code: converting the “status” field to an integer status_value:
- “+1” for pending
- “-1” for success/failure
- “0” otherwise

Then running:

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

as a query on top of such a stream. The conversion to integers could be done with a 
`CASE` expression, for example as sketched below.
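
A minimal sketch of that query, assuming `tableEnv` is a StreamTableEnvironment on 
which an append-only table named `changelog` (columns uid, oid, status) has already 
been registered:

import org.apache.flink.table.api.Table;

// +1 for pending, -1 for success/failure, 0 otherwise, then a running SUM per user.
Table pendingPerUser = tableEnv.sqlQuery(
    "SELECT uid, SUM(status_value) AS pending_count " +
    "FROM ( " +
    "  SELECT uid, " +
    "         CASE status " +
    "           WHEN 'pending' THEN 1 " +
    "           WHEN 'success' THEN -1 " +
    "           WHEN 'failed'  THEN -1 " +
    "           ELSE 0 " +
    "         END AS status_value " +
    "  FROM changelog " +
    ") " +
    "GROUP BY uid");

Note that this directly transcribes the proposal above: repeated rows with an 
unchanged 'pending' status would be counted twice, so some deduplication (e.g. the 
LAST_ROW idea) might still be needed on top of it.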

One thing to note here is that probably all of the proposed solutions would 
work based on the order of the records, not based on the event_time.

Piotrek

> On 21 Jan 2019, at 15:10, Jeff Zhang  wrote:
> 
> I am thinking of another approach instead of retract stream. Is it possible 
> to define a custom window to do this ? This window is defined for each order. 
> And then you just need to analyze the events in this window.
> 
> Piotr Nowojski <pi...@da-platform.com> wrote on Mon, 21 Jan 2019 at 20:44:
> Hi,
> 
> There is a missing feature in Flink Table API/SQL of supporting retraction 
> streams as the input (or conversions from append stream to retraction stream) 
> at the moment. With that your problem would simplify to one simple `SELECT 
> uid, count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with 
> related work [1], so this might be supported in the next couple of months.
> 
> There might a workaround at the moment that could work. I think you would 
> need to write your own custom `LAST_ROW(x)` aggregation function, which would 
> just return the value of the most recent aggregated row. With that you could 
> write a query like this:
> 
> SELECT 
>   uid, count(*) 
> FROM (
>   SELECT 
>   * 
>   FROM (
>   SELECT 
>   uid, LAST_ROW(status)
>   FROM
>   changelog
>   GROUP BY
>   uid, oid)
>   WHERE status = `pending`)
> GROUP BY
>   uid
> 
> Where `changelog` is an append only stream with the following content:
> 
>> user, order, status, event_time
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> u1, o3, pending, t3
>> u1, o3, success, t4
>> u2, o4, pending, t5
>> u2, o4, pending, t6
> 
> 
> Besides that, you could also write your own a relatively simple Data Stream 
> application to do the same thing.
> 
> I’m CC’ing Timo, maybe he will have another better idea.
> 
> Piotrek
> 
> [1] https://issues.apache.org/jira/browse/FLINK-8577 
> <https://issues.apache.org/jira/browse/FLINK-8577>
> 
>> On 18 Jan 2019, at 18:30, Gagan Agrawal > <mailto:agrawalga...@gmail.com>> wrote:
>> 
>> Hi,
>> I have a requirement and need to understand if same can be achieved with 
>> Flink retract stream. Let's say we have stream with 4 attributes userId, 
>> orderId, status, event_time where orderId is unique and hence any change in 
>> same orderId updates previous value as below
>> 
>> Changelog Event Stream
>> 
>> user, order, status, event_time
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> u1, o3, pending, t3
>> u1, o3, success, t4
>> u2, o4, pending, t5
>> u2, o4, pending, t6
>> 
>> Snapshot view at time t6 (as viewed in mysql)
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> u1, o3, success, t4
>> u4, o4, pending, t6
>> (Here rows at time t3 and t5 are deleted as they have been updated for 
>> respective order ids)
>> 
>> What I need is to maintain count of "Pending" orders against a user and if 
>> they go beyond configured threshold, then push that user and pending count 
>> to Kafka. Here there can be multiple updates to order status e.g Pending -> 
>> Success or Pending -> Failed. Also in some cases there may not be any change 
>> in status but we may still get a row (may be due to some other attribute 
>> update which we are not concerned about). So is it possible to have running 
>> count in flink as below at respective event times. Here Pending count is 
>> decreased from 2 to 1 for user u1 at t4 since one of it's order status was 
>> changed from Pending to Success. Similarly for user u2, at time t6, there 
>> was no change in running count as there was no change in status for order o4
>> 
>> t1 -> u1 : 1, u2 : 0
>> t2 -> u1 : 1, u2 : 0
>> t3 -> u1 : 2, u2 : 0
>> t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is 
>> decreased for u1)
>> t5 -> u1 : 1, u2 : 1
>> t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)
>> 
>> As I understand may be retract stream can achieve this. However I am not 
>> sure how. Any samples around this would be of great help.
>> 
>> Gagan
>> 
> 
> 
> 
> -- 
> Best Regards
> 
> Jeff Zhang



Re: Sampling rate higher than 1Khz

2019-01-28 Thread Piotr Nowojski
Hi,

Maybe it's a stupid idea, but does anything prevent a user from pretending that 
watermarks/event times are in a different unit, for example microseconds? Of 
course this assumes using row/event time and not using processing time for anything.
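
A minimal sketch of that trick, assuming a Tuple2<Long, Double> stream where f0 
already holds a microsecond epoch value; everything downstream (window sizes, 
allowed lateness, ...) then has to be interpreted in microseconds as well:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

// The extracted "timestamp" is in microseconds, not milliseconds; Flink does not
// care about the unit as long as the values are comparable longs.
public class MicrosTimestampExtractor extends AscendingTimestampExtractor<Tuple2<Long, Double>> {

    @Override
    public long extractAscendingTimestamp(Tuple2<Long, Double> reading) {
        return reading.f0;
    }
}

With that, a timeWindow(Time.milliseconds(1)) would effectively be a 1 microsecond 
window in this reinterpreted scale.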

Piotrek 

> On 28 Jan 2019, at 14:58, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi!
> 
> Yes, Flink's watermark timestamps are in milliseconds, which means that 
> time-based operators such as time window operators will be fired at a 
> per-millisecond granularity.
> Whether or not this introduces "latency" in the pipeline depends on the 
> granularity of your time window operations; if you need to have window 
> durations shorter than 1 millisecond, then yes, having only millisecond 
> watermarks will introduce latency.
> Currently in Flink, time-based operations such as windows / registering 
> timers are all done at millisecond accuracy.
> 
> Cheers,
> Gordon
> 
> On Mon, Jan 28, 2019 at 7:55 PM Nicholas Walton  > wrote:
> Flinks watermarks are in milliseconds. I have time sampled off a sensor at a 
> rate exceeding 1Khz or 1 per millisecond. Is there a way to handle timestamp 
> granularity below milliseconds, or will I have to generate timestamp for the 
> millisecond value preceding that associated with the sensor reading, which 
> IUC will introduce latency into the processing pipeline. 
> 
> TIA



Re: Flink Custom SourceFunction and SinkFunction

2019-03-04 Thread Piotr Nowojski
Hi,

I couldn’t find any references to your question, nor have I seen such a use 
case, but:

Re 1. 
It looks like it could work

Re 2.
It should work as well, but just try to use StreamingFileSink

Re 3.
For a custom source/sink function, if you do not care about data processing guarantees 
it’s quite easy. If you have to achieve at-least-once or exactly-once, things 
might get more complicated. 
For an exactly-once sink, you should start from `TwoPhaseCommitSinkFunction`. 
(For example usages check the test class 
`TwoPhaseCommitSinkFunctionTest.ContentDumpSinkFunction`, or the more complicated 
FlinkKafkaProducer.)
For an at-least-once sink, you can just flush/sync the output files on 
snapshot/checkpoint, as in the sketch below.
For the source, you would have to manually keep the input offsets in Flink’s state. 
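
A minimal at-least-once sink sketch that flushes on every checkpoint; a local 
BufferedWriter and output path are made-up stand-ins for an SFTP output stream:

import java.io.BufferedWriter;
import java.io.FileWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class FlushOnCheckpointSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private transient BufferedWriter writer;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One local file per parallel subtask; replace with an SFTP stream in a real job.
        writer = new BufferedWriter(new FileWriter(
                "/tmp/output-" + getRuntimeContext().getIndexOfThisSubtask()));
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        writer.write(value);
        writer.newLine();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Flushing on every checkpoint persists everything received so far, which gives
        // at-least-once semantics (duplicates are possible after a restore).
        writer.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // No Flink-managed state is needed for the at-least-once variant.
    }

    @Override
    public void close() throws Exception {
        if (writer != null) {
            writer.close();
        }
    }
}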

Re 4.

Regarding SFTP support: not that I’m aware of.
Regarding sources/sinks you can try to look at existing source/sinks 
implementations.

Piotrek

> On 1 Mar 2019, at 09:39, Siew Wai Yow  wrote:
> 
> Hi guys,
> 
> I have question regarding to the title that need your expertise,
> 
> 1. I need to build an SFTP SourceFunction; may I know if hadoop's SFTPFileSystem is 
> suitable?
> 2. I need to build an SFTP SinkFunction as well; may I know if the pre-defined HDFS 
> rolling file sink accepts SFTP connections, since SFTP is supported by the hadoop 
> file system?
> 3. Any good reference on how to write a custom source/sink?
> 4. Any similar code to share?
> Thanks!
> 
> Regards,
> Yow



Re: Setting source vs sink vs window parallelism with data increase

2019-03-04 Thread Piotr Nowojski
Hi,

What Flink version are you using?

Generally speaking, Flink might not be at its best if you have a large records fan-out; 
this may significantly increase checkpointing time. 

However, you might first want to identify what’s causing the long GC times. If there 
are long GC pauses, that should be the first thing to fix.

Piotrek

> On 2 Mar 2019, at 08:19, Padarn Wilson  wrote:
> 
> Hi all again - following up on this I think I've identified my problem as 
> being something else, but would appreciate if anyone can offer advice.
> 
> After running my stream from sometime, I see that my garbage collector for 
> old generation starts to take a very long time:
> 
> here the purple line is young generation time, this is ever increasing, but 
> grows slowly, while the blue is old generation.
> This in itself is not a problem, but as soon as the next checkpoint is 
> triggered after this happens you see the following:
> 
> It looks like the checkpoint hits a cap, but this is only because the 
> checkpoints start to timeout and fail (these are the alignment time per 
> operator)
> 
> I do notice that my state is growing quite larger over time, but I don't have 
> a good understanding of what would cause this to happen with the JVM old 
> generation metric, which appears to be the leading metric before a problem is 
> noticed. Other metrics such as network buffers also show that at the 
> checkpoint time things start to go haywire and the situation never recovers.
> 
> Thanks
> 
> On Thu, Feb 28, 2019 at 5:50 PM Padarn Wilson  > wrote:
> Hi all,
> 
> I'm trying to process many records, and I have an expensive operation I'm 
> trying to optimize. Simplified it is something like:
> 
> Data: (key1, count, time)
> 
> Source -> Map(x -> (x, newKeyList(x.key1))
> -> Flatmap(x -> x._2.map(y => (y, x._1.count, x._1.time))
> -> Keyby(_.key1).TublingWindow().apply..
> -> Sink
> 
> In the Map -> Flatmap, what is happening is that each key is mapping to a set 
> of keys, and then this is set as the new key. This effectively increase the 
> size of the stream by 16x
> 
> What I am trying to figure out is how to set the parallelism of my operators. 
> I see in some comments that people suggest your source, sink and aggregation 
> should have different parallelism, but I'm not clear on exactly why, or what 
> this means for CPU utilization. 
> (see for example 
> https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly
>  
> )
> 
> Also, it isn't clear to me the best way to handle this increase in data 
> within the stream itself.
> 
> Thanks



Re: Task slot sharing: force reallocation

2019-03-04 Thread Piotr Nowojski
Hi,

Are you asking whether that’s the expected behaviour, or have you actually 
observed this issue? I’m not entirely sure, but I would guess that the Sink 
tasks would be distributed randomly across the cluster; then again, I might be mixing 
this up with resource allocation for Task Managers. Maybe Till will know 
something more about this?

One thing that might solve/work around the issue is to run those jobs in 
job mode (one cluster per job) instead of session mode, since containers for 
Task Managers are created/requested randomly.

Piotrek

> On 2 Mar 2019, at 23:53, Le Xu  wrote:
> 
> Hello!
> 
> I'm trying to find out if there a way to force task slot sharing within a 
> job. The example on the website looks like the following (as in the 
> screenshot)
> 
> 
> In this example, the single sink is slot-sharing with source/map (1) and 
> window operator (1). If I deploy multiple identical jobs shown above, all 
> sink operators would be placed on the first machine (which creates an 
> unbalanced scenario). Is there a way to avoid this situation (i.e., to have 
> sink operators of different jobs spread evenly across the task slots for the 
> entire cluster). Specifically, I was wondering if either of the following 
> options are possible:
> 1. To force Sink[1] to be slot sharing with mapper from a different partition 
> on other slots such as (source[2] and window[2]).
> 2. If option 1 is not possible, is there a "hacky" way for Flink to deploy 
> jobs starting from a different machine: e.g. For job 2, it can allocate 
> source/map[1], window[1], sink[1] to machine 2 instead of again on machine 1. 
> In this way the slot-sharing groups are still the same, but we end up having 
> sinks from the two jobs on different machines.
> 
> 
> Thanks!
> 
> 
> 
> 



Re: Command exited with status 1 in running Flink on marathon

2019-03-04 Thread Piotr Nowojski
Hi,

With just this information it might be difficult to help.

Please look for some additional logs (has Flink managed to log anything?) 
or some standard output/errors. I would guess this might be some relatively 
simple mistake in the configuration, like file/directory read/write/execute 
permissions or something like that.

I guess you have seen/followed this?
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/mesos.html

Piotrek

> On 3 Mar 2019, at 12:46, Mar_zieh  wrote:
> 
> I want to run my flink program on Mesos cluster via marathon. I created an
> application with this Json file in Marathon: 
> 
> { 
>"id": "flink", 
>"cmd": "/home/flink-1.7.0/bin/mesos-appmaster.sh
> -Djobmanager.heap.mb=1024 -Djobmanager.rpc.port=6123 -Drest.port=8081
> -Dmesos.resourcemanager.tasks.mem=1024 -Dtaskmanager.heap.mb=1024
> -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2
> -Dmesos.resourcemanager.tasks.cpus=1", 
>"cpus": 1.0, 
>"mem": 1024 
> } 
> 
>  The task became failed with this error: 
> 
> I0303 09:41:52.841243  2594 exec.cpp:162] Version: 1.7.0 
> I0303 09:41:52.851898  2593 exec.cpp:236] Executor registered on agent
> d9a98175-b93c-4600-a41b-fe91fae5486a-S0 
> I0303 09:41:52.854436  2594 executor.cpp:182] Received SUBSCRIBED event 
> I0303 09:41:52.855284  2594 executor.cpp:186] Subscribed executor on
> 172.28.10.136 
> I0303 09:41:52.855479  2594 executor.cpp:182] Received LAUNCH event 
> I0303 09:41:52.855932  2594 executor.cpp:679] Starting task
> .933fdd2f-3d98-11e9-bbc4-0242a78449af 
> I0303 09:41:52.868172  2594 executor.cpp:499] Running
> '/home/mesos-1.7.0/build/src/mesos-containerizer launch
> ' 
> I0303 09:41:52.872699  2594 executor.cpp:693] Forked command at 2599 
> I0303 09:41:54.050284  2596 executor.cpp:994] Command exited with status 1
> (pid: 2599) 
> I0303 09:41:55.052323  2598 process.cpp:926] Stopped the socket accept loop 
> 
> I configured Zookeeper, Mesos, Marathon and Flink. Moreover, they are all on
> docker. I ran a simple program like "echo "hello" >> /home/output.txt"
> without any problems. 
> 
> I really do not know what is going on, I am confused. Would you please any
> one tell me what is wrong here? 
> 
> Any help would be appreciated. 
> 
> Many thanks.
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: [Flink-Question] In Flink parallel computing, how do different windows receive the data of their own partition, that is, how does Window determine which partition number the current Window belongs

2019-03-04 Thread Piotr Nowojski
Hi,

I’m not sure if I understand your question/concerns.

As Rong Rong explained, the key selector is used to assign records to window 
operators. 

Within the keyed context, you do not have access to other keys/values in your 
operators/functions, so your reduce/process/… functions, while processing key 1, 
won’t be able to access/see keys 2, 3, 4, 6 or 10, even if they are on the same 
machine. If you want to process records together, they must be keyed 
together appropriately. 
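
A minimal sketch illustrating this, assuming a small stream of (key, value) tuples; 
the key selector (field 0 here) alone decides which parallel window instance 
receives a record:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class KeyedWindowPartitioning {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<Integer, Integer>> input = env.fromElements(
                Tuple2.of(1, 1), Tuple2.of(2, 1), Tuple2.of(5, 1), Tuple2.of(1, 1));

        input
                // the hash of the key decides which parallel window subtask gets the record
                .keyBy(0)
                // each parallel window instance only ever sees the keys assigned to it
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("Keyed window partitioning sketch");
    }
}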

Piotrek

> On 4 Mar 2019, at 04:45, 刘 文  wrote:
> 
> 
> Sorry, I still don't understand. Can I ask for help again?
> 
> 
> For example, the degree of parallelism is 2, which will produce two Window
> threads (.setParallelism(2)). How do these two windows read the data of their
> own partition?
> ). input data
>     1 2 3 4 5 6 7 8 9 10
> ). source -> operator -> RecordWriter.emit calculates the partition by key:
>     [partition 0]
>         key:1   partition:0
>         key:2   partition:0
>         key:3   partition:0
>         key:4   partition:0
>         key:6   partition:0
>         key:10  partition:0
>     [partition 1]
>         key:5   partition:1
>         key:7   partition:1
>         key:8   partition:1
>         key:9   partition:1
> ). window 0 (1/2)
>     How is the current partition determined?
>     How does it get the data of the current partition?
> ). window 1 (2/2)
>     How is the current partition determined?
>     How does it get the data of the current partition?
> 
> ---
> 
>> On 4 Mar 2019, at 04:19, Rong Rong wrote:
>> 
>> Hi
>> 
>> I am not sure if I understand your question correctly, so will try to 
>> explain the flow how elements gets into window operators.
>> 
>> Flink makes the partition assignment before invoking the operator to process 
>> element. For the word count example, WindowOperator is invoked by 
>> StreamInputProcessor[1] to "setKeyContextElement".
>> The actual key is then set by WindowOperator (inherently by 
>> AbstractStreamOperator[2]), which ultimately passed to KeyedStateBackend[3].
>> 
>> So, by the time WindowOperator processes elements, the KeyedStateBackend was 
>> already set to the correct key.
>> 
>> Hope this answers your question.
>> 
>> --
>> Rong
>> 
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.html
>>  
>> 
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html
>>  
>> 
>> On Sun, Mar 3, 2019 at 5:15 AM 刘 文 > > wrote:
>> ). Environment Flink1.7.2 WordCount local, stream processing
>> ).source RecordWriter.emit(), for each element by key, divided into 
>> different partitions, the partition location of each element has been 
>> determined, the number of partitions is determined by 
>> DataStream.setParallelism(2)
>>  ). By copyFromSerializerToTargetChannel(int targetChannel) to write data to 
>> different channels, it is to send data to the window corresponding to 
>> different partitions (data is sent one by one)
> 



Re: StochasticOutlierSelection

2019-03-04 Thread Piotr Nowojski

Hi,

I have never used this code, but the ml library depends heavily on Scala, so I 
wouldn’t recommend using it from Java. 

However, if you want to go this way (I’m not sure if that’s possible), you would 
have to pass the implicit parameters manually somehow (I don’t know how to do 
that from Java). In this case you can take a look at the method’s signature: the 
parameters argument has a default value, and the implicitly passed 
transformOperation comes from 
`StochasticOutlierSelection.transformLabeledVectors` or/and (?) 
`StochasticOutlierSelection.transformVectors`.

Piotrek

> On 2 Mar 2019, at 12:21, anissa moussaoui  
> wrote:
> 
> Hello,
> 
> I would like to use the StochasticOutlierSelection algorithm for anomaly 
> detection on a DataSet in Java, but the problem is that the doc is only in Scala.
> 
> In Java, doing:
> 
> StochasticOutlierSelection stochasticOutliers= new 
> StochasticOutlierSelection();
> 
> the "tranform" operation of this algorithm take three parameters : training 
> dataSet, ParameterMap and TransformOperationDataSet on java but in scala doc 
> take one parameters 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/libs/ml/sos.html#parameters
>  
> .
> It's the same case for other algorithms like KNN un scala take only training 
> dataset un scala but un java takes three parameters !
> 
> My question is that i do not know what represent ParameterMap and 
> TransformOperationDataSet and how i initialize them and in an optimised way 
> fr my model?
> 
> Do you have dôme exemple for flink ml un java please ?
> 
> 
> Thank you in advance !
> Best,
> 
> Anissa MOUSSAOUI 
> 
> 
> 
> 
> 
>  Think of the planet: only print this message if necessary


