Re: s3 statebackend user state size

2016-05-10 Thread Chen Qin
Hi Ufuk,

Yes, it does help with the RocksDB backend!
After tuning the checkpoint frequency to align with network throughput, the 
"task manager released" and "job cancelled" failures are gone.

Chen
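
For reference, the checkpoint interval being tuned here is set on the execution
environment; a one-line sketch (the 60s value is only illustrative, not the job's
actual setting):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// A longer interval gives each snapshot time to reach S3 before the next one starts.
env.enableCheckpointing(60000);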


> On May 10, 2016, at 10:33 AM, Ufuk Celebi  wrote:
> 
>> On Tue, May 10, 2016 at 5:07 PM, Chen Qin  wrote:
>> Further, to keep a large key/value space, the wiki points out using RocksDB as
>> the backend. My understanding is that using RocksDB will write to the local file
>> system instead of syncing to S3. Does Flink support a memory -> RocksDB (local
>> disk) -> S3 checkpoint state split yet? Or would implementing the kvstate
>> interface make Flink take care of the large-state problem?
> 
> Hey Chen,
> 
> when you use RocksDB, you only need to explicitly configure the file
> system checkpoint directory, for which you can use S3:
> 
> new RocksDBStateBackend(new URI("s3://..."))
> 
> The local disk paths are configured via the general Flink temp
> directory configuration (see taskmanager.tmp.dirs in
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html,
> default is /tmp).
> 
> State is written to the local RocksDB instance and the RocksDB files
> are copied to S3 on checkpoints.
> 
> Does this help?
> 
> – Ufuk


Re: HBase write problem

2016-05-10 Thread Flavio Pompermaier
Do you have the hbase-site.xml available in the classpath?
On 10 May 2016 23:10, "Palle"  wrote:

> HBase write problem
>
> Hi all.
>
> I have a problem writing to HBase.
>
> I am using a slightly modified example of this class to prove the concept:
>
> https://github.com/apache/flink/blob/master/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
>
> However all the HBase-specific stuff is exactly the same as in the
> HBaseWriteExample.
>
> The problem I see is that the job never completes (it has been running for more
> than an hour now), and it is only 13 key/value pairs that are to be written
> to HBase :-)
> I have tested that the map/reduce stuff works if I replace the HBase connection
> stuff with just a write to a text file - that works OK. I have also tested that
> I can insert data into HBase from a similar Hadoop MapReduce job.
>
> Here is the part of the code where I guess the problem is:
>
>   @Override
>   public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
>     LOG.info("Tuple2 map() called");
>     reuse.f0 = new Text(t.f0);
>     Put put = new Put(t.f0.getBytes());
>     put.add(MasterConstants.CF_SOME, MasterConstants.COUNT, Bytes.toBytes(t.f1));
>     reuse.f1 = put;
>     return reuse;
>   }
> }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
>
> env.execute("Flink HBase Event Count Hello World Test");
>
> This code matches the code in the HBaseWriteExample.java I should think.
>
> The "Tuple2" log line I see exactly the 13 times I expect, and the last
> log line I see is this:
> 2016-05-10 21:48:42,715 INFO
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat   - Created
> table instance for event_type_count
>
> Any suggestions to what the problem could be?
>
> Thanks,
> Palle
>
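
A quick way to verify that hbase-site.xml is actually being picked up from the
classpath (an illustrative sketch, not from the thread; the property name is the
standard HBase one):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class HBaseClasspathCheck {
    public static void main(String[] args) {
        // HBaseConfiguration.create() loads hbase-default.xml and hbase-site.xml from the classpath.
        Configuration conf = HBaseConfiguration.create();
        // If this prints "localhost" (the default) against a remote cluster, hbase-site.xml was not found.
        System.out.println("hbase.zookeeper.quorum = " + conf.get("hbase.zookeeper.quorum"));
    }
}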


HBase write problem

2016-05-10 Thread Palle
HBase write problem

Hi all.

I have a problem writing to HBase.

I am using a slightly modified example of this class to prove the concept:
https://github.com/apache/flink/blob/master/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java

However all the HBase-specific stuff is exactly the same as in the 
HBaseWriteExample.

The problem I see is that the job never completes (it has been running for more than 
an hour now), and it is only 13 key/value pairs that are to be written to HBase 
:-)
I have tested that the map/reduce stuff works if I replace the HBase connection 
stuff with just a write to a text file - that works OK. I have also tested that I 
can insert data into HBase from a similar Hadoop MapReduce job.

Here is the part of the code where I guess the problem is:

  @Override
  public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
    LOG.info("Tuple2 map() called");
    reuse.f0 = new Text(t.f0);
    Put put = new Put(t.f0.getBytes());
    put.add(MasterConstants.CF_SOME, MasterConstants.COUNT, Bytes.toBytes(t.f1));
    reuse.f1 = put;
    return reuse;
  }
}).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));

env.execute("Flink HBase Event Count Hello World Test");
 
This code matches the code in the HBaseWriteExample.java I should think.
 
The "Tuple2" log line I see exactly the 13 times I expect, and the last log 
line I see is this:
2016-05-10 21:48:42,715 INFO  
org.apache.hadoop.hbase.mapreduce.TableOutputFormat   - Created table 
instance for event_type_count

Any suggestions to what the problem could be?

Thanks,
Palle
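
For context, in the linked HBaseWriteExample the map function above sits inside a
RichMapFunction where the reuse tuple is created in open(), and the output table is
set on the Hadoop Job that is handed to the HadoopOutputFormat. A sketch of those
surrounding pieces (the table name is taken from the log line above and is only
illustrative):

// inside the RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>>:
private transient Tuple2<Text, Mutation> reuse;

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    reuse = new Tuple2<Text, Mutation>();
}

// and the Hadoop Job passed to output(...):
Job job = Job.getInstance();
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "event_type_count");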


Re: Cassandra sink wrt Counters

2016-05-10 Thread Ufuk Celebi
On Tue, May 10, 2016 at 5:36 PM, milind parikh  wrote:
> When will the Cassandra sink be released?  I am ready to test it out even
> now.

You can work with Chesnay's branch here:
https://github.com/apache/flink/pull/1771

Clone his repo via Git, check out the branch, and then build it from
source 
(https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/building.html).


Re: s3 statebackend user state size

2016-05-10 Thread Ufuk Celebi
On Tue, May 10, 2016 at 5:07 PM, Chen Qin  wrote:
> Further, to keep a large key/value space, the wiki points out using RocksDB as
> the backend. My understanding is that using RocksDB will write to the local file
> system instead of syncing to S3. Does Flink support a memory -> RocksDB (local
> disk) -> S3 checkpoint state split yet? Or would implementing the kvstate
> interface make Flink take care of the large-state problem?

Hey Chen,

when you use RocksDB, you only need to explicitly configure the file
system checkpoint directory, for which you can use S3:

new RocksDBStateBackend(new URI("s3://..."))

The local disk paths are configured via the general Flink temp
directory configuration (see taskmanager.tmp.dirs in
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html,
default is /tmp).

State is written to the local RocksDB instance and the RocksDB files
are copied to S3 on checkpoints.

Does this help?

– Ufuk
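
Putting this together, a minimal sketch of the setup described above (the S3 URI
and the checkpoint interval are placeholders):

// Working state lives in the local RocksDB instance; checkpoints of its files go to S3.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(new URI("s3://my-bucket/flink/checkpoints")));
env.enableCheckpointing(60000);

// The local RocksDB directories come from the TaskManager temp directories,
// e.g. in flink-conf.yaml:
// taskmanager.tmp.dirs: /mnt/disk1/tmp:/mnt/disk2/tmp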


Re: Force triggering events on watermark

2016-05-10 Thread Srikanth
Yes, that will work.
I was trying another route of having a "finalize & purge trigger" that will
   i) onElement - register an event-time timer for the window end, but not alter the
nested trigger's TriggerResult
  ii) onEventTime - always purge after firing

That will work with CountTrigger and other custom triggers too, right?

public class FinalizePurgingTrigger<T, W extends TimeWindow> extends Trigger<T, W> {

    private final Trigger<T, W> nestedTrigger;

    public FinalizePurgingTrigger(Trigger<T, W> nestedTrigger) {
        this.nestedTrigger = nestedTrigger;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window,
            TriggerContext ctx) throws Exception {
        // Register a timer for the window end, but keep the nested trigger's decision.
        ctx.registerEventTimeTimer(window.getEnd());
        return nestedTrigger.onElement(element, timestamp, window, ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx)
            throws Exception {
        // Whenever the nested trigger fires on event time, purge as well.
        TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
        switch (triggerResult) {
            case FIRE:
            case FIRE_AND_PURGE:
                return TriggerResult.FIRE_AND_PURGE;
            default:
                return TriggerResult.CONTINUE;
        }
    }

    // onProcessingTime is omitted here; it would simply delegate to nestedTrigger.
}

Srikanth

On Tue, May 10, 2016 at 11:36 AM, Fabian Hueske  wrote:

> Maybe the last example of this blog post is helpful [1].
>
> Best, Fabian
>
> [1]
> https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink
>
> 2016-05-10 17:24 GMT+02:00 Srikanth :
>
>> Hi,
>>
>> I read the following in the Flink docs: "We can explicitly specify a Trigger to
>> overwrite the default Trigger provided by the WindowAssigner. Note that
>> specifying a trigger does not add an additional trigger condition but
>> replaces the current trigger."
>> So, I tested out the code below with a count trigger. As per my
>> understanding this will override the default watermark-based trigger.
>>
>> val testStream = env.fromCollection(List( ("2016-04-07 13:11:59", 157428,
>> 4),
>>  ("2016-04-07 13:11:59", 157428, 4),
>>  ("2016-04-07 13:11:59", 111283, 23),
>>  ("2016-04-07 13:11:57", 108042, 23),
>>  ("2016-04-07 13:12:00", 161374, 9),
>>  ("2016-04-07 13:12:00", 161374, 9),
>>  ("2016-04-07 13:11:59", 136505, 4)
>> )
>> )
>>.assignAscendingTimestamps(b => f.parse(b._1).getTime())
>>.map(b => (b._3, b._2))
>>
>> testStream.print
>>
>> val countStream = testStream
>> .keyBy(_._1)
>> .timeWindow(Time.seconds(20))
>> .trigger(CountTrigger.of(3))
>> .fold((0, List[Int]())) { case((k,r),i) => (i._1, r ++ List(i._2)) }
>>
>> countStream.print
>>
>> The output I saw confirms the documented behavior. Processing is triggered
>> only when we have 3 elements for a key.
>> How do I force-trigger the leftover records when the watermark is past the
>> window? I.e., I want to use triggers to start early processing but finalize
>> the window based on the watermark.
>>
>> Output shows that records for keys 23 & 9 weren't processed.
>>   (4,157428)
>>   (4,157428)
>>   (23,111283)
>>   (23,108042)
>>   (9,161374)
>>   (9,161374)
>>   (4,136505)
>>
>>   (4,List(157428, 157428, 136505))
>>
>> Thanks,
>> Srikanth
>>
>
>


Re: Force triggering events on watermark

2016-05-10 Thread Fabian Hueske
Maybe the last example of this blog post is helpful [1].

Best, Fabian

[1]
https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink

2016-05-10 17:24 GMT+02:00 Srikanth :

> Hi,
>
> I read the following in the Flink docs: "We can explicitly specify a Trigger to
> overwrite the default Trigger provided by the WindowAssigner. Note that
> specifying a trigger does not add an additional trigger condition but
> replaces the current trigger."
> So, I tested out the code below with a count trigger. As per my
> understanding this will override the default watermark-based trigger.
>
> val testStream = env.fromCollection(List( ("2016-04-07 13:11:59", 157428,
> 4),
>  ("2016-04-07 13:11:59", 157428, 4),
>  ("2016-04-07 13:11:59", 111283, 23),
>  ("2016-04-07 13:11:57", 108042, 23),
>  ("2016-04-07 13:12:00", 161374, 9),
>  ("2016-04-07 13:12:00", 161374, 9),
>  ("2016-04-07 13:11:59", 136505, 4)
> )
> )
>.assignAscendingTimestamps(b => f.parse(b._1).getTime())
>.map(b => (b._3, b._2))
>
> testStream.print
>
> val countStream = testStream
> .keyBy(_._1)
> .timeWindow(Time.seconds(20))
> .trigger(CountTrigger.of(3))
> .fold((0, List[Int]())) { case((k,r),i) => (i._1, r ++ List(i._2)) }
>
> countStream.print
>
> The output I saw confirms the documented behavior. Processing is triggered
> only when we have 3 elements for a key.
> How do I force-trigger the leftover records when the watermark is past the
> window? I.e., I want to use triggers to start early processing but finalize
> the window based on the watermark.
>
> Output shows that records for keys 23 & 9 weren't processed.
>   (4,157428)
>   (4,157428)
>   (23,111283)
>   (23,108042)
>   (9,161374)
>   (9,161374)
>   (4,136505)
>
>   (4,List(157428, 157428, 136505))
>
> Thanks,
> Srikanth
>


Re: Cassandra sink wrt Counters

2016-05-10 Thread milind parikh
Hi Chesnay

Sorry for asking the question in a confusing manner. Being new to flink,
there are many questions swirling around in my head.

Thanks for the details in your answers. Here are the facts, as I see them:

(a) Cassandra counters are not idempotent.
(b) The failures, in the context of Cassandra, are not the typical failures of
an ACID transaction. A failure indicates that the operation was not able
to continue at the specified consistency level, meaning that at least one
of the nodes didn't ack the reads or writes within the requisite amount of
time. This failure is NOT indicative of the fact that some node (or
many) might have seen and processed the reads or writes; just that at
least one of the nodes did not. There is no rollback either. The
anti-entropy features of Cassandra will kick in and attempt to correct the
situation internally to Cassandra. From an external system, though, the
situation is different: if such a failure occurs, one could retry
the operation (specifically writes) again from outside of Cassandra, provided
one has the ability to do so through an intermediate layer (think Flink) and
the write is specifically modeled to be idempotent in the data model
(specifically row key design).

One could design the data model so as to make Flink work exceptionally well
with Cassandra, except for counter tables. There is no way in Cassandra
currently to model an idempotent counter table that I know of. Therefore an
event replay that affects a counter might end up double counting.
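
To make the contrast concrete, a sketch (illustrative only; the table and column
names are made up, using the DataStax Java driver) of an idempotent write versus
a counter update:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class IdempotencySketch {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect("my_keyspace");

        String eventId = "evt-42", eventType = "click";

        // Idempotent: replaying the same event overwrites the same row,
        // so a Flink replay after failure converges to the same state.
        session.execute(
            "INSERT INTO event_by_id (event_id, event_type) VALUES (?, ?)",
            eventId, eventType);

        // Not idempotent: replaying the same event increments the counter again,
        // so a replay can double count.
        session.execute(
            "UPDATE event_type_count SET cnt = cnt + 1 WHERE event_type = ?",
            eventType);

        cluster.close();
    }
}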

When will the Cassandra sink be released?  I am ready to test it out even
now.
Hello Milind,

I'm not entirely sure I fully understood your question, but I'll try anyway
:)

There is no way to provide exactly-once semantics for Cassandra's
counters. As such we (will) only provide exactly-once semantics for a
subset of Cassandra operations: idempotent inserts/updates.

There are several things that would allow exactly-once semantics:

   - transactions
     - rather obvious I think
   - replaying/rollback to a given state
     - replay for sources / rollback for sinks upon failure
   - an atomic idempotent update across 2 tables
     - allows tracking every read/write made; selectively re-read/write
       upon failure

One of the key requisites is proper failure reporting though; if an update
fails we *need to know*. As far as I know, Cassandra doesn't make this
guarantee.

Regards,
Chesnay Schepler

On 10.05.2016 07:48, milind parikh wrote:

Given FLINK 3311 & 3332, I am wondering whether it would be possible, without
idempotent counters in Cassandra, to deliver an exactly-once sink into
Cassandra. I do note that the verbiage/disc2 in 3332 does warn the user
that this is not exactly an "exactly once" sink.

However my question has to do with whether having idempotent counters and
a data model that enables all other idempotent operations is a necessary
prerequisite to exactly-once semantics in Flink.

Asked a different way: what source and sink would enable end-to-end
exactly-once semantics, in the current state of the art, with Flink in
the middle?

Thanks
Milind


Force triggering events on watermark

2016-05-10 Thread Srikanth
Hi,

I read the following in the Flink docs: "We can explicitly specify a Trigger to
overwrite the default Trigger provided by the WindowAssigner. Note that
specifying a trigger does not add an additional trigger condition but
replaces the current trigger."
So, I tested out the code below with a count trigger. As per my understanding
this will override the default watermark-based trigger.

val testStream = env.fromCollection(List( ("2016-04-07 13:11:59", 157428,
4),
 ("2016-04-07 13:11:59", 157428, 4),
 ("2016-04-07 13:11:59", 111283, 23),
 ("2016-04-07 13:11:57", 108042, 23),
 ("2016-04-07 13:12:00", 161374, 9),
 ("2016-04-07 13:12:00", 161374, 9),
 ("2016-04-07 13:11:59", 136505, 4)
)
)
   .assignAscendingTimestamps(b => f.parse(b._1).getTime())
   .map(b => (b._3, b._2))

testStream.print

val countStream = testStream
.keyBy(_._1)
.timeWindow(Time.seconds(20))
.trigger(CountTrigger.of(3))
.fold((0, List[Int]())) { case((k,r),i) => (i._1, r ++ List(i._2)) }

countStream.print

The output I saw confirms the documented behavior. Processing is triggered only
when we have 3 elements for a key.
How do I force-trigger the leftover records when the watermark is past the
window? I.e., I want to use triggers to start early processing but finalize
the window based on the watermark.

Output shows that records for keys 23 & 9 weren't processed.
  (4,157428)
  (4,157428)
  (23,111283)
  (23,108042)
  (9,161374)
  (9,161374)
  (4,136505)

  (4,List(157428, 157428, 136505))

Thanks,
Srikanth


s3 statebackend user state size

2016-05-10 Thread Chen Qin
Hi there,

With S3 as the state backend, and a large chunk of user state kept on the 
heap, I can see the task manager start to fail without showing an OOM exception. 
Instead, it shows a generic error message (below) when a checkpoint is triggered. I 
assume this has something to do with how state is kept in a buffer and flushed to 
S3 when a checkpoint is triggered. 

Further, to keep a large key/value space, the wiki points out using RocksDB as the 
backend. My understanding is that using RocksDB will write to the local file 
system instead of syncing to S3. Does Flink support a memory -> RocksDB (local 
disk) -> S3 checkpoint state split yet? Or would implementing the kvstate 
interface make Flink take care of the large-state problem?

Chen

java.lang.Exception: The slot in which the task was executed has been released. 
Probably loss of TaskManager eddbcda03a61f61210063a7cd2148b36 @ 10.163.98.18 - 
24 slots - URL: akka.tcp://flink@10.163.98.18:6124/user/taskmanager at 
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151) 
at 
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
 at 
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119) 
at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156) at 
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
 at 
org.apache.flink.runtime.jobmanager.JobManager$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
 at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at 
org.apache.flink.runtime.LeaderSessionMessageFilter$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
 at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:33) at 
org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:28) at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at 
org.apache.flink.runtime.LogMessages$anon$1.applyOrElse(LogMessages.scala:28) 
at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at 
akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46) at 
akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369) at 
akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501) at 
akka.actor.ActorCell.invoke(ActorCell.scala:486) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at 
akka.dispatch.Mailbox.run(Mailbox.scala:221) at 
akka.dispatch.Mailbox.exec(Mailbox.scala:231) at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: writing tests for my program

2016-05-10 Thread Igor Berman
thanks Alexander, I'll take a look


On 10 May 2016 at 13:07, lofifnc  wrote:

> Hi,
>
> Some shameless self promotion:
>
> You can also check out:
> https://github.com/ottogroup/flink-spector
> which has the goal of removing such hurdles when testing Flink programs.
>
> Best,
> Alex
>
>
>
>
>
>


Re: writing tests for my program

2016-05-10 Thread lofifnc
Hi,

Some shameless self promotion:

You can also check out:
https://github.com/ottogroup/flink-spector
which has the goal of removing such hurdles when testing Flink programs.

Best,
Alex
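
If you just want a quick check without an extra dependency, a plain
local-environment run with a collecting sink also works; a minimal sketch (this
is not flink-spector's API, and the pipeline and names are made up for
illustration):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class WordLengthTestSketch {

    // Results are collected into a static list so they can be checked after execute().
    private static final List<Integer> RESULTS = new ArrayList<Integer>();

    public static class CollectSink implements SinkFunction<Integer> {
        @Override
        public void invoke(Integer value) throws Exception {
            synchronized (RESULTS) {
                RESULTS.add(value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        env.fromElements("a", "bb", "ccc")
           .map(new MapFunction<String, Integer>() {
               @Override
               public Integer map(String value) {
                   return value.length();
               }
           })
           .addSink(new CollectSink());
        env.execute("word length test");

        // In a real test this would be a JUnit assertion.
        if (!RESULTS.equals(Arrays.asList(1, 2, 3))) {
            throw new AssertionError("unexpected result: " + RESULTS);
        }
    }
}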

 





Re: Streaming job software update

2016-05-10 Thread Maciek Próchniak

Hi,

in our more-or-less development environment we're doing something like this in 
our main method:



val processName = name_of_our_stream
val configuration = GlobalConfiguration.getConfiguration
val system = JobClient.startJobClientActorSystem(configuration)

val timeout = FiniteDuration(10, TimeUnit.SECONDS)
val gateway =
  LeaderRetrievalUtils.retrieveLeaderGateway(
LeaderRetrievalUtils.createLeaderRetrievalService(configuration), 
system, timeout)

implicit val executor = system.dispatcher
val cancelResult = 
gateway.ask(JobManagerMessages.getRequestRunningJobsStatus, 
timeout).mapTo[RunningJobsStatus].flatMap {

  case RunningJobsStatus(runningJobs) =>
runningJobs.toList.find(_.getJobName == processName).map(job => {
  gateway.ask(JobManagerMessages.CancelJob(job.getJobId), 
FiniteDuration(1, TimeUnit.MINUTES))

}).getOrElse(Future.successful(()))
}
Await.result(cancelResult, FiniteDuration(1, TimeUnit.MINUTES))
system.shutdown()

- this basically searches the running jobs by name and cancels the running one.

Doing something similar you can trigger a savepoint, but unfortunately I don't 
see an easy way of telling the ExecutionEnvironment you want to use it. 
Probably it can be done with some clever hack :)


br,
maciek

On 04/05/2016 19:52, Hanson, Bruce wrote:

Hi all,

I’m working on using Flink to do a variety of streaming jobs that will 
be processing very high-volume streams. I want to be able to update a 
job’s software with an absolute minimum impact on the processing of 
the data. What I don’t understand is the best way to update the software 
running the job. From what I gather, the way it works today is that I 
would have to shut down the first job, ensuring that it properly 
checkpoints, and then start up a new job. My concern is that this may 
take a relatively long time and cause problems with SLAs I may have 
with my users.


Does Flink provide more nuanced ways of upgrading a job’s software?

Are there folks out there that are working with this sort of problem, 
either within Flink or around it?


Thank you for any help, thoughts, etc. you may have.

-Bruce




Re: Blocking or pipelined mode for batch job

2016-05-10 Thread wangzhijiang999
Hi Ufuk,
     Thank you for the quick response! I am not very clear on the internal 
implementation of iterations, so would you explain in detail why blocking results 
cannot be reset after each superstep?
In addition, for the example below, why may it cause a deadlock in pipelined 
mode?
DataSet mapped1 = data.map(new MyMapper());
DataSet mapped2 = data.map(new AnotherMapper());
mapped1.join(mapped2).where(...).equalTo(...);

--
From: Ufuk Celebi
Sent: Tuesday, May 10, 2016 17:22
To: user; wangzhijiang999
Subject: Re: Blocking or pipelined mode for batch job
On Tue, May 10, 2016 at 10:56 AM, wangzhijiang999
 wrote:
> As I reviewed the Flink source code, if the ExecutionMode is set to
> "Pipelined", the DataExchangeMode will be set to "Batch" if the pipeline-breaking
> property is true for a two-input or iteration situation, in order to avoid
> deadlock. When the DataExchangeMode is set to "Batch", the ResultPartitionType
> will be set to "Pipelined" if it is on the dynamic path of an iteration. So the final
> mode for a dynamic node in an iteration is "Pipelined"? Is my understanding right?
> If so, why must pipelined mode be used for dynamic nodes in iterations?

Yes, this is a current shortcoming, because the blocking results can
not be used within iterations (they can not be reset after each
superstep).

> Another question: when running a batch job, if one task fails,
> must I restart the whole topology, or is there any mechanism to just
> restart the failed task? Thank you for any advice!

Currently, yes, the whole topology is restarted. There was an attempt
to add support for more fine-grained fault tolerance, but it was never
completed/merged.

– Ufuk


Re: multi-application correlated savepoints

2016-05-10 Thread Maciek Próchniak

hmm...
quite an interesting question. But I think I don't fully understand your 
use case - how are your applications coupled? Through Kafka topics? E.g. 
the output of one is the input of another?

Or do they consume from the same input?
And why exactly do you want to get back to a specific point in all of 
them? If there's a bug in one, then I guess the whole idea of microservices 
is that you have to fix only one and don't care about the others?
I think it can be kind of tricky to do what you describe - if the apps are 
really decoupled then there's no notion of "same time" for them. One 
thing that comes to my mind is having some special "message" that 
triggers a savepoint - but I'm not sure if it's possible and if it would work 
in all cases (like - what if there are many inputs for a stream?)


br,
maciek


On 10/05/2016 09:11, Krzysztof Zarzycki wrote:

Hi!
I'm thinking about using a great Flink feature - savepoints. I 
would like to be able to stop my streaming application, roll back its 
state and restart it (for example to update code, to fix a bug). 
Let's say I would like to travel back in time and reprocess some data.
But what if I had many streaming applications running whose states 
are correlated, as in a microservice architecture? I would like to 
travel back in time in all of my services to a common point in time.  
Is there a possibility to somehow manage correlated savepoints? Of 
course I can trigger savepointing at approximately the same time, but it's 
just an approximation, right?
Is there something in Flink that could support this advanced use case? 
Maybe someone else has hit this issue already and thought about a solution?

I'll be grateful for any comments,
Cheers,
Krzysztof







Re: Blocking or pipelined mode for batch job

2016-05-10 Thread Ufuk Celebi
On Tue, May 10, 2016 at 10:56 AM, wangzhijiang999
 wrote:
> As I reviewed the Flink source code, if the ExecutionMode is set to
> "Pipelined", the DataExchangeMode will be set to "Batch" if the pipeline-breaking
> property is true for a two-input or iteration situation, in order to avoid
> deadlock. When the DataExchangeMode is set to "Batch", the ResultPartitionType
> will be set to "Pipelined" if it is on the dynamic path of an iteration. So the final
> mode for a dynamic node in an iteration is "Pipelined"? Is my understanding right?
> If so, why must pipelined mode be used for dynamic nodes in iterations?

Yes, this is a current shortcoming, because the blocking results can
not be used within iterations (they can not be reset after each
superstep).

> Another question: when running a batch job, if one task fails,
> must I restart the whole topology, or is there any mechanism to just
> restart the failed task? Thank you for any advice!

Currently, yes, the whole topology is restarted. There was an attempt
to add support for more fine-grained fault tolerance, but it was never
completed/merged.

– Ufuk


Re: Cassandra sink wrt Counters

2016-05-10 Thread Chesnay Schepler

Hello Milind,

I'm not entirely sure I fully understood your question, but I'll try 
anyway :)


There is no way to provide exactly-once semantics for Cassandra's 
counters. As such we (will) only provide exactly-once semantics for a 
subset of Cassandra operations: idempotent inserts/updates.


There are several things that would allow exactly-once semantics:

 * transactions
   - rather obvious I think
 * replaying/rollback to a given state
   - replay for sources / rollback for sinks upon failure
 * an atomic idempotent update across 2 tables
   - allows tracking every read/write made; selectively re-read/write
     upon failure

One of the key requisites is proper failure reporting though; if an 
update fails we *need to know*. As far as I know, Cassandra doesn't make 
this guarantee.


Regards,
Chesnay Schepler

On 10.05.2016 07:48, milind parikh wrote:


Given FLINK 3311 & 3332, I am wondering whether it would be possible, without 
idempotent counters in Cassandra, to deliver an exactly-once sink 
into Cassandra. I do note that the verbiage/disc2 in 3332 does warn 
the user that this is not exactly an "exactly once" sink.


However my question has to do with whether having idempotent counters 
and a data model that enables all other idempotent operations is a 
necessary prerequisite to exactly-once semantics in Flink.


Asked a different way: what source and sink would enable end-to-end 
exactly-once semantics, in the current state of the art, with Flink 
in the middle?


Thanks
Milind





Blocking or pipelined mode for batch job

2016-05-10 Thread wangzhijiang999
Hi ,
       As I reviewed the Flink source code, if the ExecutionMode is set to 
"Pipelined", the DataExchangeMode will be set to "Batch" if the pipeline-breaking 
property is true for a two-input or iteration situation, in order to avoid 
deadlock. When the DataExchangeMode is set to "Batch", the ResultPartitionType 
will be set to "Pipelined" if it is on the dynamic path of an iteration. So the 
final mode for a dynamic node in an iteration is "Pipelined"? Is my understanding 
right? If so, why must pipelined mode be used for dynamic nodes in iterations?
    Another question: when running a batch job, if one task fails, must I 
restart the whole topology, or is there any mechanism to just restart the 
failed task? Thank you for any advice!
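
For reference, a minimal sketch (not from the thread) of where the execution mode
discussed above is configured; ExecutionMode.BATCH forces blocking data exchanges
instead of the default pipelined ones:

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// The default is ExecutionMode.PIPELINED; BATCH materializes intermediate results,
// which avoids the branching-dataflow deadlock at the cost of extra I/O.
env.getConfig().setExecutionMode(ExecutionMode.BATCH);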

multi-application correlated savepoints

2016-05-10 Thread Krzysztof Zarzycki
Hi!
I'm thinking about using a great Flink feature - savepoints. I would
like to be able to stop my streaming application, roll back its state
and restart it (for example to update code, to fix a bug). Let's say I
would like to travel back in time and reprocess some data.
But what if I had many streaming applications running whose states are
correlated, as in a microservice architecture? I would like to travel back
in time in all of my services to a common point in time.  Is there a
possibility to somehow manage correlated savepoints? Of course I can
trigger savepointing at approximately the same time, but it's just an
approximation, right?
Is there something in Flink that could support this advanced use case?
Maybe someone else has hit this issue already and thought about a solution?
I'll be grateful for any comments,
Cheers,
Krzysztof