Re: Bootstrapping

2018-01-25 Thread Chen Qin
Hi Gregory,

I had a similar issue when dealing with historical data. We chose a Lambda
architecture and worked out a use-case-specific hand-off protocol.
Unless the storage side can replay logs within a time range, streaming
application authors still need to do the extra work of implementing a batch
layer.


What we learned is that backfilling historical log streams might be too
expensive or inefficient for a streaming framework to handle, since streaming
frameworks focus on optimizing for unknown streams.

Hope it helps.

Chen

On Thu, Jan 25, 2018 at 12:49 PM, Gregory Fee  wrote:

> Hi group, I want to bootstrap some aggregates based on historic data in S3
> and then keep them updated based on a stream. To do this I was thinking of
> doing something like processing all of the historic data, doing a save
> point, then restoring my program from that save point but with a stream
> source instead. Does this seem like a reasonable approach or is there a
> better way to approach this functionality? There does not appear to be a
> straightforward way of doing it the way I was thinking so
> any advice would be appreciated.
>
> --
> *Gregory Fee*
> Engineer
> 425.830.4734 <+14258304734>
>


Bootstrapping

2018-01-25 Thread Gregory Fee
Hi group, I want to bootstrap some aggregates based on historic data in S3
and then keep them updated based on a stream. To do this I was thinking of
doing something like processing all of the historic data, doing a save
point, then restoring my program from that save point but with a stream
source instead. Does this seem like a reasonable approach or is there a
better way to approach this functionality? There does not appear to be a
straightforward way of doing it the way I was thinking so
any advice would be appreciated.

-- 
*Gregory Fee*
Engineer
425.830.4734 <+14258304734>


Re: Kafka Producer timeout causing data loss

2018-01-25 Thread Vishal Santoshi
The reordering issue can be resolved by setting
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to 1, if we are talking pure Kafka
producer configs (and I believe they carry over to the Flink Kafka connector).
This does limit concurrency (at the TCP level) once Kafka is back up, but
that is not very limiting once the producer's batch.size and linger.ms
settings are understood and tuned.
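
A minimal sketch of the producer settings discussed in this thread, written with plain string keys so it runs without the Kafka client on the classpath. The keys are the standard Kafka producer config names; the class name and all numeric values are illustrative assumptions, not recommendations made by anyone in this thread.

```java
import java.util.Properties;

// Hypothetical helper class, used here only for illustration.
class OrderedProducerConfig {

    /** Builds producer properties that retry failed sends without reordering records. */
    static Properties build() {
        Properties props = new Properties();
        // Retry transient send failures instead of giving up on the first timeout.
        props.setProperty("retries", "2147483647");
        // Allow only one unacknowledged request per connection, so a retried
        // batch cannot be overtaken by a later one.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        // Batching knobs (placeholder values): larger batches and a short linger
        // can recover some of the throughput lost to the single in-flight request.
        props.setProperty("batch.size", "65536");
        props.setProperty("linger.ms", "20");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties object would typically be extended with bootstrap.servers and handed to the producer when it is constructed.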

On Thu, Jan 25, 2018 at 11:41 AM, Elias Levy 
wrote:

> Try setting the Kafka producer config option for number of retries
> ("retries") to a large number, to avoid the timeout.  It defaults to zero.
> Do note that retries may result in reordered records.
>
> On Wed, Jan 24, 2018 at 7:07 PM, Ashish Pokharel 
> wrote:
>
>> Fabian,
>>
>> Thanks for your feedback - very helpful as usual !
>>
>> This is sort of becoming a huge problem for us right now because of our
>> Kafka situation. For some reason I missed this detail going through the
>> docs. We are definitely seeing heavy dose of data loss when Kafka timeouts
>> are happening.
>>
>> We actually have 1.4 version - I’d be interested to understand if
>> anything can be done in 1.4 to prevent this scenario.
>>
>> One other thought I had was an ability to invoke “Checkpointing before
>> Restart / Recovery” -> meaning I don’t necessarily need to checkpoint
>> periodically but I do want to make sure on a explicit restart /
>> rescheduling like this, we do have a decent “last known” state. Not sure if
>> this is currently doable.
>>
>> Thanks, Ashish
>>
>> On Jan 23, 2018, at 5:03 AM, Fabian Hueske  wrote:
>>
>> Hi Ashish,
>>
>> Originally, Flink always performed full recovery in case of a failure,
>> i.e., it restarted the complete application.
>> There is some ongoing work to improve this and make recovery more
>> fine-grained (FLIP-1 [1]).
>> Some parts have been added for 1.3.0.
>>
>> I'm not familiar with the details, but Stefan (in CC) should be able to
>> answer your specific question.
>>
>> Best, Fabian
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
>> 
>>
>> 2018-01-19 20:59 GMT+01:00 ashish pok :
>>
>>> Team,
>>>
>>> One more question to the community regarding hardening Flink Apps.
>>>
>>> Let me start off by saying we do have known Kafka bottlenecks which we
>>> are in the midst of resolving. So during certain times of day, a lot of our
>>> Flink Apps are seeing Kafka Producer timeout issues. Most of the logs are
>>> some flavor of this:
>>>
>>> java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
>>> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>> at 

Re: Send ACK when all records of file are processed

2018-01-25 Thread Piotr Nowojski
Hi,

If an operator has multiple inputs, its watermark will be the minimum of all
of the inputs. Thus your hypothetical “ACK Operator” will get
Watermark(Long.MAX_VALUE) only when all of the preceding operators report
Watermark(Long.MAX_VALUE).
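
The minimum rule can be illustrated with a small plain-Java simulation. This is only a sketch of the bookkeeping, not Flink's operator API: the class and method names are hypothetical, and in a real job Flink tracks the per-input watermarks for you.

```java
import java.util.Arrays;

// Hypothetical stand-in for an ACK operator with several upstream inputs.
class AckOnFinalWatermark {
    // Flink marks end of input with a watermark of Long.MAX_VALUE.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final long[] inputWatermarks;

    AckOnFinalWatermark(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input; returns true once every input has finished. */
    boolean onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        return currentWatermark() == MAX_WATERMARK;
    }

    /** The operator's own watermark is the minimum over all of its inputs. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        AckOnFinalWatermark ack = new AckOnFinalWatermark(3);
        for (int input = 0; input < 3; input++) {
            boolean done = ack.onWatermark(input, MAX_WATERMARK);
            System.out.println("input " + input + " finished, send ACK: " + done);
        }
    }
}
```

Only the call for the last remaining input returns true, which is the point at which a parallelism-1 ACK operator could safely send its response.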

Yes, instead of simply adding a sink, you would have to use something like
`flatMap` that doesn’t emit anything and only passes the watermark on (the
default implementations do exactly that).

To access the watermark, you can use the DataStream.transform function and pass
your own implementation of an operator extending AbstractStreamOperator.
You would probably only need to override the processWatermark() method, and
there you could do the ACK operation once you get
org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK.

Piotrek

> On 25 Jan 2018, at 17:56, Vinay Patil  wrote:
> 
> Hi Piotrek,
> 
> Thank you for your detailed answer.
> 
> Yes, I want to generate the ack when all the records of the file are written 
> to DB.
> 
> So to understand what you are saying , we will receive a single EOF watermark 
> value at the ack operator when all the downstream operator process all the 
> records of the file. But what I understand regarding the watermark is each 
> parallel instance of the operator will emit the watermark, so how do I ensure 
> that the EOF is reached  or will I receive only one watermark at the ack 
> operator ?
> 
> 
> So the pipeline topology will look like 
> 
> DataStream readFileStream = env.readFile()
> 
> readFileStream
>   .transform(/* ContinuousFileReaderOperator */)
>   .keyBy(0)
>   .map(/* enrichment */)
>   .addSink(/* DB */)
> 
>  instead of add sink, should it be a  simple map operator which writes to DB 
> so that we can have a next ack operator which will generate the response.
> 
> Also, how do I get/access the Watermark value in the ack operator ? It will 
> be a simple  map operator, right ?
> 
> 
> 
> 
> 
> Regards,
> Vinay Patil
> 
> On Thu, Jan 25, 2018 at 4:50 AM, Piotr Nowojski wrote:
> Hi,
> 
> As you figured out, some dummy EOF record is one solution, however you might 
> try to achieve it also by wrapping an existing CSV function. Your wrapper 
> could emit this dummy EOF record. Another (probably better) idea is to use 
> Watermark(Long.MAX_VALUE) for the EOF marker. Stream source and/or 
> ContinuousFileReaderOperator will do that for you, so you would just need to 
> handle the Watermark.
> 
> The question is, do you need to perform the ACK operation AFTER all of the DB 
> writes, or just after reading the CSV file? If the latter one, you could add 
> some custom ACK operator with parallelism one just after the CSV source that 
> waits for the EOF Watermark. 
> 
> If it is the first one (some kind of committing the DB writes), you would 
> need to wait until the EOF passes through all of your operators. You would 
> need something like that:
> 
> parallelism 1 for source -> default parallelism for keyBy/enrichment/db 
> writes -> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)
> 
> I hope this helps,
> Piotrek
> 
>> On 24 Jan 2018, at 23:19, Vinay Patil wrote:
>> 
>> Hi Guys,
>> 
>> Following is how my pipeline looks (DataStream API) :
>> 
>> [1] Read the data from the csv file
>> [2] KeyBy it by some id
>> [3] Do the enrichment and write it to DB
>> 
>> [1] reads the data in sequence as it has single parallelism and then I have 
>> default parallelism for the other operators.
>> 
>> I want to generate a response (ack) when all the data of the file is 
>> processed. How can I achieve this ?
>> 
>> One solution I can think of is to have EOF dummy record in a file and a 
>> unique field for all the records in that file. Doing a keyBy on this field 
>> will make sure that all records are sent to a single slot. So, when EOF  
>> dummy records is read I can generate a response/ack.
>> 
>> Is there a better way I can deal with this ?
>> 
>> 
>> Regards,
>> Vinay Patil
> 
> 



Re: Send ACK when all records of file are processed

2018-01-25 Thread Vinay Patil
Hi Piotrek,

Thank you for your detailed answer.

Yes, I want to generate the ack when all the records of the file are
written to DB.

So, to check my understanding: the ack operator will receive a single EOF
watermark once all the operators downstream of the source have processed all
the records of the file. But as I understand watermarks, each parallel
instance of an operator emits its own watermark, so how do I ensure that EOF
has been reached? Or will I receive only one watermark at the ack operator?


So the pipeline topology will look like

DataStream readFileStream = env.readFile()

readFileStream
  .transform(/* ContinuousFileReaderOperator */)
  .keyBy(0)
  .map(/* enrichment */)
  .addSink(/* DB */)

Instead of addSink, should it be a simple map operator that writes to the
DB, so that we can have a subsequent ack operator which generates the response?

Also, how do I access the Watermark value in the ack operator? It will
be a simple map operator, right?





Regards,
Vinay Patil

On Thu, Jan 25, 2018 at 4:50 AM, Piotr Nowojski 
wrote:

> Hi,
>
> As you figured out, some dummy EOF record is one solution, however you
> might try to achieve it also by wrapping an existing CSV function. Your
> wrapper could emit this dummy EOF record. Another (probably better) idea is
> to use Watermark(Long.MAX_VALUE) for the EOF marker. Stream source and/or
> ContinuousFileReaderOperator will do that for you, so you would just need
> to handle the Watermark.
>
> The question is, do you need to perform the ACK operation AFTER all of the
> DB writes, or just after reading the CSV file? If the latter one, you could
> add some custom ACK operator with parallelism one just after the CSV source
> that waits for the EOF Watermark.
>
> If it is the first one (some kind of committing the DB writes), you would
> need to wait until the EOF passes through all of your operators. You
> would need something like that:
>
> parallelism 1 for source -> default parallelism for keyBy/enrichment/db
> writes -> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)
>
> I hope this helps,
> Piotrek
>
> On 24 Jan 2018, at 23:19, Vinay Patil  wrote:
>
> Hi Guys,
>
> Following is how my pipeline looks (DataStream API) :
>
> [1] Read the data from the csv file
> [2] KeyBy it by some id
> [3] Do the enrichment and write it to DB
>
> [1] reads the data in sequence as it has single parallelism and then I
> have default parallelism for the other operators.
>
> I want to generate a response (ack) when all the data of the file is
> processed. How can I achieve this ?
>
> One solution I can think of is to have EOF dummy record in a file and a
> unique field for all the records in that file. Doing a keyBy on this field
> will make sure that all records are sent to a single slot. So, when EOF
> dummy records is read I can generate a response/ack.
>
> Is there a better way I can deal with this ?
>
>
> Regards,
> Vinay Patil
>
>
>


Re: Trying to understand why a job is spilling despite of huge memory provided

2018-01-25 Thread Fabian Hueske
Hi Konstantin,

Flink's sort algorithm works as follows:

- Each memory-consuming task (such as sort or combine) has a memory budget
which depends on the number of operators in the plan, the TM managed
memory, and the number of slots of the TM. Each TM slot has the same
fraction of the overall TM memory. If there are two memory-consuming
operators (combine and sort), each of their tasks gets 50% of the slot
memory. So if you have a TM with 40GB and 4 slots, each slot has 10GB and
each task 5 GB.
- The sort splits its memory budget in three buffers.
- The first buffer is filled with incoming data. Once full, it is sorted
and the second buffer is filled. When the second buffer is full, the third
buffer is filled, and the second buffer is sorted once sorting of the first
buffer has finished.
- When the first buffer is sorted, Flink waits until a certain amount of
data is received (by default 70% of the sort's memory budget). When that
happens, it starts spilling the first buffer to disk. When the buffer is
spilled, the first buffer can be filled again.
- When all data was read, the last buffer is only sorted but not spilled.
- The sorted stream is produced by merging the sorted and spilled records.

There are a few reasons that might cause spilling.
1) the spilling threshold is too tight. For example, to sort 10GB in memory
(in a single task), you need more than 14.3GB of sorter memory (10GB /
0.7). The idea here is to start spilling early enough that the first
buffer is empty before the third buffer is filled and we have to block
the input.
I'm not sure if it is easily possible to tweak the threshold.
2) the data might be skewed.
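
The arithmetic above can be checked with a short sketch. The class and method names are hypothetical; the inputs (40GB of TM memory, 4 slots, 2 memory-consuming operators, a 70% spill threshold) are the figures quoted in this mail.

```java
// Hypothetical helper for reproducing the memory budget figures from this mail.
class SortMemoryBudget {

    /** Memory available to one task: TM memory split evenly by slot, then by memory-consuming operator. */
    static double perTaskBudgetGb(double tmMemoryGb, int slots, int memoryConsumers) {
        return tmMemoryGb / slots / memoryConsumers;
    }

    /** Sorter memory needed so that dataGb stays below the spill threshold (a fraction of the budget). */
    static double sorterMemoryNeededGb(double dataGb, double spillThreshold) {
        return dataGb / spillThreshold;
    }

    public static void main(String[] args) {
        double perTask = perTaskBudgetGb(40.0, 4, 2);
        double needed = sorterMemoryNeededGb(10.0, 0.7);
        System.out.println("Budget per task (GB): " + perTask);
        System.out.println("Sorter memory needed to sort 10GB without spilling (GB): " + needed);
    }
}
```

With these numbers a task gets 5GB, while sorting 10GB fully in memory would need roughly 14.3GB of sorter memory in that single task.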

Something that you could try is to use a hash-combiner which can help to
improve the combine rate if you have a rather low number of distinct
keys.
Hash combiners have to be explicitly chosen and are only available for
ReduceFunctions.
So you would have to implement the sum as a ReduceFunction and hint the
hash combiner like this

input.groupBy(0, 1).reduce(new SumFunc()).setCombineHint(CombineHint.HASH)

Hope this helps,
Fabian

2018-01-22 16:13 GMT+01:00 Konstantin Gregor 
:

> Hello everyone,
>
> I have a question about the spilling behavior of a Flink batch job.
>
> The relevant part is a standard map-reduce, aggregating 4 billion
> Tuple3 together via a groupBy(0,1).sum(2).
> And there really doesn't happen much else in the job.
>
> The problem is that I don't understand why this job spills to disk. In
> this example the spilling is not really an issue, but we run the same
> job with much larger datasets, where we simply run out of disk space. So
> we're trying to understand better what it spills and what we can do
> about it.
>
> In this example, I am running on AWS EMR (Flink 1.3.1) with a machine
> with 240GB memory. I tweaked the following parameters:
>
> yarn.heap-cutoff-ratio: 0.1
> taskmanager.memory.fraction: 0.9
> taskmanager.network.numberOfBuffers: 32768
>
> This leads to 170GB Flink Managed Memory which in my opinion should
> suffice for the amount of data (the amount of data going from the
> combine to the reduce is roughly 80GB). However, it is spilling over
> 70GB on disk.
>
> Do you have a hint for me why this could be the case and can explain
> what exactly is written into the state on such a group-reduce?
>
> Thank you so much for your input,
> best regards
>
> Konstantin
>
>
> --
> Konstantin Gregor * konstantin.gre...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
> Amtsgericht München, HRB 135082
>


Re: Failing to recover once checkpoint fails

2018-01-25 Thread Vishal Santoshi
To add to this, we are assuming that the default configuration will fail a
pipeline if a checkpoint fails, and will enter the recovery loop if and only
if the retry limit has not been reached.




On Thu, Jan 25, 2018 at 7:00 AM, Vishal Santoshi 
wrote:

> Sorry.
>
> There are 2 scenarios
>
>   * Idempotent Sinks Use Case where we would want to restore from the
> latest valid checkpoint.  If I understand the code correctly we try to
> retrieve all completed checkpoints  for all handles in ZK and abort ( throw
> an exception ) if there are handles but no corresponding complete
> checkpoints in hdfs,  else we use the latest valid checkpoint state.  On
> abort a restart  and thus restore of the  pipe  is issued repeating the
> above execution. If the failure in hdfs was transient a retry will succeed
> else when the  retry limit is reached the pipeline is aborted for good.
>
>
> * Non Idempotent Sinks where we have no retries. We do not want to recover
> from the last available checkpoint as the above code will do as the more
> into history we go the more duplicates will be delivered. The only solution
> is use exactly once semantics of the source and sinks if possible.
>
>
>
>
>
>
>
>
> On Wed, Jan 24, 2018 at 7:20 AM, Aljoscha Krettek 
> wrote:
>
>> Did you see my second mail?
>>
>>
>> On 24. Jan 2018, at 12:50, Vishal Santoshi 
>> wrote:
>>
>> As in, if there are chk handles in zk, there should no reason to start a
>> new job ( bad handle, no hdfs connectivity etc ),
>>  yes that sums it up.
>>
>> On Wed, Jan 24, 2018 at 5:35 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Wait a sec, I just checked out the code again and it seems we already do
>>> that: https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210
>>>
>>> If there were some checkpoints but none could be read we fail recovery.
>>>
>>>
>>> On 24. Jan 2018, at 11:32, Aljoscha Krettek  wrote:
>>>
>>> That sounds reasonable: We would keep the first fix, i.e. never delete
>>> checkpoints if they're "corrupt", only when they're subsumed. Additionally,
>>> we fail the job if there are some checkpoints in ZooKeeper but none of them
>>> can be restored to prevent the case where a job starts from scratch even
>>> though it shouldn't.
>>>
>>> Does that sum it up?
>>>
>>> On 24. Jan 2018, at 01:19, Vishal Santoshi 
>>> wrote:
>>>
>>> If we hit the retry limit, abort the job. In our case we will restart
>>> from the last SP (we, like any production pipeline, do it n times a day), and
>>> that, I would think, should be OK for most folks?
>>>
>>> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Thank you for considering this. If I understand you correctly.
>>>>
>>>> * CHK pointer on ZK for a CHK state on hdfs was done successfully.
>>>> * Some issue restarted the pipeline.
>>>> * The NN was down unfortunately and flink could not retrieve the CHK
>>>> state from the CHK pointer on ZK.
>>>>
>>>> Before
>>>>
>>>> * The CHK pointer was being removed and the job started from a brand
>>>> new slate.
>>>>
>>>> After ( this fix on 1.4 +)
>>>>
>>>> * do not delete the CHK pointer ( It has to be subsumed to be deleted
>>>> ).
>>>> * Flink keeps using this CHK pointer ( if retry is > 0 and till we hit
>>>> any retry limit ) to restore state
>>>> * NN comes back
>>>> * Flink restores state on the next retry.
>>>>
>>>> I would hope that is the sequence to follow.
>>>>
>>>> Regards.
>>>>
>>>>
>>>> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> I think you might be right. We fixed the problem that checkpoints
>>>>> were dropped via https://issues.apache.org/jira/browse/FLINK-7783.
>>>>> However, we still have the problem that if the DFS is not up at all then it
>>>>> will look as if the job is starting from scratch. However, the alternative
>>>>> is failing the job, in which case you will also never be able to restore
>>>>> from a checkpoint. What do you think?
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On 23. Jan 2018, at 10:15, Fabian Hueske wrote:
>>>>>
>>>>> Sorry for the late reply.
>>>>>
>>>>> I created FLINK-8487 [1] to track this problem
>>>>>
>>>>> @Vishal, can you have a look and check if I forgot some details? I
>>>>> logged the issue for Flink 1.3.2, is that correct?
>>>>> Please add more information if you think it is relevant.
>>>>>
>>>>> Thanks,
>>>>> Fabian
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-8487
>>>>>
>>>>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi:
>>>>>
>>>>>> Or this one
>>>>>>
>>>>>> https://issues.apache.org/jira/browse/FLINK-4815
>>>>>>
>>>>>> On 

Re: Data exchange between tasks (operators/sources) at streaming api runtime

2018-01-25 Thread Ishwara Varnasi
Yes, makes sense, I think I'll consider one of those better options. Thanks! 
Ishwara

Sent from my iPhone

> On Jan 25, 2018, at 7:12 AM, Piotr Nowojski  wrote:
> 
> If you want to go this way, you could:
> - as you proposed use some busy waiting with reading some file from a 
> distributed file system
> - wait for some network message (opening your own socket)
> - use some other external system for this purpose: Kafka? Zookeeper?  
> 
> Although all of them seems hacky and I would prefer (as I proposed before) to 
> pre compute those ids before running/starting the main Flink application. 
> Probably would be simpler and easier to maintain.
> 
> Piotrek
> 
>> On 25 Jan 2018, at 13:47, Ishwara Varnasi  wrote:
>> 
>> The FLIP-17 is promising. Until it’s available I’m planning to do this: 
>> extend Kafka consumer and add logic to hold consuming until other source 
>> (fixed set) completes sending and those messages are processed by the 
>> application. However the question is to how to let the Kafka consumer know 
>> that it should now start consuming messages. What is the correct way to 
>> broadcast messages to other tasks at runtime? I’ve had success with the 
>> distributed cache (i.e. write status to a file in one task and have the other look for 
>> the status in this file), but it doesn’t look like a good solution, although it works. 
>> Thanks for the pointers.
>> Ishwara Varnasi 
>> 
>> Sent from my iPhone
>> 
>>> On Jan 25, 2018, at 4:03 AM, Piotr Nowojski  wrote:
>>> 
>>> Hi,
>>> 
>>> As far as I know there is currently no simple way to do this:
>>> Join stream with static data in 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>>> and
>>> https://issues.apache.org/jira/browse/FLINK-6131
>>> 
>>> One workaround might be to buffer the Kafka input in state in your 
>>> TwoInput operator until all of the broadcasted messages have arrived.
>>> Another option might be to dynamically start your application. First run 
>>> some computation to determine the fixed lists of ids and start the flink 
>>> application with those values hardcoded in/passed via command line 
>>> arguments.
>>> 
>>> Piotrek 
>>> 
>>>> On 25 Jan 2018, at 04:10, Ishwara Varnasi wrote:
>>>> 
>>>> Hello,
>>>> I have a scenario where I've two sources, one of them is source of fixed 
>>>> list of ids for preloading (caching certain info which is slow) and second 
>>>> one is the kafka consumer. I need to run Kafka after first one completes. 
>>>> I need a mechanism to let the Kafka consumer know that it can start 
>>>> consuming messages. How can I achieve this?
>>>> thanks
>>>> Ishwara Varnasi
>>> 
> 


Re: Timer & Window Memory Consumption

2018-01-25 Thread Aljoscha Krettek
You can connect to the TaskManagers with a tool such as jvisualvm to observe 
where the objects are created. It doesn't sound normal that there are millions 
of these objects if only a couple thousand elements come in.

> On 25. Jan 2018, at 14:59, Fabian Hueske  wrote:
> 
> Aljoscha (in CC), do you have an idea about this issue?
> 
> Thanks,
> Fabian
> 
> 2018-01-24 7:06 GMT+01:00 Navneeth Krishnan:
> Thanks Fabian, but for 1.5k messages per second per TM there are several 
> million InternalTimer & TimeWindow objects created within a period of 5 seconds. 
> Is there a way to debug this issue?
>  
> Regards,
> Navneeth
> 
> On Tue, Jan 23, 2018 at 2:09 AM, Fabian Hueske wrote:
> Hi,
> 
> TimeWindows and Timers are created for each window, i.e., every 5 seconds for 
> every distinct key that a task is processing. 
> Event-time windows are completed and cleaned up when a watermark is received 
> that passes the window end timestamp. 
> Therefore, there might be more than one window per key depending on the 
> watermarks.
> 
> Hope this helps,
> Fabian
> 
> 2018-01-21 6:48 GMT+01:00 Navneeth Krishnan:
> Hi,
> 
> I'm facing issues with frequent young generation garbage collections in my 
> task manager which happens approximately every few seconds. I have 3 task 
> managers with 12GB heap allocated on each and I have set the config to use 
> G1GC. My program ingests binary data from kafka source and the message rate 
> is around 4.5k msgs/sec with around 400 bytes per msg.  Below are the 
> operators used in the program.
> 
> kafka src -> keyby -> CoProcess -> keyby -> Tumbling Window (5secs) -> 
> FlatMap -> Sink
> 
> I captured the below histograms at 5 second intervals and analyzed the heap 
> as well. It looks like a lot of InternalTimer and TimeWindow objects are created.
> 
> Also, I see a high usage in 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.
> 
> Window code:
> dataStream.keyBy(new MessageKeySelector())
> .window(TumblingEventTimeWindows.of(Time.seconds(5)))
> .apply(new Aggregate());
> 
> Captured at time T:
> 
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:       2074427      481933816  [B
>    2:        357192      339368592  [D
>    3:      12759222      204147552  java.lang.Integer
>    4:         31416       85151832  [I
>    5:        900982       83872240  [C
>    6:        631888       20220416  java.util.HashMap$Node
>    7:        804203       19300872  java.lang.String
>    8:        541651       17332832  org.apache.flink.streaming.api.operators.InternalTimer
>    9:        540252       17288064  org.apache.flink.streaming.api.windowing.windows.TimeWindow
> 
> 
> Captured at T1 (T + 5 seconds):
> 
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:      12084258     2282849264  [B
>    2:       1922018     1828760896  [D
>    3:      68261427     1092182832  java.lang.Integer
>    4:       2712099      291488736  [C
>    5:         54201       98798976  [I
>    6:       2028250       48678000  java.lang.String
>    7:         66080       43528136  [[B
>    8:       1401915       35580168  [Ljava.lang.Object;
>    9:        949062       30369984  java.util.HashMap$Node
>   10:        570832       18266624  org.apache.flink.streaming.api.operators.InternalTimer
>   11:        549979       17599328  org.apache.flink.streaming.api.windowing.windows.TimeWindow
> 
> 
> Captured at T2 (T1 + 5 seconds):
> 
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:       9911982     2920384472  [B
>    2:       1584406     1510958520  [D
>    3:      56087337      897397392  java.lang.Integer
>    4:      26080337      834570784  java.util.HashMap$Node
>    5:      25756748      824215936  org.apache.flink.streaming.api.operators.InternalTimer
>    6:      25740086      823682752  org.apache.flink.streaming.api.windowing.windows.TimeWindow
> 
> Thanks.
> 
> 
> 
> 



Re: Timer & Window Memory Consumption

2018-01-25 Thread Fabian Hueske
Aljoscha (in CC), do you have an idea about this issue?

Thanks,
Fabian

2018-01-24 7:06 GMT+01:00 Navneeth Krishnan :

> Thanks Fabian, but for 1.5k messages per second per TM there are several
> million InternalTimer & TimeWindow objects created within a period of 5
> seconds. Is there a way to debug this issue?
>
> Regards,
> Navneeth
>
> On Tue, Jan 23, 2018 at 2:09 AM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> TimeWindows and Timers are created for each window, i.e., every 5 seconds
>> for every distinct key that a task is processing.
>> Event-time windows are completed and cleaned up when a watermark is
>> received that passes the window end timestamp.
>> Therefore, there might be more than one window per key depending on the
>> watermarks.
>>
>> Hope this helps,
>> Fabian
>>
>> 2018-01-21 6:48 GMT+01:00 Navneeth Krishnan :
>>
>>> Hi,
>>>
>>> I'm facing issues with frequent young generation garbage collections in
>>> my task manager which happens approximately every few seconds. I have 3
>>> task managers with 12GB heap allocated on each and I have set the config to
>>> use G1GC. My program ingests binary data from kafka source and the message
>>> rate is around 4.5k msgs/sec with around 400 bytes per msg.  Below are the
>>> operators used in the program.
>>>
>>> kafka src -> keyby -> CoProcess -> keyby -> Tumbling Window (5secs) ->
>>> FlatMap -> Sink
>>>
>>> I captured the below histograms at 5 second intervals and analyzed the
>>> heap as well. It looks like a lot of InternalTimer and TimeWindow objects are
>>> created.
>>>
>>> Also, I see a high usage in
>>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.
>>>
>>> *Window code:*
>>> dataStream.keyBy(new MessageKeySelector())
>>> .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>>> .apply(new Aggregate());
>>>
>>> *Captured at time T:*
>>>
>>>  num     #instances         #bytes  class name
>>> ----------------------------------------------
>>>    1:       2074427      481933816  [B
>>>    2:        357192      339368592  [D
>>>    3:      12759222      204147552  java.lang.Integer
>>>    4:         31416       85151832  [I
>>>    5:        900982       83872240  [C
>>>    6:        631888       20220416  java.util.HashMap$Node
>>>    7:        804203       19300872  java.lang.String
>>>    8:        541651       17332832  org.apache.flink.streaming.api.operators.InternalTimer
>>>    9:        540252       17288064  org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>
>>>
>>> *Captured at T1 (T + 5 seconds):*
>>>
>>>  num     #instances         #bytes  class name
>>> ----------------------------------------------
>>>    1:      12084258     2282849264  [B
>>>    2:       1922018     1828760896  [D
>>>    3:      68261427     1092182832  java.lang.Integer
>>>    4:       2712099      291488736  [C
>>>    5:         54201       98798976  [I
>>>    6:       2028250       48678000  java.lang.String
>>>    7:         66080       43528136  [[B
>>>    8:       1401915       35580168  [Ljava.lang.Object;
>>>    9:        949062       30369984  java.util.HashMap$Node
>>>   10:        570832       18266624  org.apache.flink.streaming.api.operators.InternalTimer
>>>   11:        549979       17599328  org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>
>>>
>>> *Captured at T2 (T1 + 5 seconds):*
>>>
>>>  num     #instances         #bytes  class name
>>> ----------------------------------------------
>>>    1:       9911982     2920384472  [B
>>>    2:       1584406     1510958520  [D
>>>    3:      56087337      897397392  java.lang.Integer
>>>    4:      26080337      834570784  java.util.HashMap$Node
>>>    5:      25756748      824215936  org.apache.flink.streaming.api.operators.InternalTimer
>>>    6:      25740086      823682752  org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>
>>> Thanks.
>>>
>>>
>>
>


Re: Does web ui hang for 1.4 for job submission ?

2018-01-25 Thread Vishal Santoshi
In our case we do not see that happening... when the web UI hangs and we
refresh the page, it does not affect the TMs etc. Is there something specific
that you do that triggers this other issue?

On Jan 25, 2018 7:15 AM, "Chesnay Schepler"  wrote:

> That's a separate issue, and I would think deserves a JIRA as it sounds
> rather serious.
>
> On 25.01.2018 12:43, Lasse Nedergaard wrote:
>
> Great news 
>
> Does it also cover protection against losing all your task managers?
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 25. jan. 2018 kl. 12.00 skrev Chesnay Schepler :
>
> In 1.4, jobs submitted through the WebUI that never actually execute a job
> will cause the UI to hang, with the only option
> of resolving that being to reload the page.
>
> This will be fixed for 1.4.1 and 1.5 .
>
> On 24.01.2018 19:19, Vishal Santoshi wrote:
>
> Exactly, the same context and issue. We too depend on args too and server
> side  validations and that on error ui hangs.
>
> On Wed, Jan 24, 2018 at 1:13 PM, Lasse Nedergaard <
> lassenederga...@gmail.com> wrote:
>
>> Hi.
>>
>> Our jobs take a number of command line options and validate them during
>> startup and throw an exception if something is wrong. When this exception
>> happens ui hang all prev. uploaded jars are removed most task manager are
>> lost and all running jobs terminated. We running Flink 1.4 on dc/os
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>>
>> Den 24. jan. 2018 kl. 18.51 skrev Vishal Santoshi <
>> vishal.santo...@gmail.com>:
>>
>> Yep, got it. It seems that the response when  in  an error state is
>> reported by JM is not being rendered.
>>
>>  We can see the response in the UI debugger but not on the UI itself.
>>
>> On Wed, Jan 24, 2018 at 12:28 PM, Ufuk Celebi  wrote:
>>
>>> It's not a stupid question at all! Try the following please:
>>> 1) Use something like Chrome's Developer Tools to check the responses
>>> you get from the web UI. If you see an error there, that should point
>>> you to what's going on.
>>> 2) Enable DEBUG logging for the JobManager and check the logs (if 1
>>> doesn't help)
>>>
>>> We just merged a change into master that does a better job of
>>> reporting any errors that occur during submission.
>>>
>>> – Ufuk
>>>
>>>
>>> On Wed, Jan 24, 2018 at 5:24 PM, Vishal Santoshi
>>>  wrote:
>>> > We have had 2 different installations and when e go submit a job, it
>>> gives
>>> > us the spinning wheel and the request never reaches the JM. CLI works
>>> > without issues. Is this us alone or some one else seen this.
>>> >
>>> > Again 1.3.2 does not have the issue and we have the same nodes, n/w
>>> etc so
>>> > we sure nothing should have changed on our side.
>>> >
>>> > Apologies if this is a stupid question with an obvious solution.
>>> >
>>> > Regards
>>> >
>>> > Vishal
>>>
>>
>>
>
>
>


Re: Data exchange between tasks (operators/sources) at streaming api runtime

2018-01-25 Thread Piotr Nowojski
If you want to go this way, you could:
- as you proposed, busy-wait while polling a file on a distributed file 
system
- wait for a network message (by opening your own socket)
- use some other external system for this purpose: Kafka? ZooKeeper?

However, all of these seem hacky, and I would prefer (as I proposed before) to 
precompute those ids before starting the main Flink application. 
That would probably be simpler and easier to maintain.
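
For illustration only, the first option (polling a status file) could look 
roughly like the sketch below. It is plain Java against a local path. The 
class name MarkerFileGateSketch, the method awaitMarker and its parameters 
are made up for this example, and a real job would have to go through 
whatever client it uses for the distributed file system.

```java
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper: blocks a source until another task has written a marker file. */
final class MarkerFileGateSketch {

    /**
     * Polls for the marker file until it exists or the timeout elapses.
     *
     * @return true if the marker appeared, false on timeout or interruption
     */
    static boolean awaitMarker(Path marker, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (Files.exists(marker)) {
                return true; // the other task has signalled completion
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up waiting
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

A source would call awaitMarker once before it starts consuming and treat a 
false result as a failure rather than silently proceeding.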

Piotrek

> On 25 Jan 2018, at 13:47, Ishwara Varnasi  wrote:
> 
> FLIP-17 is promising. Until it’s available, I’m planning to do this: 
> extend the Kafka consumer and add logic to hold off consuming until the other 
> source (a fixed set) finishes sending and those messages have been processed 
> by the application. However, the question is how to let the Kafka consumer 
> know that it should now start consuming messages. What is the correct way to 
> broadcast messages to other tasks at runtime? I had success with the 
> distributed cache (i.e. one task writes its status to a file and the other 
> looks for the status in this file), but it doesn’t look like a good solution, 
> although it works. 
> Thanks for the pointers.
> Ishwara Varnasi 
> 
> Sent from my iPhone
> 
> On Jan 25, 2018, at 4:03 AM, Piotr Nowojski  > wrote:
> 
>> Hi,
>> 
>> As far as I know there is currently no simple way to do this:
>> Join stream with static data in 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>>  
>> 
>> and
>> https://issues.apache.org/jira/browse/FLINK-6131 
>> 
>> 
>> One workaround might be to buffer the Kafka input in state in your 
>> TwoInput operator until all of the broadcast messages have arrived.
>> Another option might be to dynamically start your application. First run 
>> some computation to determine the fixed lists of ids and start the flink 
>> application with those values hardcoded in/passed via command line arguments.
>> 
>> Piotrek 
>> 
>>> On 25 Jan 2018, at 04:10, Ishwara Varnasi >> > wrote:
>>> 
>>> Hello,
>>> I have a scenario where I've two sources, one of them is source of fixed 
>>> list of ids for preloading (caching certain info which is slow) and second 
>>> one is the kafka consumer. I need to run Kafka after first one completes. I 
>>> need a mechanism to let the Kafka consumer know that it can start consuming 
>>> messages. How can I achieve this?
>>> thanks
>>> Ishwara Varnasi
>> 



Re: Data exchange between tasks (operators/sources) at streaming api runtime

2018-01-25 Thread Ishwara Varnasi
FLIP-17 is promising. Until it’s available, I’m planning to do this: extend 
the Kafka consumer and add logic to hold off consuming until the other source 
(a fixed set) finishes sending and those messages have been processed by the 
application. However, the question is how to let the Kafka consumer know that 
it should now start consuming messages. What is the correct way to broadcast 
messages to other tasks at runtime? I had success with the distributed cache 
(i.e. one task writes its status to a file and the other looks for the status 
in this file), but it doesn’t look like a good solution, although it works. 
Thanks for the pointers.
Ishwara Varnasi 

Sent from my iPhone

> On Jan 25, 2018, at 4:03 AM, Piotr Nowojski  wrote:
> 
> Hi,
> 
> As far as I know there is currently no simple way to do this:
> Join stream with static data in 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
> and
> https://issues.apache.org/jira/browse/FLINK-6131
> 
> One workaround might be to buffer the Kafka input in state in your 
> TwoInput operator until all of the broadcast messages have arrived.
> Another option might be to dynamically start your application. First run some 
> computation to determine the fixed lists of ids and start the flink 
> application with those values hardcoded in/passed via command line arguments.
> 
> Piotrek 
> 
>> On 25 Jan 2018, at 04:10, Ishwara Varnasi  wrote:
>> 
>> Hello,
>> I have a scenario where I've two sources, one of them is source of fixed 
>> list of ids for preloading (caching certain info which is slow) and second 
>> one is the kafka consumer. I need to run Kafka after first one completes. I 
>> need a mechanism to let the Kafka consumer know that it can start consuming 
>> messages. How can I achieve this?
>> thanks
>> Ishwara Varnasi
> 


Re: Does web ui hang for 1.4 for job submission ?

2018-01-25 Thread Chesnay Schepler
That's a separate issue, and I would think deserves a JIRA as it sounds 
rather serious.


On 25.01.2018 12:43, Lasse Nedergaard wrote:

Great news 

Does it also cover protection against losing all your task managers?

Med venlig hilsen / Best regards
Lasse Nedergaard


Den 25. jan. 2018 kl. 12.00 skrev Chesnay Schepler >:


In 1.4, jobs submitted through the WebUI that never actually execute 
a job will cause the UI to hang, with the only option
of resolving that being to reload the page.

This will be fixed for 1.4.1 and 1.5.

On 24.01.2018 19:19, Vishal Santoshi wrote:
Exactly, the same context and issue. We too depend on args too and 
server side  validations and that on error ui hangs.


On Wed, Jan 24, 2018 at 1:13 PM, Lasse Nedergaard 
> wrote:


Hi.

Our jobs take a number of command line options and validate them
during startup and throw an exception if something is wrong.
When this exception happens ui hang all prev. uploaded jars are
removed most task manager are lost and all running jobs
terminated. We running Flink 1.4 on dc/os

Med venlig hilsen / Best regards
Lasse Nedergaard


Den 24. jan. 2018 kl. 18.51 skrev Vishal Santoshi
>:


Yep, got it. It seems that the response when  in  an error
state is reported by JM is not being rendered.

 We can see the response in the UI debugger but not on the UI
itself.

On Wed, Jan 24, 2018 at 12:28 PM, Ufuk Celebi > wrote:

It's not a stupid question at all! Try the following please:
1) Use something like Chrome's Developer Tools to check the
responses
you get from the web UI. If you see an error there, that
should point
you to what's going on.
2) Enable DEBUG logging for the JobManager and check the
logs (if 1
doesn't help)

We just merged a change into master that does a better job of
reporting any errors that occur during submission.

– Ufuk


On Wed, Jan 24, 2018 at 5:24 PM, Vishal Santoshi
> wrote:
> We have had 2 different installations and when e go
submit a job, it gives
> us the spinning wheel and the request never reaches the
JM. CLI works
> without issues. Is this us alone or some one else seen this.
>
> Again 1.3.2 does not have the issue and we have the same
nodes, n/w etc so
> we sure nothing should have changed on our side.
>
> Apologies if this is a stupid question with an obvious
solution.
>
> Regards
>
> Vishal










Re: Failing to recover once checkpoint fails

2018-01-25 Thread Vishal Santoshi
Sorry.

There are 2 scenarios:

* Idempotent sinks: here we would want to restore from the latest valid
checkpoint. If I understand the code correctly, we try to retrieve the
completed checkpoints for all handles in ZK and abort (throw an exception)
if there are handles but no corresponding complete checkpoints in HDFS;
otherwise we use the latest valid checkpoint state. On abort, a restart (and
thus a restore) of the pipeline is issued, repeating the above execution. If
the failure in HDFS was transient, a retry will succeed; otherwise, when the
retry limit is reached, the pipeline is aborted for good.

* Non-idempotent sinks: here we have no retries. We do not want to recover
from the last available checkpoint, as the above code will do, because the
further back in history we go, the more duplicates will be delivered. The only
solution is to use exactly-once semantics for the source and sinks, if possible.
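
To make the first scenario concrete, here is a small plain-Java model of the
decision as I understand it from this thread. It is not Flink's actual code:
RecoveryDecisionSketch, RecoveryFailedException and the loader function are
names invented for this example, and the handles are just a list ordered
oldest first.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical model of the recovery decision discussed above; not Flink's implementation. */
final class RecoveryDecisionSketch {

    /** Signals that handles exist but none of the checkpoint states could be read. */
    static final class RecoveryFailedException extends RuntimeException {
        RecoveryFailedException(String message) {
            super(message);
        }
    }

    /**
     * @param handles checkpoint handles found in ZooKeeper, oldest first
     * @param loader  reads the state behind a handle; empty if it cannot be read
     * @return the latest readable state, or empty if there are no handles at all
     */
    static <H, S> Optional<S> recover(List<H> handles, Function<H, Optional<S>> loader) {
        if (handles.isEmpty()) {
            return Optional.empty(); // nothing was ever checkpointed: a fresh start is fine
        }
        // Walk from the newest handle backwards and take the first readable state.
        for (int i = handles.size() - 1; i >= 0; i--) {
            Optional<S> state = loader.apply(handles.get(i));
            if (state.isPresent()) {
                return state;
            }
        }
        // Handles exist but nothing is readable: fail so that the restart strategy retries
        // instead of silently starting from scratch.
        throw new RecoveryFailedException("checkpoint handles exist but no state is readable");
    }
}
```

Each restart attempt would call recover again, so a transient HDFS outage
resolves on a later retry, while a permanent one exhausts the retry limit.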








On Wed, Jan 24, 2018 at 7:20 AM, Aljoscha Krettek 
wrote:

> Did you see my second mail?
>
>
> On 24. Jan 2018, at 12:50, Vishal Santoshi 
> wrote:
>
> As in, if there are chk handles in zk, there should no reason to start a
> new job ( bad handle, no hdfs connectivity etc ),
>  yes that sums it up.
>
> On Wed, Jan 24, 2018 at 5:35 AM, Aljoscha Krettek 
> wrote:
>
>> Wait a sec, I just checked out the code again and it seems we already do
>> that:
>> https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210
>>
>> If there were some checkpoints but none could be read we fail recovery.
>>
>>
>> On 24. Jan 2018, at 11:32, Aljoscha Krettek  wrote:
>>
>> That sounds reasonable: We would keep the first fix, i.e. never delete
>> checkpoints if they're "corrupt", only when they're subsumed. Additionally,
>> we fail the job if there are some checkpoints in ZooKeeper but none of them
>> can be restored to prevent the case where a job starts from scratch even
>> though it shouldn't.
>>
>> Does that sum it up?
>>
>> On 24. Jan 2018, at 01:19, Vishal Santoshi 
>> wrote:
>>
>> If we hit the retry limit, abort the job. In our case we will restart
>> from the last SP (we, like any production pipeline, do it n times a day), and
>> that I would think should be OK for most folks?
>>
>> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Thank you for considering this. If I understand you correctly.
>>>
>>> * CHK pointer on ZK for a CHK state on hdfs was done successfully.
>>> * Some issue restarted the pipeline.
>>> * The NN was down unfortunately and flink could not retrieve the  CHK
>>> state from the CHK pointer on ZK.
>>>
>>> Before
>>>
>>> * The CHK pointer was being removed and the job started from a brand new
>>> slate.
>>>
>>> After ( this fix on 1.4 +)
>>>
>>> * do not delete the CHK pointer ( It has to be subsumed to be deleted ).
>>> * Flink keeps using this CHK pointer ( if retry is > 0 and till we hit
>>> any retry limit ) to restore state
>>> * NN comes back
>>> * Flink restores state on the next retry.
>>>
>>> I would hope that is the sequence to follow.
>>>
>>> Regards.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi Vishal,

 I think you might be right. We fixed the problem that checkpoints were
 dropped via https://issues.apache.org/jira/browse/FLINK-7783. However,
 we still have the problem that if the DFS is not up at all then it will
 look as if the job is starting from scratch. However, the alternative is
 failing the job, in which case you will also never be able to restore from
 a checkpoint. What do you think?

 Best,
 Aljoscha


 On 23. Jan 2018, at 10:15, Fabian Hueske  wrote:

 Sorry for the late reply.

 I created FLINK-8487 [1] to track this problem

 @Vishal, can you have a look and check if I forgot some details? I
 logged the issue for Flink 1.3.2, is that correct?
 Please add more information if you think it is relevant.

 Thanks,
 Fabian

 [1] https://issues.apache.org/jira/browse/FLINK-8487

 2018-01-18 22:14 GMT+01:00 Vishal Santoshi :

> Or this one
>
> https://issues.apache.org/jira/browse/FLINK-4815
>
> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> ping.
>>
>> This happened again on production and it seems reasonable to
>> abort when a checkpoint is not found rather than behave as if it is a
>> brand new pipeline.
>>
>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Folks sorry for being 

Re: Does web ui hang for 1.4 for job submission ?

2018-01-25 Thread Lasse Nedergaard
Great news 

Does it also cover protection against losing all your task managers?

Med venlig hilsen / Best regards
Lasse Nedergaard


> Den 25. jan. 2018 kl. 12.00 skrev Chesnay Schepler :
> 
> In 1.4, jobs submitted through the WebUI that never actually execute a job 
> will cause the UI to hang, with the only option
> of resolving that being to reload the page.
> 
> This will be fixed for 1.4.1 and 1.5 .
> 
>> On 24.01.2018 19:19, Vishal Santoshi wrote:
>> Exactly, the same context and issue. We too depend on args too and server 
>> side  validations and that on error ui hangs.
>> 
>>> On Wed, Jan 24, 2018 at 1:13 PM, Lasse Nedergaard 
>>>  wrote:
>>> Hi. 
>>> 
>>> Our jobs take a number of command line options and validate them during 
>>> startup and throw an exception if something is wrong. When this exception 
>>> happens ui hang all prev. uploaded jars are removed most task manager are 
>>> lost and all running jobs terminated. We running Flink 1.4 on dc/os
>>> 
>>> Med venlig hilsen / Best regards
>>> Lasse Nedergaard
>>> 
>>> 
>>> Den 24. jan. 2018 kl. 18.51 skrev Vishal Santoshi 
>>> :
>>> 
 Yep, got it. It seems that the response when  in  an error state is 
 reported by JM is not being rendered.
 
  We can see the response in the UI debugger but not on the UI itself.
 
> On Wed, Jan 24, 2018 at 12:28 PM, Ufuk Celebi  wrote:
> It's not a stupid question at all! Try the following please:
> 1) Use something like Chrome's Developer Tools to check the responses
> you get from the web UI. If you see an error there, that should point
> you to what's going on.
> 2) Enable DEBUG logging for the JobManager and check the logs (if 1
> doesn't help)
> 
> We just merged a change into master that does a better job of
> reporting any errors that occur during submission.
> 
> – Ufuk
> 
> 
> On Wed, Jan 24, 2018 at 5:24 PM, Vishal Santoshi
>  wrote:
> > We have had 2 different installations and when e go submit a job, it 
> > gives
> > us the spinning wheel and the request never reaches the JM. CLI works
> > without issues. Is this us alone or some one else seen this.
> >
> > Again 1.3.2 does not have the issue and we have the same nodes, n/w etc 
> > so
> > we sure nothing should have changed on our side.
> >
> > Apologies if this is a stupid question with an obvious solution.
> >
> > Regards
> >
> > Vishal
 
>> 
> 


Re: Does web ui hang for 1.4 for job submission ?

2018-01-25 Thread Chesnay Schepler
In 1.4, jobs submitted through the WebUI that never actually execute a 
job will cause the UI to hang; the only way to resolve that is to reload 
the page.

This will be fixed in 1.4.1 and 1.5.

On 24.01.2018 19:19, Vishal Santoshi wrote:
Exactly, the same context and issue. We too depend on args too and 
server side  validations and that on error ui hangs.


On Wed, Jan 24, 2018 at 1:13 PM, Lasse Nedergaard 
> wrote:


Hi.

Our jobs take a number of command line options and validate them
during startup and throw an exception if something is wrong. When
this exception happens ui hang all prev. uploaded jars are removed
most task manager are lost and all running jobs terminated. We
running Flink 1.4 on dc/os

Med venlig hilsen / Best regards
Lasse Nedergaard


Den 24. jan. 2018 kl. 18.51 skrev Vishal Santoshi
>:


Yep, got it. It seems that the response when  in  an error state
is reported by JM is not being rendered.

 We can see the response in the UI debugger but not on the UI itself.

On Wed, Jan 24, 2018 at 12:28 PM, Ufuk Celebi > wrote:

It's not a stupid question at all! Try the following please:
1) Use something like Chrome's Developer Tools to check the
responses
you get from the web UI. If you see an error there, that
should point
you to what's going on.
2) Enable DEBUG logging for the JobManager and check the logs
(if 1
doesn't help)

We just merged a change into master that does a better job of
reporting any errors that occur during submission.

– Ufuk


On Wed, Jan 24, 2018 at 5:24 PM, Vishal Santoshi
> wrote:
> We have had 2 different installations and when e go submit
a job, it gives
> us the spinning wheel and the request never reaches the JM.
CLI works
> without issues. Is this us alone or some one else seen this.
>
> Again 1.3.2 does not have the issue and we have the same
nodes, n/w etc so
> we sure nothing should have changed on our side.
>
> Apologies if this is a stupid question with an obvious
solution.
>
> Regards
>
> Vishal








Re: Send ACK when all records of file are processed

2018-01-25 Thread Piotr Nowojski
Hi,

As you figured out, a dummy EOF record is one solution; you could also 
achieve this by wrapping an existing CSV function so that your wrapper 
emits the dummy EOF record. Another (probably better) idea is to use 
Watermark(Long.MAX_VALUE) as the EOF marker. The stream source and/or 
ContinuousFileReaderOperator will emit that for you, so you would just need to 
handle the Watermark.

The question is, do you need to perform the ACK operation AFTER all of the DB 
writes, or just after reading the CSV file? If the latter, you could add 
a custom ACK operator with parallelism one just after the CSV source that 
waits for the EOF Watermark. 

If it is the former (some kind of commit of the DB writes), you would need 
to wait until the EOF passes through all of your operators. You would need 
something like this:

parallelism 1 for source -> default parallelism for keyBy/enrichment/db writes 
-> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)
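
The last stage of that pipeline can be modelled without any Flink 
dependency. The sketch below is only a model: EofAckSketch and its methods 
are invented names, and it tracks the per-channel minimum by hand, which, as 
far as I understand, Flink does for you before it hands the combined 
watermark to an operator.

```java
import java.util.Arrays;

/** Plain-Java model of a parallelism-1 ACK step fed by several upstream channels. */
final class EofAckSketch {
    private final long[] channelWatermarks;
    private final Runnable ack;
    private boolean acked;

    EofAckSketch(int upstreamChannels, Runnable ack) {
        if (upstreamChannels <= 0) {
            throw new IllegalArgumentException("need at least one upstream channel");
        }
        this.channelWatermarks = new long[upstreamChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        this.ack = ack;
    }

    /** Records a watermark from one channel and fires the ACK once all channels reached EOF. */
    void onWatermark(int channel, long timestamp) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
        // The combined watermark is the minimum over all channels.
        long combined = Arrays.stream(channelWatermarks).min().getAsLong();
        if (combined == Long.MAX_VALUE && !acked) {
            acked = true; // fire only once, even if the final watermark is repeated
            ack.run();
        }
    }

    boolean isAcked() {
        return acked;
    }
}
```

The point of taking the minimum is that the ACK fires only after every 
parallel DB writer has forwarded the final watermark, not after the first one.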

I hope this helps,
Piotrek

> On 24 Jan 2018, at 23:19, Vinay Patil  wrote:
> 
> Hi Guys,
> 
> Following is how my pipeline looks (DataStream API) :
> 
> [1] Read the data from the csv file
> [2] KeyBy it by some id
> [3] Do the enrichment and write it to DB
> 
> [1] reads the data in sequence as it has single parallelism and then I have 
> default parallelism for the other operators.
> 
> I want to generate a response (ack) when all the data of the file is 
> processed. How can I achieve this ?
> 
> One solution I can think of is to have EOF dummy record in a file and a 
> unique field for all the records in that file. Doing a keyBy on this field 
> will make sure that all records are sent to a single slot. So, when EOF  
> dummy records is read I can generate a response/ack.
> 
> Is there a better way I can deal with this ?
> 
> 
> Regards,
> Vinay Patil



Re: Avoiding deadlock with iterations

2018-01-25 Thread Piotr Nowojski
Hi,

This is a known problem and I don’t think there is an easy solution to it. 
Please refer to:
http://mail-archives.apache.org/mod_mbox/flink-user/201704.mbox/%3c5486a7fd-41c3-4131-5100-272825088...@gaborhermann.com%3E
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132

Thanks,
Piotrek

> On 25 Jan 2018, at 05:36, Ken Krugler  wrote:
> 
> Hi all,
> 
> We’ve run into deadlocks with two different streaming workflows that have 
> iterations.
> 
> In both cases, the issue is with fan-out; if any operation in the loop can 
> emit more records than consumed, eventually a network buffer fills up, and 
> then everyone in the iteration loop is blocked.
> 
> One pattern we can use, when the operator that’s causing the fan-out has the 
> ability to decide how much to emit, is to have it behave as an async 
> function, emitting from a queue with multiple threads. If threads start 
> blocking because of back pressure, then the queue begins to fill up, and the 
> function can throttle back how much data it queues up. So this gives us a 
> small (carefully managed) data reservoir we can use to avoid the deadlock.
> 
> Is there a better approach? I didn’t see any way to determine how “full” the 
> various network buffers are, and use that for throttling. Plus there’s the 
> issue of partitioning, where it would be impossible in many cases to know the 
> impact of a record being emitted. So even if we could monitor buffers, I 
> don’t think it’s a viable solution.
> 
> Thanks,
> 
> — Ken
> 
> 
> http://about.me/kkrugler 
> +1 530-210-6378
> 
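
As a rough illustration of the reservoir pattern Ken describes above, the 
following plain-Java sketch keeps a bounded queue between the fan-out logic 
and the emitting threads. FanOutReservoirSketch and its methods are 
hypothetical names for this example and not part of any Flink API.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/** Hypothetical bounded reservoir between a fan-out function and its emitter threads. */
final class FanOutReservoirSketch<T> {
    private final Queue<T> pending = new ArrayDeque<>();
    private final int capacity;

    FanOutReservoirSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /**
     * Queues as many candidates as currently fit.
     *
     * @return how many were accepted, so the caller can hold back or drop the rest
     */
    synchronized int offerAll(List<T> candidates) {
        int accepted = 0;
        for (T candidate : candidates) {
            if (pending.size() >= capacity) {
                break; // emitters are behind (back pressure): stop queueing
            }
            pending.add(candidate);
            accepted++;
        }
        return accepted;
    }

    /** Called by an emitter thread; returns the next record or null if nothing is queued. */
    synchronized T poll() {
        return pending.poll();
    }

    synchronized int size() {
        return pending.size();
    }
}
```

When the emitter threads block on full network buffers, poll is called less 
often, offerAll accepts fewer records, and the function emits less into the loop.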



Re: Data exchange between tasks (operators/sources) at streaming api runtime

2018-01-25 Thread Piotr Nowojski
Hi,

As far as I know there is currently no simple way to do this. See 
"Join stream with static data" in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
and
https://issues.apache.org/jira/browse/FLINK-6131

One workaround might be to buffer the Kafka input in state in your 
TwoInput operator until all of the broadcast messages have arrived.
Another option might be to start your application dynamically: first run some 
computation to determine the fixed list of ids, then start the Flink application 
with those values hardcoded or passed via command line arguments.
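
The buffering workaround can be sketched in plain Java as below. This is a 
model only: BufferUntilBootstrappedSketch and its method names are made up, 
the end of the id stream is assumed to be signalled explicitly via 
onBootstrapComplete, and in a real operator the buffer would have to live in 
Flink state rather than in a field.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical two-input logic: hold Kafka records until the id source has finished. */
final class BufferUntilBootstrappedSketch {
    private final Set<String> ids = new HashSet<>();
    private final List<String> buffered = new ArrayList<>();
    private final Consumer<String> downstream;
    private boolean bootstrapped;

    BufferUntilBootstrappedSketch(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    /** First input: one element of the fixed id list. */
    void onId(String id) {
        ids.add(id);
    }

    /** Assumed end-of-input signal of the id source: release everything buffered so far. */
    void onBootstrapComplete() {
        bootstrapped = true;
        for (String record : buffered) {
            downstream.accept(record);
        }
        buffered.clear();
    }

    /** Second input: a Kafka record, forwarded only once the ids are complete. */
    void onKafkaRecord(String record) {
        if (bootstrapped) {
            downstream.accept(record);
        } else {
            buffered.add(record);
        }
    }

    Set<String> knownIds() {
        return ids;
    }
}
```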

Piotrek 

> On 25 Jan 2018, at 04:10, Ishwara Varnasi  wrote:
> 
> Hello,
> I have a scenario where I've two sources, one of them is source of fixed list 
> of ids for preloading (caching certain info which is slow) and second one is 
> the kafka consumer. I need to run Kafka after first one completes. I need a 
> mechanism to let the Kafka consumer know that it can start consuming 
> messages. How can I achieve this?
> thanks
> Ishwara Varnasi