Re: PartitionNotFoundException when running in yarn-session.

2017-10-10 Thread Ufuk Celebi
Hey Niels,

any update on this?

– Ufuk


On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi  wrote:
> Hey Niels,
>
> thanks for the detailed report. I don't think that it is related to
> the Hadoop or Scala version. I think the following happens:
>
> - Occasionally, one of your tasks seems to be extremely slow in
> registering its produced intermediate result (the data shuffled
> between TaskManagers)
> - Another task is already requesting to consume data from this task
> but cannot find it (after multiple retries) and it fails the complete
> job (your stack trace)
>
> That happens only occasionally, probably due to load in your cluster.
> The slowdown could have multiple reasons...
> - Is your Hadoop cluster resource-constrained, so that tasks are slow to
> deploy?
> - Is your application JAR very large, so that it takes a long time to download?
>
> We have two options at this point:
> 1) You can increase the maximum retries via the config option
> "taskmanager.network.request-backoff.max". The default is 10000
> (milliseconds) and specifies the maximum request back-off [1].
> Increasing this to 30000 would give you two extra retries with pretty
> long delays (see [1]).
>
> 2) To be sure that this is really what is happening we could increase
> the log level of certain classes and check whether they have
> registered their results or not. If you want to do this, I'm more than
> happy to provide you with some classes to enable DEBUG logging for.
>
> What do you think?
>
> – Ufuk
>
> DETAILS
> ===
>
> - The TaskManagers produce and consume intermediate results
> - When a TaskManager wants to consume a result, it directly queries
> the producing TaskManager for it
> - An intermediate result becomes ready for consumption during initial
> task setup (state DEPLOYING)
> - When a TaskManager is slow to register its intermediate result and
> the consumer requests the result before it is ready, it can happen
> that a requested partition is "not found"
>
> This is also what is happening here. We retry the request for the
> intermediate result multiple times with a timed backoff [1] and only
> fail the request (your stack trace) if the partition is still not
> ready although we expect it to be ready (that is, there was no failure
> at the producing task).
>
> [1] Starting by default at 100 millis and going up to 10_000 millis by
> doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000)
>
>
> On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes  wrote:
>> Hi,
>>
>> I'm having some trouble running a Java-based Flink job in a yarn-session.
>>
>> The job itself consists of reading a set of files resulting in a DataStream
>> (I use DataStream because in the future I intend to change the file with a
>> Kafka feed), then does some parsing and eventually writes the data into
>> HBase.
>>
>> Most of the time running this works fine yet sometimes it fails with this
>> exception:
>>
>> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
>> Partition 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363
>> not found.
>>   at
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
>>   at
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128)
>>   at
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345)
>>   at
>> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286)
>>   at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123)
>>   at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118)
>>   at
>> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>>   at akka.dispatch.OnComplete.internal(Future.scala:248)
>>   at akka.dispatch.OnComplete.internal(Future.scala:245)
>>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>   at
>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>   at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>   at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>   at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>   at 
>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>   at
>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>   at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>

Windows getting created only on first execution

2017-10-10 Thread Rahul Raj
Hi,

I have written a program which reads data from Kafka, parses the JSON and
does some reduce operation. The problem I am facing is that the program
executes perfectly the first time on a given day, but when I kill the program
and execute it again, an empty file is created. Even after compiling again
and running, an empty file is created.

var kafkaConsumer = new FlinkKafkaConsumer08(
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  params.getProperties)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// keep only the messages matching the pattern
var messageStream = env.addSource(kafkaConsumer).filter(t => t.contains(pattern))

// assign event timestamps extracted from the JSON payload; ts is later
// emitted as the current watermark
var mts = messageStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {

  var ts = Long.MinValue

  override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
    var timestamp = json_decode(element).toLong
    ts = Math.max(timestamp, previousElementTimestamp)
    timestamp
  }

  override def getCurrentWatermark(): Watermark = new Watermark(ts)
})

// key by the decoded JSON value, group into 60-second event-time session
// windows (with 5 seconds of allowed lateness) and concatenate the records
var output = mts
  .keyBy(t => json_decode(t))
  .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
  .allowedLateness(Time.seconds(5))
  .reduce((v1, v2) => v1 + "" + v2)

output.writeAsText(path).setParallelism(1)


I am using FileSystem as the state backend. I am assuming this problem is
related to memory cleanup, but I don't understand what's happening.

Any help?


Rahul Raj


Re: Windowing isn't applied per key

2017-10-10 Thread Tony Wei
Hi Marcus,

I think that is the expected result for sliding windows in Flink. You can see
the example in the documentation for more details [1].
For your need, I would suggest using a ProcessFunction to implement the
sliding window that you expect: you can use keyed state to buffer elements
and onTimer to trigger each window, and all of this will be done per key [2].
A rough sketch follows below the links.

Best Regards,
Tony Wei
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#sliding-windows
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html#example
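
For illustration, a minimal sketch of this approach, assuming the stream has already been parsed into (name, value) pairs and using a 5-minute processing-time interval per key (the names, the tuple type and the interval are illustrative, not from the thread):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class PerKeyWindow extends ProcessFunction[(String, Int), (String, Int)] {

  // keyed state holding (name, running total) for the currently open "window"
  lazy val acc: ValueState[(String, Int)] = getRuntimeContext
    .getState(new ValueStateDescriptor("acc", classOf[(String, Int)]))

  override def processElement(value: (String, Int),
                              ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
                              out: Collector[(String, Int)]): Unit = {
    val current = acc.value()
    if (current == null) {
      // first element for this key: open its own 5-minute window
      ctx.timerService().registerProcessingTimeTimer(
        ctx.timerService().currentProcessingTime() + 5 * 60 * 1000)
      acc.update(value)
    } else {
      acc.update((current._1, current._2 + value._2))
    }
  }

  override def onTimer(timestamp: Long,
                       ctx: ProcessFunction[(String, Int), (String, Int)]#OnTimerContext,
                       out: Collector[(String, Int)]): Unit = {
    // this key's window has elapsed: emit the total and clear the state
    val current = acc.value()
    if (current != null) {
      out.collect(current)
      acc.clear()
    }
  }
}

// usage: parsedStream.keyBy(_._1).process(new PerKeyWindow)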


2017-10-11 1:52 GMT+08:00 mclendenin :

> Sure, I'm going to use a name as key in this example and just a number as
> the
> value aggregated. This is the sample input data
>
> 12:00
> {"name": "Marcus", "value": 1}
> 12:01
> {"name": "Suzy", "value": 2}
> 12:03
> {"name": "Alicia", "value": 3}
> 12:04
> {"name": "Ben", "value": 1}
> 12:06
> {"name": "Alicia", "value": 1}
> {"name": "Ben", "value": 5}
>
> Expected Result in output
> 12:05
> {"name": "Marcus", "total": 1}
> 12:06
> {"name": "Suzy", "value": 2}
> 12:08
> {"name": "Alicia", "value": 4}
> 12:09
> {"name": "Ben", "value": 6}
>
> Actual Result in output
> 12:05
> {"name": "Marcus", "value": 1}
> {"name": "Suzy", "value": 2}
> {"name": "Alicia", "value": 3}
> {"name": "Ben", "value": 1}
> 12:10
> {"name": "Marcus", "value": 1}
> {"name": "Suzy", "value": 2}
> {"name": "Alicia", "value": 4}
> {"name": "Ben", "value": 6}
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


RichMapFunction parameters in the Streaming API

2017-10-10 Thread Colin Williams
I was looking for withParameters(config) in the Streaming API today. I
stumbled across the following thread:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/withParameters-for-Streaming-API-td9332.html#a9333

It appears that some of the Streaming API developers are in favor of
removing the parameters from RichMapFunction's open(). However, the best
practices article

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-the-parameters-in-your-flink-program

shows examples of using both the global configuration (where parameters are
available from open()) and withParameters(config) (which doesn't work from
the Streaming API).

I'm trying to make a decision regarding using global parameters with my
Flink Streaming jobs.

Is using the global configuration a good idea for parameters in the
Streaming API, or is this best practice only suggested for the Batch API?

Is there a reason behind the opinion that the configuration parameters
should be removed from open()?
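
For what it's worth, here is a minimal sketch of the global-configuration approach with the Streaming API (the parameter name "prefix" and the toy pipeline are illustrative assumptions):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

object GlobalParametersSketch {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // register the parameters so every rich function can read them at runtime
    env.getConfig.setGlobalJobParameters(params)

    env.fromElements("a", "b", "c")
      .map(new RichMapFunction[String, String] {
        var prefix: String = _
        override def open(parameters: Configuration): Unit = {
          // the global job parameters are read from the runtime context,
          // not from the Configuration passed to open()
          val globals = getRuntimeContext.getExecutionConfig
            .getGlobalJobParameters.asInstanceOf[ParameterTool]
          prefix = globals.get("prefix", "default-")
        }
        override def map(value: String): String = prefix + value
      })
      .print()

    env.execute("global-parameters sketch")
  }
}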


Subscribe

2017-10-10 Thread Stephen Jiang



Re: Windowing isn't applied per key

2017-10-10 Thread mclendenin
Sure, I'm going to use a name as key in this example and just a number as the
value aggregated. This is the sample input data

12:00
{"name": "Marcus", "value": 1}
12:01
{"name": "Suzy", "value": 2}
12:03
{"name": "Alicia", "value": 3}
12:04
{"name": "Ben", "value": 1}
12:06
{"name": "Alicia", "value": 1}
{"name": "Ben", "value": 5}

Expected Result in output
12:05
{"name": "Marcus", "total": 1}
12:06
{"name": "Suzy", "value": 2}
12:08
{"name": "Alicia", "value": 4}
12:09
{"name": "Ben", "value": 6}

Actual Result in output
12:05
{"name": "Marcus", "value": 1}
{"name": "Suzy", "value": 2}
{"name": "Alicia", "value": 3}
{"name": "Ben", "value": 1}
12:10
{"name": "Marcus", "value": 1}
{"name": "Suzy", "value": 2}
{"name": "Alicia", "value": 4}
{"name": "Ben", "value": 6}



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Windowing isn't applied per key

2017-10-10 Thread Aljoscha Krettek
Hi,

Could you maybe give an example of what you expect as output and what you 
actually get? 

Best,
Aljoscha

> On 9. Oct 2017, at 16:09, mclendenin  wrote:
> 
> I am using Processing Time, so it is using the default timestamps and
> watermarks. I am running it with a parallelism of 3, I can see each operator
> running at a parallelism of 3 on the Web UI. I am pulling data from a Kafka
> topic with 12 partitions. 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: How to make my execution graph prettier?

2017-10-10 Thread Hao Sun
Great, thanks!

On Tue, Oct 10, 2017 at 7:52 AM Aljoscha Krettek 
wrote:

> Hi,
>
> The execution graph looks like this because Flink optimises your graph to
> fit all operations within a single Task. This operation is called chaining.
> The operation can be applied when there is no shuffle between operations
> and when the parallelism is the same (roughly speaking).
>
> If you want the graph to have separate tasks, you can disable chaining on
> the Flink ExecutionConfig. This can lead to worse performance, though.
>
> Best,
> Aljoscha
>
> On 10. Oct 2017, at 06:36, Hao Sun  wrote:
>
> Hi, my execution graph looks like the following, all things stuffed into one
> tile.
>
> How can I get something like this?
>
>
>


Manual checkpoint

2017-10-10 Thread nragon
Can I trigger a checkpoint based on a specific event?
Meaning, if a given event arrives (containing EOF in this case), it would be
broadcast to all downstream operators and trigger a savepoint afterwards.

Thanks,
Nuno



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: DataStream joining without window

2017-10-10 Thread Aljoscha Krettek
Hi,

Yes, using a TwoInputStreamOperator (or even better, a CoProcessFunction, 
because TwoInputStreamOperator is a low-level interface that might change in 
the future) is currently the recommended way of implementing a stream-stream 
join.

As you already guessed, you need a policy for cleaning up the state that you 
hold. You can do this using the timer features of CoProcessFunction.

Also, if you keep your buffered elements in the Flink state interfaces, you 
can switch the state backend to the RocksDB backend if you have concerns 
about the state growing too big.

Best,
Aljoscha
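
For illustration, a minimal skeleton of such a CoProcessFunction join (the (key, payload) tuple types, the join output and the deliberately crude one-hour cleanup timer are illustrative assumptions, not from the thread):

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// joins two keyed streams of (key, payload) pairs by buffering both sides in
// keyed state; a processing-time timer drops the buffers after one hour
class BufferingJoin extends CoProcessFunction[(String, String), (String, String), (String, String, String)] {

  lazy val left: ListState[String] = getRuntimeContext
    .getListState(new ListStateDescriptor("left", classOf[String]))
  lazy val right: ListState[String] = getRuntimeContext
    .getListState(new ListStateDescriptor("right", classOf[String]))

  override def processElement1(value: (String, String),
                               ctx: CoProcessFunction[(String, String), (String, String), (String, String, String)]#Context,
                               out: Collector[(String, String, String)]): Unit = {
    left.add(value._2)
    // join against everything buffered so far for this key on the other side
    val buffered = right.get()
    if (buffered != null) {
      val it = buffered.iterator()
      while (it.hasNext) out.collect((value._1, value._2, it.next()))
    }
    // schedule a state cleanup one hour from now
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + 60 * 60 * 1000)
  }

  override def processElement2(value: (String, String),
                               ctx: CoProcessFunction[(String, String), (String, String), (String, String, String)]#Context,
                               out: Collector[(String, String, String)]): Unit = {
    right.add(value._2)
    val buffered = left.get()
    if (buffered != null) {
      val it = buffered.iterator()
      while (it.hasNext) out.collect((value._1, it.next(), value._2))
    }
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + 60 * 60 * 1000)
  }

  override def onTimer(timestamp: Long,
                       ctx: CoProcessFunction[(String, String), (String, String), (String, String, String)]#OnTimerContext,
                       out: Collector[(String, String, String)]): Unit = {
    // retention expired for this key: drop both buffers
    left.clear()
    right.clear()
  }
}

// usage: stream1.keyBy(_._1).connect(stream2.keyBy(_._1)).process(new BufferingJoin)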

> On 9. Oct 2017, at 23:11, Yan Zhou [FDS Science] ­  wrote:
> 
> It seems like flink only supports DataStream joining within the same time window. 
> Why is it restricted in this way? 
> 
> I think I can implement a TwoInputStreamOperator to join two DataStreams 
> without considering the window. And inside the operator, create two states to 
> cache the records of the two streams and join the streams within the methods 
> processElement1/processElement2. Should I go ahead with this approach? Is 
> there any performance consideration here? If the concern is that the cache 
> might take a lot of memory, we can introduce some cache policy and reduce the 
> size. Or can we use the RocksDB state?
> 
> Please advise.
> 
> Best
> Yan
>  



Re: Questions about checkpoints/savepoints

2017-10-10 Thread Aljoscha Krettek
Hi,

Flink does not rely on file system operations to list contents; all necessary 
file paths are stored in the metadata file, as you guessed. This is the reason 
savepoints also work with file systems that "only" have read-after-write 
consistency.

Best,
Aljoscha

> On 10. Oct 2017, at 03:01, vipul singh  wrote:
> 
> Thanks Stefan for the answers above. These are really helpful.
> 
> I have a few followup questions:
> I see my savepoints are created in a folder, which has a _metadata file and 
> another file. Looking at the code 
> 
>  it seems like the metadata file contains tasks states, operator state and 
> master states 
> .
>  What is the purpose of the other file in the savepoint folder? My guess is 
> it should be a checkpoint file? 
> I am planning to use s3 as my state backend, so I want to ensure that 
> application restarts are not affected by the read-after-write consistency of s3 
> (if I use s3 as a savepoint backend). I am curious how flink restores data 
> from the _metadata file and the other file. Does the _metadata file contain 
> paths to these other files, or would it do a listing on the s3 folder?
> 
> Please let me know,
> 
> Thanks,
> Vipul
> 
> On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter  > wrote:
> Hi,
> 
> I have answered your questions inline:
>> It seems to me that checkpoints can be treated as flink internal recovery 
>> mechanism, and savepoints act more as user-defined recovery points. Would 
>> that be a correct assumption?
> 
> You could see it that way, but I would describe savepoints more as 
> user-defined *restart* points than *recovery* points. Please take a look at 
> my answers in this thread, because they cover most of your question:
> 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html
>  
> 
>  .
> 
>> While cancelling an application with -s option, it specifies the savepoint 
>> location. Is there a way during application startup to identify the last 
> known savepoint from a folder by itself, and restart from there? Since I am 
>> saving my savepoints on s3, I want to avoid issues arising from ls command 
>> on s3 due to read-after-write consistency of s3.
> 
> I don’t think that this feature exists, you have to specify the savepoint.
> 
>> Suppose my application has a checkpoint at point t1, and say i cancel this 
>> application sometime in future before the next available checkpoint( say 
>> t1+x). If I start the application without specifying the savepoint, it will 
> start from the last known checkpoint (at t1), which won't have the application 
> state saved, since I had cancelled the application. Would this be a correct 
> assumption?
> 
> If you restart a canceled application it will not consider checkpoints. They 
> are only considered in recovery on failure. You need to specify a savepoint 
> or externalized checkpoint for restarts to make explicit that you intend to 
> restart a job, and not to run a new instance of the job.
> 
>> Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be the same as 
>> manually saving regular savepoints? 
> Not the same, because checkpoints and savepoints are different in certain 
> aspects, but both methods leave you with something that survives job 
> cancelation and can be used to restart from a certain state.
> 
> Best,
> Stefan
> 
> 
> 
> 
> -- 
> Thanks,
> Vipul
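
For reference, a minimal sketch of enabling the externalized checkpoints (with RETAIN_ON_CANCELLATION) that Stefan mentions above; the checkpoint interval and the toy pipeline are illustrative:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

object RetainedCheckpointsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // take a checkpoint every 60 seconds (interval is illustrative)
    env.enableCheckpointing(60 * 1000)
    // keep the latest checkpoint when the job is cancelled, so it can be used
    // to restart the job explicitly, similar to a savepoint
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    env.fromElements(1, 2, 3).print()
    env.execute("externalized-checkpoints sketch")
  }
}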



Re: How to make my execution graph prettier?

2017-10-10 Thread Aljoscha Krettek
Hi,

The execution graph looks like this because Flink optimises your graph to fit 
all operations within a single Task. This operation is called chaining. The 
operation can be applied when there is no shuffle between operations and when 
the parallelism is the same (roughly speaking). 

If you want the graph to have separate tasks, you can disable chaining on the 
Flink ExecutionConfig. This can lead to worse performance, though.

Best,
Aljoscha
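
For illustration, a minimal sketch of disabling chaining, assuming the Scala DataStream API (the call lives on the StreamExecutionEnvironment; the toy pipeline is illustrative):

import org.apache.flink.streaming.api.scala._

object ChainingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // disable chaining for the whole job, so every operator becomes its own
    // task in the execution graph (this can cost performance, as noted above)
    env.disableOperatorChaining()

    // (alternatively, .disableChaining() or .startNewChain() can be called on
    // individual operators to break chains selectively)

    env.fromElements(1, 2, 3)
      .map(_ * 2)
      .filter(_ > 2)
      .print()

    env.execute("chaining sketch")
  }
}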

> On 10. Oct 2017, at 06:36, Hao Sun  wrote:
> 
> Hi, my execution graph looks like the following, all things stuffed into one 
> tile.
> How can I get something like this?
> 
> 



Re: Unusual log message - Emitter thread got interrupted

2017-10-10 Thread Aljoscha Krettek
Just FYI: I pushed a change that changes the message and removes the stack 
trace (and the exception).

> On 10. Oct 2017, at 00:58, Ken Krugler  wrote:
> 
> Hi Aljoscha,
> 
> Thanks for responding.
> 
>> On Oct 9, 2017, at 7:36 AM, Aljoscha Krettek > > wrote:
>> 
>> Hi,
>> 
>> In my understanding this is the expected behaviour of the code. The only way 
>> to shut down the Emitter is via an interrupt because it is otherwise 
>> blocking on the queue. If the Emitter had been interrupted while the 
>> operator is still running it would have gone down a different code path: 
>> https://github.com/apache/flink/blob/40cec17f4303b43bbf65d8be542f0646eada57e8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java#L89
>>  
>> 
>> 
>> Did you see any other faulty behaviour or only this log message?
> 
> That seemed to be the only oddity in the logs.
> 
> I’d suggest changing the logging call to not include the exception, as 
> dumping out the stack trace in the log implies there’s a problem somewhere.
> 
> And changing the message to something like "Emitter thread got interrupted, 
> shutting it down” would make it clearer it’s not an unexpected situation.
> 
> Thanks,
> 
> — Ken
> 
> 
> 
>>> On 6. Oct 2017, at 18:17, Fabian Hueske >> > wrote:
>>> 
>>> Hi Ken,
>>> 
>>> I don't have much experience with streaming iterations.
>>> Maybe Aljoscha (in CC) has an idea what is happening and if it can be 
>>> prevented.
>>> 
>>> Best, Fabian
>>> 
>>> 2017-10-05 1:33 GMT+02:00 Ken Krugler >> >:
>>> Hi all,
>>> 
>>> I’ve got a streaming topology with an iteration, and a RichAsyncFunction in 
>>> that iteration.
>>> 
>>> When the iteration terminates due to no activity, I see this message in the 
>>> logs:
>>> 
>>> 17/10/04 16:01:36 DEBUG async.Emitter:91 - Emitter thread got interrupted. 
>>> This indicates that the emitter should shut down.
>>> java.lang.InterruptedException
>>> at 
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
>>> at 
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
>>> at 
>>> org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.peekBlockingly(UnorderedStreamElementQueue.java:147)
>>> at 
>>> org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:82)
>>> at java.lang.Thread.run(Thread.java:748)
>>> 
>>> I read through https://issues.apache.org/jira/browse/FLINK-5638 
>>> , which makes me wonder 
>>> if there’s a different but related issue involving an async function in an 
>>> iteration.
>>> 
>>> Or perhaps I need to do something in my RichAsyncFunction to avoid this 
>>> situation?
>>> 
>>> Or is this expected and just the way things are currently?
>>> 
>>> Just FYI, my topology is here: 
>>> https://s3.amazonaws.com/su-public/flink-crawler+topology.pdf 
>>> 
>>> 
>>> Thanks,
>>> 
>>> — Ken
> 
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com 
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
> 



Best way to setup different log files for distinct jobs

2017-10-10 Thread PedroMrChaves
Hello,

I'm using logback as my logging framework. I would like to set up Flink so
that each job outputs to a different file. Any ideas on how I could do that? 

I am running flink in a standalone cluster with version 1.3.2.

Regards,
Pedro Chaves.



-
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: async io operator timeouts

2017-10-10 Thread Kostas Kloudas
Perfect! Thanks a lot Karthik.

> On Oct 10, 2017, at 10:41 AM, Karthik Deivasigamani  
> wrote:
> 
> Thanks Kostas. 
> Here is the JIRA : https://issues.apache.org/jira/browse/FLINK-7789 
> 
> 
> ~
> Karthik
> 
> On Mon, Oct 9, 2017 at 7:12 PM, Kostas Kloudas  > wrote:
> Hi Karthik,
> 
> Currently there is no way to provide a handler for timed-out requests.
> So the behavior is exactly what you described. A request fails, an exception 
> is thrown and
> the job is restarted.
> 
> A handler would be a nice addition. If you want, you can open a JIRA about it 
> and if would like
> to work on it, feel free to submit a PR.
> 
> Thanks, 
> Kostas
>  
>> On Oct 6, 2017, at 4:57 PM, Karthik Deivasigamani > > wrote:
>> 
>> Hi,
>> Is there a way to catch the timeouts thrown from the async io operator?
>> We use async io API to make some high latency HTTP API calls. Currently when 
>> the underlying http connection hangs and fails to timeout in the configured 
>> time the async timeout kicks in and throws an exception which causes the job 
>> to restart. Is there a way to catch this exception in application code? We 
>> are on Apache Flink 1.3.1.
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
>>  
>> 
>> ~
>> Karthik
> 
> 



Re: async io operator timeouts

2017-10-10 Thread Karthik Deivasigamani
Thanks Kostas.
Here is the JIRA : https://issues.apache.org/jira/browse/FLINK-7789

~
Karthik

On Mon, Oct 9, 2017 at 7:12 PM, Kostas Kloudas 
wrote:

> Hi Karthik,
>
> Currently there is no way to provide a handler for timed-out requests.
> So the behavior is exactly what you described. A request fails, an
> exception is thrown and
> the job is restarted.
>
> A handler would be a nice addition. If you want, you can open a JIRA about
> it and if would like
> to work on it, feel free to submit a PR.
>
> Thanks,
> Kostas
>
>
> On Oct 6, 2017, at 4:57 PM, Karthik Deivasigamani 
> wrote:
>
> Hi,
> Is there a way to catch the timeouts thrown from the async io operator?
> We use async io API to make some high latency HTTP API calls. Currently
> when the underlying http connection hangs and fails to timeout in the
> configured time the async timeout kicks in and throws an exception which
> causes the job to restart. Is there a way to catch this exception in
> application code? We are on Apache Flink 1.3.1.
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/
> asyncio.html
> ~
> Karthik
>
>
>


Re: Consult about flink on mesos cluster

2017-10-10 Thread Till Rohrmann
Hi Bo,

I think that by saying mesos.constraints.hard.hostattribute:
rack:ak03-07,rack:ak16-10 you define two hard constraints, namely that the
attribute rack must equal ak03-07 AND that rack must equal ak16-10. Since a
task offer would have to come from both racks at once, it can never match a
task request. So at the moment it is only possible to fix a given Mesos
attribute to a single value.

Cheers,
Till
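
For reference, with the current support the constraint has to pin a single attribute value, e.g. in flink-conf.yaml (using one of the rack attributes from this thread):

mesos.constraints.hard.hostattribute: rack:ak03-07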

On Tue, Oct 10, 2017 at 3:45 AM, Bo Yu  wrote:

> Thanks, Till
>
> I tried to set hard host attribute constraints in "flink-conf.yaml" as
> mesos.constraints.hard.hostattribute: rack:ak03-07,rack:ak16-10,
> rack:ak03-04
> where "rack:akXX-XX" is the MESOS_attributes of each slave.
>
> Then I get to the situation that the mesos app master doesn't accept the
> offers to start the task manager.
> I keep get the log as: flink.log
>
> The task manager doesn't start properly even though there're sufficient
> resources..
>
> Thank you in advance, and looking forward for your advices.
>
> Best regards,
>
> Bo
>
> On Tue, Oct 10, 2017 at 12:12 AM, Till Rohrmann 
> wrote:
>
>> Hi Bo,
>>
>> you can still use Flink with Marathon, because Marathon will only
>> schedule the cluster entrypoint which is the MesosApplicationMasterRunner.
>> Everything else will be scheduled via Fenzo. Moreover, by using Marathon
>> you gain high availability because Marathon makes sure that the
>> ApplicationMaster is restarted in case of a failure.
>>
>> Cheers,
>> Till
>>
>> On Mon, Oct 9, 2017 at 2:59 PM, yubo  wrote:
>>
>>> Thanks for your reply, Till
>>> We will use without Marathon, and hope the PR is merged to latest
>>> version soon.
>>>
>>> Best regards,
>>> Bo
>>>
>>> On Oct 9, 29 Heisei, at 6:36 PM, Till Rohrmann 
>>> wrote:
>>>
>>> Hi Bo,
>>>
>>> Flink uses internally Fenzo to match tasks and offers. Fenzo does not
>>> support the Marathon constraints syntax you are referring to. At the
>>> moment, Flink only allows to define hard host attribute constraints which
>>> means that you define a host attribute which has to match exactly. Fenzo
>>> also supports constraints that work on a set of tasks [1], but this is not
>>> yet exposed to the user. With that you should be able to evenly spread your
>>> tasks across multiple machines.
>>>
>>> There is actually a PR [2] trying to add this functionality. However, it
>>> is not yet in the shape to be merged.
>>>
>>> [1] https://github.com/Netflix/Fenzo/wiki/Constraints#constr
>>> aints-that-operate-on-groups-of-tasks
>>> [2] https://github.com/apache/flink/pull/4628
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Oct 6, 2017 at 10:54 AM, Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org> wrote:
>>>
 Hi Bo,

 I'm not familiar with Mesos deployments, but I'll forward this to Till
 or Eron (in CC) who perhaps could provide some help here.

 Cheers,
 Gordon


 On 2 October 2017 at 8:49:32 PM, Bo Yu (yubo1...@gmail.com) wrote:

 Hello all,
 This is Bo, I met some problems when I tried to use flink in my mesos
 cluster (1 master, 2 slaves (cpu has 32 cores)).
 I tried to start the mesos-appmaster.sh in marathon, the job manager is
 started without problem.

 mesos-appmaster.sh -Djobmanager.heap.mb=1024
 -Dtaskmanager.heap.mb=1024 -Dtaskmanager.numberOfTaskSlots=32

 My problem is that the task managers are all located on one single slave.
 1. (log1)
 The initial tasks setting in "/usr/local/flink/conf/flink-conf.yaml" is set
 as "mesos.initial-tasks: 2".
 I also set "mesos.constraints.hard.hostattribute: rack:ak09-27",
 which is the master node of the mesos cluster.

 2. (log2)
 I tried many ways to distribute the tasks to all the available slaves,
 and without any success.
 So I decide to try add a group_by operator which I referenced from
 https://mesosphere.github.io/marathon/docs/constraints.html
 "mesos.constraints.hard.hostattribute: rack:ak09-27,GROUP_BY:2"
 According to the log, flink keep waiting for more offers and the tasks
 never been launched.

 Sorry, I am a newbie to flink, and also to mesos. Please reply if my
 problem is not clear, and I would appreciate any hint about how to
 distribute tasks evenly on the available resources.

 Thank you in advance.

 Best regards,

 Bo


>>>
>>>
>>
>


Re: Avoid duplicate messages while restarting a job for an application upgrade

2017-10-10 Thread Piotr Nowojski
Hi, 

That’s good to hear :)

I quickly went through the code and it seems reasonable. I think there might be 
a need to think a little bit more about how this cancel checkpoint should be 
exposed to the operators and what the default action should be - right now the 
cancel flag is ignored by default; I would like to consider whether throwing an 
UnsupportedOperationException would be a better long-term solution.

But at first glance I do not see any larger issues, and it would be great if you 
could make a pull request out of it.

Piotrek

> On 9 Oct 2017, at 15:56, Antoine Philippot  wrote:
> 
> Thanks for your advices Piotr.
> 
> Firstly, yes, we are aware that even with a clean shutdown we can end up with 
> duplicated messages after a crash, and that is acceptable as it is rare and 
> unintentional, unlike deploying new business code or scaling up/down.
> 
> I made a fork of the 1.2.1 version which we currently use and developed a 
> simple POC based on the solution to pass a boolean stopSourceSavepoint from 
> the job manager to the source when a cancel with savepoint is triggered.
> This is the altered code : 
> https://github.com/aphilippot/flink/compare/release-1.2.1...aphilippot:flink_1_2_1_POC_savepoint
>  
> 
> 
> We tested it with our production workload and there are no duplicated messages 
> any more, while hundreds of thousands were duplicated before.
> 
> I planned to reapply/adapt this patch for the 1.3.2 release when we migrate 
> to it and maybe later to the 1.4
> 
> I'm open to suggestion or to help/develop this feature upstream if you want.
> 
> 
> Le lun. 2 oct. 2017 à 19:09, Piotr Nowojski  > a écrit :
> We are planning to work on this clean shut down after releasing Flink 1.4. 
> Implementing this properly would require some work, for example:
> - adding some checkpoint options to add information about “closing”/“shutting 
> down” event
> - add clean shutdown to source functions API
> - implement handling of this clean shutdown in desired sources
> 
> Those are not super complicated changes but also not trivial.
> 
> One thing that you could do, is to implement some super hacky filter function 
> just after source operator, that you would manually trigger. Normally it 
> would pass all of the messages. Once triggered, it would wait for next 
> checkpoint to happen. It would assume that it is a save point, and would 
> start filtering out all of the subsequent messages. When this checkpoint 
> completes, you could manually shutdown your Flink application. This could 
> guarantee that there are no duplicated writes after a restart. This might 
> work for clean shutdown, but it would be a very hacky solution. 
> 
> Btw, keep in mind that even with clean shutdown you can end up with 
> duplicated messages after a crash and there is no way around this with Kafka 
> 0.9.
> 
> Piotrek
> 
>> On Oct 2, 2017, at 5:30 PM, Antoine Philippot > > wrote:
>> 
>> Thanks Piotr for your answer, we sadly can't use kafka 0.11 for now (and 
>> until a while).
>> 
>> We can not afford tens of thousands of duplicated messages for each 
>> application upgrade, can I help by working on this feature ?
>> Do you have any hint or details on this part of that "todo list" ? 
>>  
>> 
>> Le lun. 2 oct. 2017 à 16:50, Piotr Nowojski > > a écrit :
>> Hi,
>> 
>> For failures recovery with Kafka 0.9 it is not possible to avoid duplicated 
>> messages. Using Flink 1.4 (unreleased yet) combined with Kafka 0.11 it will 
>> be possible to achieve exactly-once end to end semantic when writing to 
>> Kafka. However this still a work in progress:
>> 
>> https://issues.apache.org/jira/browse/FLINK-6988 
>> 
>> 
>> However this is a superset of functionality that you are asking for. 
>> Exactly-once just for clean shutdowns is also on our “TODO” list (it 
>> would/could support Kafka 0.9), but it is not currently being actively 
>> developed.
>> 
>> Piotr Nowojski
>> 
>>> On Oct 2, 2017, at 3:35 PM, Antoine Philippot >> > wrote:
>>> 
>>> Hi,
>>> 
>>> I'm working on a flink streaming app with a kafka09 to kafka09 use case 
>>> which handles around 100k messages per second.
>>> 
>>> To upgrade our application we used to run a flink cancel with savepoint 
>>> command followed by a flink run with the previously saved savepoint and the 
>>> new application fat jar as parameter. We noticed that we can have more than 
>>> 50k duplicated messages in the kafka sink, which is not idempotent.
>>> 
>>> This behaviour is actually problematic for this project and I am trying to 
>>> find a solution / workaround to avoid these duplicated messages.
>>> 
>>> The JobManager indicates clearly that the cancel call is triggered once the 
>>> savepoint is finished, but during the savepoint execution, kafka source 
>