Re: s3 statebackend user state size
Hi Ufuk,

Yes, it does help with the RocksDB backend! After tuning the checkpoint frequency to match the network throughput, the released task managers and cancelled jobs are gone.

Chen
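A rough way to reason about "checkpoint frequency aligned with network throughput" is to estimate how long one full copy of the state takes to upload and keep the checkpoint interval comfortably above that. The sketch below is only a back-of-the-envelope model: the state size, the throughput and the safety factor are made-up assumptions, not values from this thread, and nothing in it is Flink API.

```java
import java.time.Duration;

/** Back-of-the-envelope estimate; every number used in main() is a hypothetical assumption. */
final class CheckpointIntervalEstimate {

    /** Seconds needed to copy stateBytes at bytesPerSecond, rounded up. */
    static long uploadSeconds(long stateBytes, long bytesPerSecond) {
        if (stateBytes < 0 || bytesPerSecond <= 0) {
            throw new IllegalArgumentException("stateBytes must be >= 0 and bytesPerSecond > 0");
        }
        return (stateBytes + bytesPerSecond - 1) / bytesPerSecond;
    }

    /** Upload time multiplied by a safety factor, as a suggested lower bound for the interval. */
    static Duration minInterval(long stateBytes, long bytesPerSecond, int safetyFactor) {
        return Duration.ofSeconds(uploadSeconds(stateBytes, bytesPerSecond) * safetyFactor);
    }

    public static void main(String[] args) {
        long stateBytes = 20L * 1024 * 1024 * 1024;   // assumption: 20 GiB of state on one TaskManager
        long bytesPerSecond = 100L * 1024 * 1024;     // assumption: 100 MiB/s sustained upload to S3
        System.out.println("suggested minimum checkpoint interval: "
                + minInterval(stateBytes, bytesPerSecond, 2));
    }
}
```

If checkpoints are triggered more often than the estimated upload time, uploads would start to overlap, which is one plausible reading of the failures described in this thread.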
Re: HBase write problem
Do you have the hbase-site.xml available in the classpath?
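One quick way to answer that question is to ask the class loader directly, from the same JVM and classpath that run the job. This is a minimal sketch using only the JDK; the class name `ClasspathCheck` is made up for illustration. HBase clients normally pick up settings such as the ZooKeeper quorum from hbase-site.xml, so a missing file is one plausible (not confirmed) reason for a write that never completes.

```java
import java.net.URL;

/** Looks up a resource the way a library would: through the context class loader. */
final class ClasspathCheck {

    /** Returns the URL of the named resource on the classpath, or null if it is absent. */
    static URL locate(String resourceName) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        return loader.getResource(resourceName);
    }

    public static void main(String[] args) {
        URL url = locate("hbase-site.xml");
        System.out.println(url == null
                ? "hbase-site.xml was NOT found on the classpath"
                : "hbase-site.xml found at " + url);
    }
}
```

Running the same lookup inside the job (for example in the open() method of a rich function) would show what the TaskManager sees, which can differ from the client's classpath.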
HBase write problem
Hi all.

I have a problem writing to HBase.

I am using a slightly modified version of this class as a proof of concept:

https://github.com/apache/flink/blob/master/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java

However, all the HBase-specific code is exactly the same as in HBaseWriteExample.

The problem I see is that the job never completes (it has been running for more than an hour now), even though only 13 key/value pairs are to be written to HBase :-)

I have verified that the map/reduce part works if I replace the HBase output with a plain write to a text file. I have also verified that I can insert data into HBase from a similar Hadoop MapReduce job.

Here is the part of the code where I guess the problem is:

        @Override
        public Tuple2 map(Tuple2 t) throws Exception {
            LOG.info("Tuple2 map() called");
            reuse.f0 = new Text(t.f0);
            Put put = new Put(t.f0.getBytes());
            put.add(MasterConstants.CF_SOME, MasterConstants.COUNT, Bytes.toBytes(t.f1));
            reuse.f1 = put;
            return reuse;
        }
    }).output(new HadoopOutputFormat(new TableOutputFormat(), job));

    env.execute("Flink HBase Event Count Hello World Test");

As far as I can tell, this matches the code in HBaseWriteExample.java.

I see the "Tuple2" log line exactly the 13 times I expect, and the last log line I see is this:

    2016-05-10 21:48:42,715 INFO org.apache.hadoop.hbase.mapreduce.TableOutputFormat - Created table instance for event_type_count

Any suggestions as to what the problem could be?

Thanks,
Palle
Re: Cassandra sink wrt Counters
On Tue, May 10, 2016 at 5:36 PM, milind parikh wrote:
> When will the Cassandra sink be released? I am ready to test it out even
> now.

You can work with Chesnay's branch here: https://github.com/apache/flink/pull/1771

Clone his repo via Git, check out the branch, and then build it from source (https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/building.html).
Re: s3 statebackend user state size
On Tue, May 10, 2016 at 5:07 PM, Chen Qin wrote:
> Future, to keep large key/value space, wiki point out using rocksdb as
> backend. My understanding is using rocksdb will write to local file systems
> instead of sync to s3. Does flink support memory->rocksdb(local disk)->s3
> checkpoint state split yet? Or would implement kvstate interface makes flink
> take care of large state problem?

Hey Chen,

when you use RocksDB, you only need to explicitly configure the file system checkpoint directory, for which you can use S3:

    new RocksDBStateBackend(new URI("s3://..."))

The local disk paths are configured via the general Flink temp directory configuration (see taskmanager.tmp.dirs in https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html, default is /tmp).

State is written to the local RocksDB instance, and the RocksDB files are copied to S3 on checkpoints.

Does this help?

– Ufuk
Re: Force triggering events on watermark
Yes, that will work. I was trying another route: a "finalize & purge" trigger that will

  i) onElement: register an event-time timer for the end of the window, without altering the nested trigger's TriggerResult
 ii) onEventTime: always purge after a fire

That will work with CountTrigger and other custom triggers too, right?

    public class FinalizePurgingTrigger<T, W extends Window> extends Trigger<T, W> {

        // The wrapped trigger (constructor omitted).
        private Trigger<T, W> nestedTrigger;

        @Override
        public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
            // Ask for a callback once the watermark passes the end of the window.
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return nestedTrigger.onElement(element, timestamp, window, ctx);
        }

        @Override
        public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
            TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
            switch (triggerResult) {
                case FIRE:
                case FIRE_AND_PURGE:
                    return TriggerResult.FIRE_AND_PURGE;
                default:
                    return TriggerResult.CONTINUE;
            }
        }
    }

Srikanth
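The result mapping in onEventTime can be looked at in isolation. The sketch below is a plain-Java model, not Flink code: `TriggerResult` is a local stand-in enum with the same four constant names, and `alwaysFinalize` is a hypothetical variant added for comparison. It is worth checking because, as far as I can tell, CountTrigger's own onEventTime returns CONTINUE, in which case the posted mapping would also return CONTINUE and the leftover elements would still not be emitted when the watermark passes.

```java
/** Plain-Java model of the result mapping; none of this is Flink API. */
final class FinalizeTriggerModel {

    /** Local stand-in for Flink's TriggerResult (same constant names). */
    enum TriggerResult { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    /** Mapping as posted in the mail: only a firing nested trigger leads to FIRE_AND_PURGE. */
    static TriggerResult asPosted(TriggerResult nested) {
        switch (nested) {
            case FIRE:
            case FIRE_AND_PURGE:
                return TriggerResult.FIRE_AND_PURGE;
            default:
                return TriggerResult.CONTINUE;
        }
    }

    /** Hypothetical variant: the end-of-window timer always emits and clears the window. */
    static TriggerResult alwaysFinalize(TriggerResult nested) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    public static void main(String[] args) {
        for (TriggerResult nested : TriggerResult.values()) {
            System.out.println(nested + " -> as posted: " + asPosted(nested)
                    + ", always finalize: " + alwaysFinalize(nested));
        }
    }
}
```

Which variant is appropriate depends on whether the nested trigger is itself expected to react to the end-of-window timer; the second variant ignores the nested result on purpose.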
Re: Force triggering events on watermark
Maybe the last example of this blog post is helpful [1].

Best, Fabian

[1] https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink
Re: Cassandra sink wrt Counters
Hi Chesnay,

Sorry for asking the question in a confusing manner. Being new to Flink, I have many questions swirling around in my head.

Thanks for the details in your answers. Here are the facts, as I see them:

(a) Cassandra counters are not idempotent.

(b) Failures, in the context of Cassandra, are not the typical failures of an ACID transaction. A failure indicates that the operation was not able to complete at the specified consistency level, meaning that at least one of the nodes did not ack the reads or the writes within the requisite amount of time. Such a failure does NOT mean that no node saw and processed the reads or writes: some node (or many) might have; it only means that at least one of the nodes did not. There is no rollback either. The anti-entropy features of Cassandra will kick in and attempt to correct the situation internally.

From an external system, though, the situation is different: if such a failure occurs, one could retry the operation (specifically writes) from outside Cassandra, provided one has the ability to do so through an intermediate layer (think Flink) and the write is specifically modeled to be idempotent in the data model (specifically the row key design).

One could design the data model so that Flink works exceptionally well with Cassandra, except for counter tables. There is currently no way in Cassandra that I know of to model an idempotent counter table. Therefore an event replay that affects a counter might end up double counting.

When will the Cassandra sink be released? I am ready to test it out even now.
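The difference between an idempotent write and a counter update under replay can be shown with a small in-memory model. This is only a sketch: the maps stand in for tables, and the `Event` class, the event ids and the "clicks" counter are invented for illustration; no Cassandra or Flink API is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of replaying writes; the maps only stand in for tables. */
final class ReplayModel {

    /** One update: a unique event id, the counter it touches, and the amount. */
    static final class Event {
        final String id;
        final String counter;
        final long amount;

        Event(String id, String counter, long amount) {
            this.id = id;
            this.counter = counter;
            this.amount = amount;
        }
    }

    /** Idempotent style: one row per event id, so writing an event twice changes nothing. */
    static Map<String, Long> upsertByEventId(List<Event> delivered) {
        Map<String, Long> rows = new HashMap<>();
        for (Event e : delivered) {
            rows.put(e.id, e.amount);
        }
        return rows;
    }

    /** Counter style: every delivery is added, so a replayed event is counted again. */
    static Map<String, Long> incrementCounters(List<Event> delivered) {
        Map<String, Long> counters = new HashMap<>();
        for (Event e : delivered) {
            counters.merge(e.counter, e.amount, Long::sum);
        }
        return counters;
    }

    /** Two hypothetical events, each delivered exactly once. */
    static List<Event> once() {
        return Arrays.asList(new Event("evt-1", "clicks", 5), new Event("evt-2", "clicks", 7));
    }

    /** The same events, but evt-2 is delivered a second time, as after a replay. */
    static List<Event> withReplay() {
        List<Event> events = new ArrayList<>(once());
        events.add(new Event("evt-2", "clicks", 7));
        return events;
    }

    public static void main(String[] args) {
        System.out.println("upsert, delivered once: " + upsertByEventId(once()));
        System.out.println("upsert, with replay:    " + upsertByEventId(withReplay()));
        System.out.println("counter, delivered once: " + incrementCounters(once()));
        System.out.println("counter, with replay:    " + incrementCounters(withReplay()));
    }
}
```

In this model the upsert table is identical with and without the replay, while the counter differs by the replayed amount, which is the double counting described above.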
Force triggering events on watermark
Hi,

I read the following in the Flink docs: "We can explicitly specify a Trigger to overwrite the default Trigger provided by the WindowAssigner. Note that specifying a triggers does not add an additional trigger condition but replaces the current trigger."

So I tested the code below with a count trigger. As I understand it, this overrides the default watermark-based trigger.

    val testStream = env.fromCollection(List(
        ("2016-04-07 13:11:59", 157428, 4),
        ("2016-04-07 13:11:59", 157428, 4),
        ("2016-04-07 13:11:59", 111283, 23),
        ("2016-04-07 13:11:57", 108042, 23),
        ("2016-04-07 13:12:00", 161374, 9),
        ("2016-04-07 13:12:00", 161374, 9),
        ("2016-04-07 13:11:59", 136505, 4)
      ))
      .assignAscendingTimestamps(b => f.parse(b._1).getTime())
      .map(b => (b._3, b._2))

    testStream.print

    val countStream = testStream
      .keyBy(_._1)
      .timeWindow(Time.seconds(20))
      .trigger(CountTrigger.of(3))
      .fold((0, List[Int]())) { case ((k, r), i) => (i._1, r ++ List(i._2)) }

    countStream.print

The output I saw confirms the documented behavior: processing is triggered only when we have 3 elements for a key.

How do I force-trigger the leftover records once the watermark is past the window? I.e., I want to use triggers to start early processing but finalize the window based on the watermark.

The output shows that the records for keys 23 and 9 weren't processed:

    (4,157428)
    (4,157428)
    (23,111283)
    (23,108042)
    (9,161374)
    (9,161374)
    (4,136505)

    (4,List(157428, 157428, 136505))

Thanks,
Srikanth
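The behavior asked for (fire early on a count, then emit whatever is left once the window ends) can be modelled without Flink. The sketch below is a simplified stand-in, not the windowing API: it keeps one buffer per key, which matches the sample because each key's records fall into a single 20-second window, and it clears the buffer after every firing, which is a modelling choice of this sketch. The class and method names are invented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of "fire on count, flush leftovers at end of window". */
final class CountOrWatermarkModel {

    /** The (key, value) pairs from the mail, after the map step. */
    static final int[][] SAMPLE = {
        {4, 157428}, {4, 157428}, {23, 111283}, {23, 108042},
        {9, 161374}, {9, 161374}, {4, 136505}
    };

    /**
     * Buffers values per key and emits "key=[values]" whenever a buffer reaches count.
     * If flushAtWatermark is true, non-empty buffers are emitted once all input is consumed,
     * standing in for the watermark passing the end of the window.
     */
    static List<String> run(int[][] pairs, int count, boolean flushAtWatermark) {
        Map<Integer, List<Integer>> buffers = new LinkedHashMap<>();
        List<String> fired = new ArrayList<>();
        for (int[] kv : pairs) {
            List<Integer> buffer = buffers.computeIfAbsent(kv[0], k -> new ArrayList<>());
            buffer.add(kv[1]);
            if (buffer.size() == count) {
                fired.add(kv[0] + "=" + buffer);   // early firing on count
                buffer.clear();                    // modelling choice: purge after firing
            }
        }
        if (flushAtWatermark) {
            for (Map.Entry<Integer, List<Integer>> entry : buffers.entrySet()) {
                if (!entry.getValue().isEmpty()) {
                    fired.add(entry.getKey() + "=" + entry.getValue());
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println("count only:        " + run(SAMPLE, 3, false));
        System.out.println("count + watermark: " + run(SAMPLE, 3, true));
    }
}
```

With the count alone, only key 4 is emitted, as in the output quoted above; with the final flush, keys 23 and 9 are emitted as well.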
s3 statebackend user state size
Hi there,

With S3 as the state backend and a large chunk of user state kept on the heap, I see task managers start to fail without showing an OOM exception. Instead, a generic error message (below) is shown when a checkpoint is triggered. I assume this has something to do with how state is kept in a buffer and flushed to S3 when a checkpoint is triggered.

Further, for keeping a large key/value space, the wiki points to using RocksDB as the backend. My understanding is that RocksDB writes to the local file system instead of syncing to S3. Does Flink support a memory -> RocksDB (local disk) -> S3 checkpoint state split yet? Or would implementing the KvState interface make Flink take care of the large-state problem?

Chen

java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager eddbcda03a61f61210063a7cd2148b36 @ 10.163.98.18 - 24 slots - URL: akka.tcp://flink@10.163.98.18:6124/user/taskmanager
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
    at org.apache.flink.runtime.jobmanager.JobManager$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
    at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
    at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
    at akka.actor.ActorCell.invoke(ActorCell.scala:486)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Re: writing tests for my program
thanks Alexander, I'll take a look
Re: writing tests for my program
Hi,

Some shameless self-promotion:

You can also check out https://github.com/ottogroup/flink-spector, which aims to remove such hurdles when testing Flink programs.

Best,
Alex
Re: Streaming job software update
Hi,

in our more-or-less development environment we're doing something like this in our main method:

    val processName = name_of_our_stream
    val configuration = GlobalConfiguration.getConfiguration
    val system = JobClient.startJobClientActorSystem(configuration)
    val timeout = FiniteDuration(10, TimeUnit.SECONDS)
    val gateway = LeaderRetrievalUtils.retrieveLeaderGateway(
      LeaderRetrievalUtils.createLeaderRetrievalService(configuration), system, timeout)

    implicit val executor = system.dispatcher

    // Look up the running jobs and cancel the one whose name matches, if any.
    val cancelResult = gateway
      .ask(JobManagerMessages.getRequestRunningJobsStatus, timeout)
      .mapTo[RunningJobsStatus]
      .flatMap {
        case RunningJobsStatus(runningJobs) =>
          runningJobs.toList.find(_.getJobName == processName).map(job => {
            gateway.ask(JobManagerMessages.CancelJob(job.getJobId), FiniteDuration(1, TimeUnit.MINUTES))
          }).getOrElse(Future.successful(()))
      }

    Await.result(cancelResult, FiniteDuration(1, TimeUnit.MINUTES))
    system.shutdown()

This basically searches the running jobs by name and cancels the matching one. You can trigger a savepoint in a similar way, but unfortunately I don't see an easy way of telling the ExecutionEnvironment that you want to use it. It can probably be done with some clever hack :)

br,
maciek

On 04/05/2016 19:52, Hanson, Bruce wrote:

> Hi all,
>
> I’m working on using Flink to do a variety of streaming jobs that will be processing very high-volume streams. I want to be able to update a job’s software with an absolute minimum impact on the processing of the data. What I don’t understand is the best way to update the software running the job. From what I gather, the way it works today is that I would have to shut down the first job, ensuring that it properly checkpoints, and then start up a new job. My concern is that this may take a relatively long time and cause problems with SLAs I may have with my users.
>
> Does Flink provide more nuanced ways of upgrading a job’s software?
>
> Are there folks out there that are working with this sort of problem, either within Flink or around it?
>
> Thank you for any help, thoughts, etc. you may have.
>
> -Bruce
Re: Blocking or pipelined mode for batch job
Hi Ufuk,

Thank you for the quick response!

I am not very clear about the internal implementation of iterations, so could you explain in detail why blocking results cannot be reset after each superstep?

In addition, for the example below, why may it cause a deadlock in pipelined mode?

    DataSet mapped1 = data.map(new MyMapper());
    DataSet mapped2 = data.map(new AnotherMapper());
    mapped1.join(mapped2).where(...).equalTo(...);
Re: multi-application correlated savepoints
Hmm... quite an interesting question. But I think I don't fully understand your use case: how are your applications coupled? Through Kafka topics? E.g. is the output of one the input for another? Or do they consume from the same input?

And why exactly do you want to get back to a specific point in all of them? If there's a bug in one, then I guess the whole idea of microservices is to have to fix only that one and not care about the others?

I think it can be kind of tricky to do what you describe: if the apps are really decoupled, then there's no notion of "same time" for them. One thing that comes to my mind is having some special "message" that triggers a savepoint, but I'm not sure if it's possible and if it would work in all cases (e.g. what if a stream has many inputs?).

br,
maciek
Re: Blocking or pipelined mode for batch job
On Tue, May 10, 2016 at 10:56 AM, wangzhijiang999 wrote:
> As I reviewed the flink source code, if the ExecutionMode is set
> "Pipelined", the DataExechangeMode will be set "Batch" if breaking pipelined
> property is true for two input or iteration situation in order to avoid
> deadlock. When the DataExechangeMode is set "Batch", the ResultPartitionType
> will be set "Pipelined" if dynamic path for iteration . So the final mode
> for dynamic node in iteration is "Pipelined"? My understanding is right ? If
> it is , why must use pipelined mode for dynamic node in iteration?

Yes, this is a current shortcoming, because the blocking results can not be used within iterations (they can not be reset after each superstep).

> Another question is that when running a batch job, if one task failed,
> do i must restart the whole topology or are there any machnism to just
> restart failed task? Thank you for any advise !

Currently, yes, the whole topology is restarted. There was an attempt to add support for more fine-grained fault tolerance, but it was never completed/merged.

– Ufuk
Re: Cassandra sink wrt Counters
Hello Milind,

I'm not entirely sure I fully understood your question, but I'll try anyway :)

There is no way to provide exactly-once semantics for Cassandra's counters. As such we (will) only provide exactly-once semantics for a subset of Cassandra operations: idempotent inserts/updates.

There are several things that would allow exactly-once semantics:

  * transactions
      o rather obvious, I think
  * replaying/rollback to a given state
      o replay for sources/rollback for sinks upon failure
  * an atomic idempotent update across 2 tables
      o allows tracking every read/write made; selectively re-read/write upon failure

One of the key requisites is proper failure reporting though; if an update fails we /need to know/. As far as I know, Cassandra doesn't make this guarantee.

Regards,
Chesnay Schepler

On 10.05.2016 07:48, milind parikh wrote:

> Given FLINK 3311 & 3332, I am wondering whether it would be possible, without idempotent counters in Cassandra, to deliver on an exactly-once sink into Cassandra. I do note that the verbiage/disc2 in 3332 does warn the user that this is not exactly an "exactly once" sink.
>
> However, my question has to do with whether having idempotent counters and a data model that enables all other idempotent operations is a necessary prerequisite to exactly-once semantics in Flink.
>
> Asked a different way: what source and sink would enable end-to-end exactly-once semantics, in the current state of the art, with Flink in the middle?
>
> Thanks
> Milind
Blocking or pipelined mode for batch job
Hi,

From my review of the Flink source code: if the ExecutionMode is set to "Pipelined", the DataExchangeMode will be set to "Batch" if the breaking-pipeline property is true for a two-input or iteration situation, in order to avoid deadlock. When the DataExchangeMode is set to "Batch", the ResultPartitionType will be set to "Pipelined" if the node is on the dynamic path of an iteration. So the final mode for a dynamic node in an iteration is "Pipelined"? Is my understanding right? If it is, why must pipelined mode be used for dynamic nodes in an iteration?

Another question: when running a batch job, if one task fails, must I restart the whole topology, or is there any mechanism to restart just the failed task?

Thank you for any advice!
multi-application correlated savepoints
Hi!

I'm thinking about using a great Flink functionality: savepoints. I would like to be able to stop my streaming application, roll back its state and restart it (for example to update code or to fix a bug). Let's say I would like to travel back in time and reprocess some data.

But what if I had many streaming applications running whose states are correlated, as in a microservice architecture? I would like to travel back in time in all of my services to a common point in time. Is there a possibility to somehow manage correlated savepoints? Of course I can trigger savepoints at approximately the same time, but that's just an approximation, right? Is there something in Flink that could support this advanced use case? Maybe someone else has hit this issue already and thought about a solution?

I'll be grateful for any comments,

Cheers,
Krzysztof