Re: Multiple stream operator watermark handling

2018-05-25 Thread Piotr Nowojski
Great to hear that this worked out for you :)

Progression of watermarks on an empty stream is a known issue, that we are 
working on to resolve in the future. Usually recommended workarounds are to 
send a custom blank event (which should be ignored) once a while.

I have expanded the documentation:
https://github.com/apache/flink/pull/6076 
<https://github.com/apache/flink/pull/6076>
Please check it and If you have any further suggestions you are welcome to make 
a comments in the PR. I hope it clarifies the behaviour.

Piotrek

> On 25 May 2018, at 00:03, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> 
> On Thu, May 24, 2018 at 9:20 AM, Elias Levy <fearsome.lucid...@gmail.com 
> <mailto:fearsome.lucid...@gmail.com>> wrote:
> On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> From top of my head I can imagine two solutions:
> 
> 1. Override the default behaviour of the operator via for example 
> org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
> 
> That seems the safer, but more complicated path.
> 
> As we had already implemented the business logic in a RichCoFlatMapFunction, 
> I ended up extending CoStreamFlatMap:
> 
> class SingleWatermarkCoFlatMap[IN1,IN2,OUT](flatMapper: 
> CoFlatMapFunction[IN1,IN2,OUT]) extends CoStreamFlatMap(flatMapper)  {
> 
>   // Pass through the watermarks from the first stream
>   override def processWatermark1(mark: Watermark): Unit = 
> processWatermark(mark)
> 
>   // Ignore watermarks from the second stream
>   override def processWatermark2(mark: Watermark): Unit = {}
> }
> 
> 
> Then it was easy to replace:
> 
> stream1
>   .connect(stream2)
>   .flatMap( new BusinessCoFlatMapFunction(params) )
> .name("Operator")
> .uid("op")
> 
> with:
> 
> stream1
>   .connect(stream2)
>   .transform("Operator", new SingleWatermarkCoFlatMap[X,Y,Z](new 
> BusinessCoFlatMapFunction(params)))
>   .uid("op")
> 
> 



Re: how to emit record to downstream operator in snapshotState and/or onProcessingTime

2018-06-12 Thread Piotr Nowojski
Hi,

> Can you elaborate that a little bit? are you referring to 
> "Output> output" in AbstractStreamOperator class?

Yes. However I have never tried it, so I’m not 100% sure there are no pit falls 
with that.

Regarding processing time timers. You should be able to register the timer once 
and then re-register in `onTimer(…)` callback using  `ctx.timerService()`.

Piotrek

> On 11 Jun 2018, at 18:59, Steven Wu  wrote:
> 
> 
> @Override
> public void processElement(Integer value, Context ctx, Collector 
> out) throws Exception {
>ctx.timerService().registerProcessingTimeTimer(...);
> }
> 
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector 
> out) throws Exception {
>// …
> }
> 
> correcting myself regarding the above timer proposal. it still requires a 
> message/record come in. I am trying to guard against when there is a long gap 
> of idle. then I won't be able to register a timer.
> 
> 
> On Mon, Jun 11, 2018 at 9:22 AM, Steven Wu  <mailto:stevenz...@gmail.com>> wrote:
> Pirotr,
> 
> > However you could do it via a custom Operator (there you have a constant 
> > access to output collector). 
> 
> Can you elaborate that a little bit? are you referring to 
> "Output> output" in AbstractStreamOperator class?
> 
> > register processing time service in your ProcessFunction.
> 
> I think your timer proposal can work. 
> 
> I was originally register timer like this. ProcessingTimeCallback interface 
> doesn't supply the Collector parameter
> 
> ((StreamingRuntimeContext) getRuntimeContext())
> .getProcessingTimeService()
> .registerTimer(..., this);
> 
> Thanks, 
> Steven
> 
> 
> 
> On Mon, Jun 11, 2018 at 2:52 AM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Indeed it seems like this is not possible to emit records on 
> checkpoint/snapshot through ProcessFunction. However you could do it via a 
> custom Operator (there you have a constant access to output collector). 
> Another workaround might be to register processing time service in your 
> ProcessFunction.
> 
> @Override
> public void processElement(Integer value, Context ctx, Collector 
> out) throws Exception {
>ctx.timerService().registerProcessingTimeTimer(...);
> }
> 
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector 
> out) throws Exception {
>// …
> }
> 
> Piotrek
> 
>> On 11 Jun 2018, at 01:07, Steven Wu > <mailto:stevenz...@gmail.com>> wrote:
>> 
>> I have a process function defined with these interfaces
>> 
>> public class MyProcessFunction extends ProcessFunction 
>> implements CheckpointedFunction, ProcessingTimeCallback {...}
>> 
>> In snapshotState() method, I want to close files and emit the metadata about 
>> the closed files to downstream operator. it doesn't seem possible with 
>> snapshotState(FunctionSnapshotContext context) interface.
>> 
>> I can keep metadata in snapshot and restore them during recovery. but if 
>> there is no input record coming for a long time,  processElement(T value, 
>> Context ctx, Collector out) won't be called. Then I can't forward 
>> the restored data to downstream operator with guaranteed latency.
>> 
>> I can add a timer. but it doesn't seem that onProcessingTime(long timestamp) 
>> allows me to forward output to downstream operator either.
>> 
>> Thanks,
>> Steven
> 
> 
> 



Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
Hi,

Oh, I see now. Yes indeed getKeyedStateBackened() is not exposed to the 
function and you can not migrate your state that way.

As far as I know yes, at the moment in order to convert everything at once 
(without getKeyes you still can implement lazy conversion) you would have to 
write your own operator.

Piotrek

> On 7 Jun 2018, at 15:26, Tony Wei  wrote:
> 
> Hi Piotrek,
> 
> I used `ProcessFunction` to implement it, but it seems that I can't call 
> `getKeyedStateBackend()` like `WindowOperator` did.
> I found that `getKeyedStateBackend()` is the method in 
> `AbstractStreamOperator` and `ProcessFunction` API didn't extend it.
> Dose that mean I can't look up all keys and migrate the entire previous 
> states to the new states in `ProcessFunction#open()`?
> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator` to 
> migration state like the manner showed in `WindowOperator`? 
> 
> Best Regards,
> Tony Wei
> 
> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski  <mailto:pi...@data-artisans.com>>:
> What function are you implementing and how are you using it?
> 
> Usually it’s enough if your function implements RichFunction (or rather 
> extend from AbstractRichFunction) and then you could use RichFunction#open in 
> the similar manner as in the code that I posted in previous message. Flink in 
> many places performs instanceof chekcs like: 
> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
> 
> public static void openFunction(Function function, Configuration parameters) 
> throws Exception{
>if (function instanceof RichFunction) {
>   RichFunction richFunction = (RichFunction) function;
>   richFunction.open(parameters);
>}
> }
> 
> Piotrek
> 
> 
>> On 7 Jun 2018, at 11:07, Tony Wei > <mailto:tony19920...@gmail.com>> wrote:
>> 
>> Hi Piotrek,
>> 
>> It seems that this was implemented by `Operator` API, which is a more low 
>> level api compared to `Function` API.
>> Since in `Function` API level we can only migrate state by event triggered, 
>> it is more convenient in this way to migrate state by foreach all keys in 
>> `open()` method.
>> If I was implemented state operator by `ProcessFunction` API, is it possible 
>> to port it to `KeyedProcessOperator` and do the state migration that you 
>> mentioned?
>> And are there something concerned and difficulties that will leads to 
>> restored state failed or other problems? Thank you!
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski > <mailto:pi...@data-artisans.com>>:
>> Hi,
>> 
>> General solution for state/schema migration is under development and it 
>> might be released with Flink 1.6.0.
>> 
>> Before that, you need to manually handle the state migration in your 
>> operator’s open method. Lets assume that your OperatorV1 has a state field 
>> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible 
>> with previous version. What you can do, is to add a logic in open method, to 
>> check:
>> 1. If “stateV2” is non empty, do nothing
>> 2. If there is no “stateV2”, iterate over all of the keys and manually 
>> migrate “stateV1” to “stateV2”
>> 
>> In your OperatorV3 you could drop the support for “stateV1”.
>> 
>> I have once implemented something like that here:
>> 
>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>  
>> <https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258>
>> 
>> Hope that helps!
>> 
>> Piotrek
>> 
>> 
>>> On 6 Jun 2018, at 17:04, TechnoMage >> <mailto:mla...@technomage.com>> wrote:
>>> 
>>> We are still pretty new to Flink and I have a conceptual / DevOps question.
>>> 
>>> When a job is modified and we want to deploy the new version, what is the 
>>> preferred method?  Our jobs have a lot of keyed state.
>>> 
>>> If we use snapshots we have old state that may no longer apply to the new 
>>> pipeline.
>>> If we start a new job we can reprocess historical data from Kafka, but that 
>>> can be very resource heavy for a while.
>>> 
>>> Is there an option I am missing?  Are there facilities to “patch” or 
>>> “purge” selectively the keyed state?
>>> 
>>> Michael
>> 
>> 
> 
> 



Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
What function are you implementing and how are you using it?

Usually it’s enough if your function implements RichFunction (or rather extend 
from AbstractRichFunction) and then you could use RichFunction#open in the 
similar manner as in the code that I posted in previous message. Flink in many 
places performs instanceof chekcs like: 
org.apache.flink.api.common.functions.util.FunctionUtils#openFunction

public static void openFunction(Function function, Configuration parameters) 
throws Exception{
   if (function instanceof RichFunction) {
  RichFunction richFunction = (RichFunction) function;
  richFunction.open(parameters);
   }
}

Piotrek

> On 7 Jun 2018, at 11:07, Tony Wei  wrote:
> 
> Hi Piotrek,
> 
> It seems that this was implemented by `Operator` API, which is a more low 
> level api compared to `Function` API.
> Since in `Function` API level we can only migrate state by event triggered, 
> it is more convenient in this way to migrate state by foreach all keys in 
> `open()` method.
> If I was implemented state operator by `ProcessFunction` API, is it possible 
> to port it to `KeyedProcessOperator` and do the state migration that you 
> mentioned?
> And are there something concerned and difficulties that will leads to 
> restored state failed or other problems? Thank you!
> 
> Best Regards,
> Tony Wei
> 
> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski  <mailto:pi...@data-artisans.com>>:
> Hi,
> 
> General solution for state/schema migration is under development and it might 
> be released with Flink 1.6.0.
> 
> Before that, you need to manually handle the state migration in your 
> operator’s open method. Lets assume that your OperatorV1 has a state field 
> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible 
> with previous version. What you can do, is to add a logic in open method, to 
> check:
> 1. If “stateV2” is non empty, do nothing
> 2. If there is no “stateV2”, iterate over all of the keys and manually 
> migrate “stateV1” to “stateV2”
> 
> In your OperatorV3 you could drop the support for “stateV1”.
> 
> I have once implemented something like that here:
> 
> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>  
> <https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258>
> 
> Hope that helps!
> 
> Piotrek
> 
> 
>> On 6 Jun 2018, at 17:04, TechnoMage > <mailto:mla...@technomage.com>> wrote:
>> 
>> We are still pretty new to Flink and I have a conceptual / DevOps question.
>> 
>> When a job is modified and we want to deploy the new version, what is the 
>> preferred method?  Our jobs have a lot of keyed state.
>> 
>> If we use snapshots we have old state that may no longer apply to the new 
>> pipeline.
>> If we start a new job we can reprocess historical data from Kafka, but that 
>> can be very resource heavy for a while.
>> 
>> Is there an option I am missing?  Are there facilities to “patch” or “purge” 
>> selectively the keyed state?
>> 
>> Michael
> 
> 



Re: Conceptual question

2018-06-08 Thread Piotr Nowojski
Hi,

Yes it should be feasible. As I said before, with Flink 1.6 there will be 
better way for migrating a state, but for now you either need to lazily convert 
the state, or iterate over the keys and do the job manually.

Piotrek

> On 7 Jun 2018, at 15:52, Tony Wei  wrote:
> 
> Hi Piotrek,
> 
> So my question is: is that feasible to migrate state from `ProcessFunction` 
> to my own operator then use `getKeyedStateBackend()` to migrate the states?
> If yes, is there anything I need to be careful with? If no, why and can it be 
> available in the future? Thank you.
> 
> Best Regards,
> Tony Wei
> 
> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski  <mailto:pi...@data-artisans.com>>:
> Hi,
> 
> Oh, I see now. Yes indeed getKeyedStateBackened() is not exposed to the 
> function and you can not migrate your state that way.
> 
> As far as I know yes, at the moment in order to convert everything at once 
> (without getKeyes you still can implement lazy conversion) you would have to 
> write your own operator.
> 
> Piotrek
> 
> 
>> On 7 Jun 2018, at 15:26, Tony Wei > <mailto:tony19920...@gmail.com>> wrote:
>> 
>> Hi Piotrek,
>> 
>> I used `ProcessFunction` to implement it, but it seems that I can't call 
>> `getKeyedStateBackend()` like `WindowOperator` did.
>> I found that `getKeyedStateBackend()` is the method in 
>> `AbstractStreamOperator` and `ProcessFunction` API didn't extend it.
>> Dose that mean I can't look up all keys and migrate the entire previous 
>> states to the new states in `ProcessFunction#open()`?
>> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator` to 
>> migration state like the manner showed in `WindowOperator`? 
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski > <mailto:pi...@data-artisans.com>>:
>> What function are you implementing and how are you using it?
>> 
>> Usually it’s enough if your function implements RichFunction (or rather 
>> extend from AbstractRichFunction) and then you could use RichFunction#open 
>> in the similar manner as in the code that I posted in previous message. 
>> Flink in many places performs instanceof chekcs like: 
>> org.apache.flink.api.com 
>> <http://org.apache.flink.api.com/>mon.functions.util.FunctionUtils#openFunction
>> 
>> public static void openFunction(Function function, Configuration parameters) 
>> throws Exception{
>>if (function instanceof RichFunction) {
>>   RichFunction richFunction = (RichFunction) function;
>>   richFunction.open(parameters);
>>}
>> }
>> 
>> Piotrek
>> 
>> 
>>> On 7 Jun 2018, at 11:07, Tony Wei >> <mailto:tony19920...@gmail.com>> wrote:
>>> 
>>> Hi Piotrek,
>>> 
>>> It seems that this was implemented by `Operator` API, which is a more low 
>>> level api compared to `Function` API.
>>> Since in `Function` API level we can only migrate state by event triggered, 
>>> it is more convenient in this way to migrate state by foreach all keys in 
>>> `open()` method.
>>> If I was implemented state operator by `ProcessFunction` API, is it 
>>> possible to port it to `KeyedProcessOperator` and do the state migration 
>>> that you mentioned?
>>> And are there something concerned and difficulties that will leads to 
>>> restored state failed or other problems? Thank you!
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski >> <mailto:pi...@data-artisans.com>>:
>>> Hi,
>>> 
>>> General solution for state/schema migration is under development and it 
>>> might be released with Flink 1.6.0.
>>> 
>>> Before that, you need to manually handle the state migration in your 
>>> operator’s open method. Lets assume that your OperatorV1 has a state field 
>>> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible 
>>> with previous version. What you can do, is to add a logic in open method, 
>>> to check:
>>> 1. If “stateV2” is non empty, do nothing
>>> 2. If there is no “stateV2”, iterate over all of the keys and manually 
>>> migrate “stateV1” to “stateV2”
>>> 
>>> In your OperatorV3 you could drop the support for “stateV1”.
>>> 
>>> I have once implemented something like that here:
>>> 
>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/stream

Re: [flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-08 Thread Piotr Nowojski
Hi,

BucketingSink is designed to provide exactly-once writes to file system, which 
is inherently tied to checkpointing. As you just saw, without checkpointing, 
BucketingSink is never notified that it can commit pending files. 

If you do not want to use checkpointing for some reasons, you could always use 
for example 
org.apache.flink.streaming.api.datastream.DataStream#writeUsingOutputFormat and 
write your own simple `OutputFormat` or look if one of the existing ones meet 
your needs.

Piotrek

> On 7 Jun 2018, at 14:23, Rinat  wrote:
> 
> Hi mates, we got some Flink jobs, that are writing data from kafka into hdfs, 
> using Bucketing-Sink.
> For some reasons, those jobs are running without checkpointing. For now, it 
> not a big problem for us, if some files are remained opened in case of job 
> reloading.
> 
> Periodically, those jobs fail with OutOfMemory exception, and seems, that I 
> found a strange thing in the implementation of BucketingSink.
> 
> During the sink lifecycle, we have a state object, implemented as a map, 
> where key is a bucket path, and value is a state, that contains information 
> about opened files and list of pending files.
> After researching of the heap dump, I found, that those state stores 
> information about ~ 1_000 buckets and their state, all this stuff weights ~ 
> 120 Mb.
> 
> I’ve looked through the code, and found, that we removing the buckets from 
> the state, in notifyCheckpointComplete method. 
> 
> @Override
> public void notifyCheckpointComplete(long checkpointId) throws Exception {
>   Iterator>> bucketStatesIt = 
> state.bucketStates.entrySet().iterator();
>   while (bucketStatesIt.hasNext()) {
>if (!bucketState.isWriterOpen &&
>bucketState.pendingFiles.isEmpty() &&
>bucketState.pendingFilesPerCheckpoint.isEmpty()) {
> 
>// We've dealt with all the pending files and the writer for this 
> bucket is not currently open.
>// Therefore this bucket is currently inactive and we can remove it 
> from our state.
>bucketStatesIt.remove();
> }
> }
> }
> 
> So, this looks like an issue, when you are using this sink in checkpointless 
> environment, because the data always added to the state, but never removed.
> Of course, we could enabled checkpointing, and use one of available backends, 
> but as for me, it seems like a non expected behaviour, like I have an 
> opportunity to run the job without checkpointing, but really, if I do so,
> I got an exception in sink component.
> 
> What do you think about this ? Do anyone got the same problem, and how’ve you 
> solved it ?
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 



Re: [BucketingSink] notify on moving into pending/ final state

2018-06-14 Thread Piotr Nowojski
Hi,

Couple of things:

1. Please create a Jira ticket with this proposal, so we can move discussion 
from user mailing list.

I haven’t thought it through, so take my comments with a grain of salt, however:

2. If we were to go with such callback, I would prefer to have one 
BucketStateChangeCallback, with methods like `onInProgressToPending(…)`, 
`onPendingToFinal`, `onPendingToCancelled(…)`, etc, in oppose to having one 
interface passed three times/four times for different purposes.

3. Other thing that I had in mind is that BucketingSink could be rewritten to 
extend TwoPhaseCommitSinkFunction. In that case, with 

public class BucketingSink2 extends TwoPhaseCommitSinkFunction

user could add his own hooks by overriding following methods

BucketingSink2#beginTransaction, BucketingSink2#preCommit, 
BucketingSink2#commit, BucketingSink2#abort. For example:

public class MyBucketingSink extends BucketingSink2 {
  @Override
  protected void  commit(??? txn) {
super.commit(txn);
// My hook on moving file from pending to commit state
  };
}

Alternatively, we could implement before mentioned callbacks support in 
TwoPhaseCommitSinkFunction and provide such feature to 
Kafka/Pravega/BucketingSink at once.

Piotrek

> On 13 Jun 2018, at 22:45, Rinat  wrote:
> 
> Hi guys, thx for your reply.
> 
> The following code info is actual for release-1.5.0 tag, 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class
> 
> For now, BucketingSink has the following lifecycle of files
> 
> When moving files from opened to pending state:
> on each item (method invoke:434 line), we check that suitable bucket exist, 
> and contain opened file, in case, when opened file doesn’t exist, we create 
> one, and write item to it
> on each item (method invoke:434 line), we check that suitable opened file 
> doesn’t exceed the limits, and if limits are exceeded, we close it and move 
> into pending state using closeCurrentPartFile:568 line - private method
> on each timer request (onProcessingTime:482 line), we check, if items haven't 
> been added to the opened file longer, than specified period of time, we close 
> it, using the same private method closeCurrentPartFile:588 line
> 
> So, the only way, that we have, is to call our hook from 
> closeCurrentPartFile, that is private, so we copy-pasted the current impl and 
> injected our logic there
> 
> 
> Files are moving from pending state into final, during checkpointing 
> lifecycle, in notifyCheckpointComplete:657 line, that is public, and contains 
> a lot of logic, including discovery of files in pending states, 
> synchronization of state access and it’s modification, etc … 
> 
> So we couldn’t override it, or call super method and add some logic, because 
> when current impl changes the state of files, it removes them from state, and 
> we don’t have any opportunity to know, 
> for which files state have been changed.
> 
> To solve such problem, we've created the following interface
> 
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional 
> operations, when {@link BucketingSink}
>  * moves file from one state to another. For more information about state 
> management of {@code BucketingSink}, look
>  * through it's official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
> 
> /**
>  * Used to perform any additional operations, related with moving of file 
> into next state.
>  *
>  * @param fs provides access for working with file system
>  * @param path path to the file, moved into next state
>  *
>  * @throws IOException if something went wrong, while performing any 
> operations with file system
>  */
> void call(FileSystem fs, Path path) throws IOException;
> }
> And have added an ability to register this callbacks in BucketingSink impl in 
> the following manner
> 
> public BucketingSink 
> registerOnFinalStateChangeCallback(FileStateChangeCallback… callbacks) {...}
> public BucketingSink 
> registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) 
> {...}
> 
> I’m ready to discuss the best ways, how such hooks could be implemented in 
> the core impl or any other improvements, that will help us to add such 
> functionality into our extension, using public api, instead of copy-pasting 
> the source code.
> 
> Thx for your help, mates =)
> 
> 
>> On 11 Jun 2018, at 11:37, Piotr Nowojski > <mailto:pi...@data-artisans.com>> wrote:
>> 
>> Hi,
>> 
>> I see that could be a useful feature. What exactly now is preventing you 
>> from inheriting from BucketingSink? Maybe it would be just enough to make 
>> the BucketingSink easier extendable.
>> 

Re: Flink kafka consumer stopped committing offsets

2018-06-11 Thread Piotr Nowojski
Hi,

What’s your KafkaConsumer configuration? Especially values for:
- is checkpointing enabled?
- enable.auto.commit (or auto.commit.enable for Kafka 0.8) / 
auto.commit.interval.ms
- did you set setCommitOffsetsOnCheckpoints() ?

Please also refer to 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
 

 , especially this part:

> Note that the Flink Kafka Consumer does not rely on the committed offsets for 
> fault tolerance guarantees. The committed offsets are only a means to expose 
> the consumer’s progress for monitoring purposes.

Can you post full logs from all TaskManagers/JobManager and can you 
say/estimate when did the committing brake/stop? Did you check Kafka logs for 
any errors?

To me it seems more like a Kafka issue/bug:
https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188
 

https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232
 

Especially that in your case this offsets committing is superseded by Kafka 
coordinator failure.

Piotrek

> On 8 Jun 2018, at 10:05, Juho Autio  wrote:
> 
> Hi,
> 
> We have a Flink stream job that uses Flink kafka consumer. Normally it 
> commits consumer offsets to Kafka.
> 
> However this stream ended up in a state where it's otherwise working just 
> fine, but it isn't committing offsets to Kafka any more. The job keeps 
> writing correct aggregation results to the sink, though. At the time of 
> writing this, the job has been running 14 hours without committing offsets.
> 
> Below is an extract from taskmanager.log. As you can see, it didn't log 
> anything until ~2018-06-07 22:08. Also that's where the log ends, these are 
> the last lines so far.
> 
> Could you help check if this is a know bug, possibly already fixed, or 
> something new?
> 
> I'm using a self-built Flink package 1.5-SNAPSHOT, flink commit 
> 8395508b0401353ed07375e22882e7581d46ac0e which is not super old.
> 
> Cheers,
> Juho
> 
> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka version : 0.10.2.1
> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka commitId : e89bffd6b2eff799
> 2018-06-06 10:01:33,560 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 
>  (id: 
> 2147483550 rack: null) for group 
> aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-06 10:01:33,563 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 
>  (id: 
> 2147483550 rack: null) for group 
> aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-07 22:08:28,773 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 
>  (id: 
> 2147483550 rack: null) dead for group 
> aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:28,776 WARN  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - 
> Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550, 
> metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444, metadata=''}, 
> topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''}, 
> topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''}, 
> topic2-1=OffsetAndMetadata{offset=89817267, metadata=''}, 
> topic1-10=OffsetAndMetadata{offset=12299742352, metadata=''}} failed for 
> group aggregate-all_server_measurements_combined-20180606-1000: Offset commit 
> failed with a retriable exception. You should retry committing offsets.
> 2018-06-07 22:08:29,840 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 
>  (id: 
> 2147483550 rack: null) dead for group 
> aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:29,841 WARN  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - 
> Auto-commit of offsets 

Re: [BucketingSink] notify on moving into pending/ final state

2018-06-11 Thread Piotr Nowojski
Hi,

I see that could be a useful feature. What exactly now is preventing you from 
inheriting from BucketingSink? Maybe it would be just enough to make the 
BucketingSink easier extendable.

One thing now that could collide with such feature is that Kostas is now 
working on larger BucketingSink rework/refactor. 

Piotrek

> On 8 Jun 2018, at 16:38, Rinat  wrote:
> 
> Hi mates, I got a proposal about functionality of BucketingSink.
> 
> During implementation of one of our tasks we got the following need - create 
> a meta-file, with the path and additional information about the file, created 
> by BucketingSink, when it’s been moved into final place.
> Unfortunately such behaviour is currently not available for us. 
> 
> We’ve implemented our own Sink, that provides an opportunity to register 
> notifiers, that will be called, when file state is changing, but current API 
> doesn’t allow us to add such behaviour using inheritance ...
> 
> It seems, that such functionality could be useful, and could be a part of 
> BucketingSink API
> What do you sink, should I make a PR ?
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 



Re: Heap Problem with Checkpoints

2018-06-11 Thread Piotr Nowojski
Hi,

What kind of messages are those “logs about S3 operations”? Did you try to 
google search them? Maybe it’s a known S3 issue?

Another approach is please use some heap space analyser from which you can 
backtrack classes that are referencing those “memory leaks” and again try to 
google any known memory issues.

It also could just mean, that it’s not a memory leak, but you just need to 
allocate more heap space for your JVM (and memory consumption will stabilise at 
some point).

Piotrek

> On 8 Jun 2018, at 18:32, Fabian Wollert  wrote:
> 
> Hi, in this email thread 
> 
>  here, i tried to set up S3 as a filesystem backend for checkpoints. Now 
> everything is working (Flink V1.5.0), but the JobMaster is accumulating Heap 
> space, with eventually killing itself with HeapSpace OOM after several hours. 
> If I don't enable Checkpointing, then everything is fine. I'm using the Flink 
> S3 Shaded Libs (tried both the Hadoop and the Presto lib, no difference in 
> this regard) from the tutorial. my checkpoint settings are this (job level):
> 
> env.enableCheckpointing(1000);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
> env.getCheckpointConfig().setCheckpointTimeout(6);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> 
> Another clue why i suspect the S3 Checkpointing is that the heapspace dump 
> contains a lot of char[] objects with some logs about S3 operations.
> 
> anyone has an idea where to look further on this?
> 
> Cheers
> 
> --
> 
> Fabian Wollert
> Zalando SE
> 
> E-Mail: fabian.woll...@zalando.de
>  
> 
> Tamara-Danz-Straße 1
> 10243 Berlin
> Fax: +49 (0)30 2759 46 93
> E-mail: legalnot...@zalando.co.uk 
> Notifications of major holdings (Sec. 33, 38, 39 WpHG):  +49 (0)30 2000889349
> 
> Management Board:
> Robert Gentz, David Schneider, Rubin Ritter
> 
> Chairman of the Supervisory Board:
> Lothar Lanz
> 
> Person responsible for providing the contents of Zalando SE acc. to Art. 55 
> RStV [Interstate Broadcasting Agreement]: Rubin Ritter
> Registered at the Local Court Charlottenburg Berlin, HRB 158855 B
> VAT registration number: DE 260543043



Re: Take elements from window

2018-06-11 Thread Piotr Nowojski
Hi,

Do I understand you correctly, that you just want to have a three different 
sliding windows (for 3 rules) with duration of 10, 20 and 30 minutes? If so, I 
haven’t tested it but I would guess that there are at least two solutions for 
the problem:

1. just create three different sliding windows on top of the same stream 
(possibly later join/union stream)
2. create sliding window (60 minutes, 10 minutes) and provide custom 
ReduceFunction/ProcessFunction that splits the records internally into separate 
aggregations windows. Your reduce function would have a 6 aggregates for 10 
minutes intervals for rule1, 3 aggregates for 20 minute intervals and 2 
aggregates for 30 minute intervals.

Piotrek

> On 8 Jun 2018, at 21:10, Antonio Saldivar Lezama  wrote:
> 
> Hello
> 
> 
> I am wondering if it is possible to process the following scenario, to store 
> all events by event time in a general window and process elements from a 
> smaller time Frame
> 
> 1.-  Store elements in a General SlidingWindow (60 mins, 10 mins)
>   - Rule 1 -> gets 10 mins elements from the general window and get 
> aggregations
>   - Rule 2 -> gets 20 mins elements from the general window and get 
> aggregations
>   - Rule 3 -> gets 30 mins elements from the general window and get 
> aggregations
> 2.- send results 
> 
> Thank you
> Regards



Re: Kafka to Flink to Hive - Writes failing

2018-06-11 Thread Piotr Nowojski
Yes, BucketingSink is a better option. You can start from looking at the 
BucketingSink java docs.

Please also take a look on this: 

https://stackoverflow.com/questions/47669729/how-to-write-to-orc-files-using-bucketingsink-in-apache-flink
 


Alternatively if you do not need to push a lot of data, you could write your 
own JDBC sink that bases on the JDBCAppendTableSink and adjusting it so that it 
works with hive’s JDBC client.

Piotrek

> On 11 Jun 2018, at 08:12, sagar loke  wrote:
> 
> Thanks, 
> We are getting data in Avro format from Kafka and are planning to write data 
> in ORC format to Hive tables. 
> 
> 1. Is BucketingSink better option for this use case or something else ?
> 2. Is there a sample code example which we can refer ?
> 
> Thanks in advance,
> 
> On Sun, Jun 10, 2018 at 10:49 PM, Jörn Franke  > wrote:
> Don’t use the JDBC driver to write to Hive. The performance of JDBC in 
> general for large volumes is suboptimal.
> Write it to a file in HDFS in a format supported by HIve and point the table 
> definition in Hive to it.
> 
> On 11. Jun 2018, at 04:47, sagar loke  > wrote:
> 
>> I am trying to Sink data to Hive via Confluent Kafka -> Flink -> Hive using 
>> following code snippet:
>> 
>> But I am getting following error:
>> 
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream stream = readFromKafka(env);
>> 
>> 
>> private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
>> BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
>> };
>> 
>>  JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
>> .setDrivername("org.apache.hive.jdbc.HiveDriver")
>> .setDBUrl("jdbc:hive2://hiveconnstring <>")
>> .setUsername("myuser")
>> .setPassword("mypass")
>> .setQuery("INSERT INTO testHiveDriverTable (key,value) VALUES 
>> (?,?)")
>> .setBatchSize(1000)
>> .setParameterTypes(FIELD_TYPES)
>> .build();
>> 
>> DataStream rows = stream.map((MapFunction) st1 
>> -> {
>> Row row = new Row(2); // 
>> row.setField(0, st1.get("SOME_ID")); 
>> row.setField(1, st1.get("SOME_ADDRESS"));
>> return row;
>> });
>> 
>> sink.emitDataStream(rows);
>> env.execute("Flink101");
>> 
>> 
>> Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
>> at org.apache.flink.api.java.io 
>> .jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
>> at org.apache.flink.api.java.io 
>> .jdbc.JDBCSinkFunction.snapshotState(JDBCSinkFunction.java:43)
>> at 
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>> at 
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
>> ... 12 more
>> 
>> Caused by: java.sql.SQLException: Method not supported
>> at org.apache.hive.jdbc.HiveStatement.executeBatch(HiveStatement.java:381)
>> at org.apache.flink.api.java.io 
>> .jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
>> ... 17 more
>> I checked hive-jdbc driver and it seems that the Method is not supported in 
>> hive-jdbc driver.
>> 
>> public class HiveStatement implements java.sql.Statement {
>> ...
>> 
>>   @Override  
>>   public int[] executeBatch() throws SQLException {
>> throw new SQLFeatureNotSupportedException("Method not supported");
>>   }
>> 
>> ..
>> }
>> Is there any way we can achieve this using JDBC Driver ?
>> 
>> Let me know,
>> 
>> Thanks in advance.
>> 
> 
> 
> 
> -- 
> Regards,
> SAGAR.



Re: Akka version conflict running on Flink cluster

2018-06-11 Thread Piotr Nowojski
Hi,

Please take a look on this thread first:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Akka-Http-used-in-custom-RichSourceFunction-td20314.html
 


Piotrek

> On 11 Jun 2018, at 11:16, Wouter Zorgdrager  wrote:
> 
> Hi,
> 
> I think I'm running into an Akka version conflict when running a Flink job on 
> a cluster.
> 
> The current situation:
> - Flink cluster on Flink 1.4.2 (using Docker)
> - Flink job which uses twitter4s [1] library and Akka version 2.5.8
> 
> In my Flink job I try to 'shutdown' an Akka actor from the twitter4s library.
> This results in a whole taskmanager crashing with the following stacktrace:
> 
> taskrunner_1  | 2018-06-11 09:03:14,454 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> Un-registering task and sending final execution state CANCELED to JobManager 
> for task Source: Custom Source -> Sink: Unnamed 
> (0ba7f7f259eee06fe2f7d783c868179b)
> taskrunner_1  | Uncaught error from thread 
> [twitter4s-streaming-akka.actor.default-dispatcher-288]: loader constraint 
> violation: when resolving method 
> "akka.actor.ActorCell$$anonfun$3.(Lakka/actor/ActorCell;)V" the class 
> loader (instance of 
> org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
>  of the current class, akka/actor/ActorCell, and the class loader (instance 
> of sun/misc/Launcher$AppClassLoader) for the method's defining class, 
> akka/actor/ActorCell$$anonfun$3, have different Class objects for the type 
> akka/actor/ActorCell used in the signature, shutting down JVM since 
> 'akka.jvm-exit-on-fatal-error' is enabled for for 
> ActorSystem[twitter4s-streaming]
> taskrunner_1  | java.lang.LinkageError: loader constraint violation: when 
> resolving method 
> "akka.actor.ActorCell$$anonfun$3.(Lakka/actor/ActorCell;)V" the class 
> loader (instance of 
> org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
>  of the current class, akka/actor/ActorCell, and the class loader (instance 
> of sun/misc/Launcher$AppClassLoader) for the method's defining class, 
> akka/actor/ActorCell$$anonfun$3, have different Class objects for the type 
> akka/actor/ActorCell used in the signature
> taskrunner_1  | at akka.actor.ActorCell.invoke(ActorCell.scala:499)
> taskrunner_1  | at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> taskrunner_1  | at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> taskrunner_1  | at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> taskrunner_1  | at 
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> taskrunner_1  | at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> taskrunner_1  | at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> taskrunner_1  | at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> taskrunner_1  | 2018-06-11 09:03:14,984 INFO  
> org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down 
> BLOB cache
> taskrunner_1  | 2018-06-11 09:03:14,985 INFO  
> org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down 
> BLOB cache
> taskrunner_1  | Exception in thread "twitter4s-streaming-shutdown-hook-1" 
> java.lang.NoClassDefFoundError: 
> akka/actor/CoordinatedShutdown$$anonfun$totalTimeout$1
> taskrunner_1  | at 
> akka.actor.CoordinatedShutdown.totalTimeout(CoordinatedShutdown.scala:515)
> taskrunner_1  | at 
> akka.actor.CoordinatedShutdown$$anonfun$initJvmHook$1.apply(CoordinatedShutdown.scala:217)
> taskrunner_1  | at 
> akka.actor.CoordinatedShutdown$$anon$2.run(CoordinatedShutdown.scala:547)
> taskrunner_1  | Caused by: java.lang.ClassNotFoundException: 
> akka.actor.CoordinatedShutdown$$anonfun$totalTimeout$1
> taskrunner_1  | at 
> java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> taskrunner_1  | at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> taskrunner_1  | at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
> taskrunner_1  | at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> taskrunner_1  | ... 3 more
> 
> To me, it looks like an version conflict. Any suggestions how to solve this?
> 
> Thanks!
> Wouter
> 
> [1] - Twitter4s: 
> https://github.com/DanielaSfregola/twitter4s/blob/master/build.sbt 
> 
> 



Re: Flink kafka consumer stopped committing offsets

2018-06-11 Thread Piotr Nowojski
The more I look into it, the more it seems like a Kafka bug or some cluster 
failure from which your Kafka cluster did not recover.

In your cases auto committing should be set to true and in that case 
KafkaConsumer should commit offsets once every so often when it’s polling 
messages. Unless for example `cordinatorUnknown()` returns false in 
`org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsAsync`
 (Kafka 0.10.2.1 code base):

private void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
if (coordinatorUnknown()) {
this.nextAutoCommitDeadline = now + retryBackoffMs;
} else if (now >= nextAutoCommitDeadline) {
this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
doAutoCommitOffsetsAsync();
}
}
}

Have you checked Kafka logs? This suggests that the real problem is hidden 
behind:

>  INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - 
> Marking the coordinator 
> my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 
> rack: null) dead for group 
> aggregate-all_server_measurements_combined-20180606-1000

And maybe your Kafka cluster/consumer can not recover from this situation.

Another thing to try (simpler) is to just trying upgrading Kafka cluster.

Piotrek

> On 11 Jun 2018, at 11:44, Juho Autio  wrote:
> 
> Hi Piotr, thanks for your insights.
> 
> > What’s your KafkaConsumer configuration?
> 
> We only set these in the properties that are passed to FlinkKafkaConsumer010 
> constructor:
> 
> auto.offset.reset=latest
> bootstrap.servers=my-kafka-host:9092
> group.id <http://group.id/>=my_group
> flink.partition-discovery.interval-millis=3
> 
> > is checkpointing enabled?
> 
> No.
> 
> > enable.auto.commit (or auto.commit.enable for Kafka 0.8) / 
> > auto.commit.interval.ms <http://auto.commit.interval.ms/>
> 
> We have whatever is the default behaviour of Flink kafka consumer. It seems 
> to commit quite often, something like every 5 seconds.
> 
> > did you set setCommitOffsetsOnCheckpoints() ?
> 
> No. But I checked with debugger that apparently 
> enableCommitOnCheckpoints=true is the default.
> 
> I also checked with debugger that offsetCommitMode=KAFKA_PERIODIC.
> 
> So I guess you're right that this bug doesn't seem to be in Flink itself? I 
> wonder if it's a known issue in Kafka client lib..
> 
> I also took thread dump on one of the task managers in this broken state. But 
> I couldn't spot anything obvious when comparing the threads to a dump from a 
> job where offsets are being committed. Any way I've saved the thread dump in 
> case there's something to look for specifically.
> 
> Sharing the full logs of job & task managers would be a bit of a hassle, 
> because I don't have an automatic way to obfuscate the logs so that I'm sure 
> that there isn't anything sensitive left. Any way, there isn't anything else 
> to share really. I wrote: "As you can see, it didn't log anything until 
> ~2018-06-07 22:08. Also that's where the log ends".
> 
> Thanks once more.
> 
> On Mon, Jun 11, 2018 at 11:18 AM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> What’s your KafkaConsumer configuration? Especially values for:
> - is checkpointing enabled?
> - enable.auto.commit (or auto.commit.enable for Kafka 0.8) / 
> auto.commit.interval.ms <http://auto.commit.interval.ms/>
> - did you set setCommitOffsetsOnCheckpoints() ?
> 
> Please also refer to 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration>
>  , especially this part:
> 
> > Note that the Flink Kafka Consumer does not rely on the committed offsets 
> > for fault tolerance guarantees. The committed offsets are only a means to 
> > expose the consumer’s progress for monitoring purposes.
> 
> Can you post full logs from all TaskManagers/JobManager and can you 
> say/estimate when did the committing brake/stop? Did you check Kafka logs for 
> any errors?
> 
> To me it seems more like a Kafka issue/bug:
> https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188
>  
> <https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188>
> https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232
>  
> <https://stackov

Re: Odd job failure

2018-05-29 Thread Piotr Nowojski
Hi,

Could you post full output of the mvn dependency:tree command on your project?
Can you reproduce this issue with some minimal project stripped down of any 
custom code/external dependencies except of Flink itself?

Thanks Piotrek

> On 28 May 2018, at 20:13, Elias Levy  wrote:
> 
> On Mon, May 28, 2018 at 1:48 AM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Most likely suspect is the standard java problem of some dependency 
> convergence issue. Please check if you are not pulling in multiple Kafka 
> versions into your class path. Especially your job shouldn’t pull any Kafka 
> library except of the one that comes from flnk-connector-kafka-0.11 (which is 
> 0.11.0.2).
> 
> Alas, that is not the case.  The job correctly includes 
> kafka-clients:0.11.0.2 <http://0.11.0.2/>:
> 
> [warn] Found version conflict(s) in library dependencies; some are suspected 
> to be binary incompatible:
> [warn] 
> [warn]* org.apache.kafka:kafka-clients:0.11.0.2 is selected over 
> {0.10.2.1, 0.9.0.1}
> [warn]+- org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2 
> (depends on 0.11.0.2)
> [warn]+- org.apache.flink:flink-connector-kafka-0.9_2.11:1.4.2 
> (depends on 0.10.2.1)
> [warn]+- org.apache.flink:flink-connector-kafka-0.10_2.11:1.4.2 
> (depends on 0.10.2.1)
> [warn] 
> 
>  
> Please also consider upgrading your cluster at least to Kafka 0.11.0.2. Kafka 
> 0.11.0.0 was pretty unstable release, and we do not support it. Our connector 
> depend on Kafka 0.11.0.2 client and while I don’t assume that there is some 
> incompatibility between 0.11.0.0 cluster and 0.11.0.2 client, it definitely 
> wouldn’t hurt to upgrade the cluster.
> 
> Thanks for the tip.  That said, this error should be unrelated to the version 
> of the cluster.
> 



Re: Question about JVM exit caused by timeout exception with the asynchronous IO of flink 1.4.2

2018-06-07 Thread Piotr Nowojski
Hi,

You can increase a timeout, that’s one way to tackle it. 

In Flink 1.6.0 there will be possibility to override default Flink’s behaviour 
regarding handling timeouts:
https://issues.apache.org/jira/browse/FLINK-7789 
 to handle them, instead of 
out right failing.

Also if you can not wait for the new release, you always could copy 
AsyncWaitOperator with AsyncFunction into your code base and apply the changes 
from the above mentioned ticket 
(https://github.com/apache/flink/pull/6091/files 
)

Piotrek

> On 6 Jun 2018, at 10:39, 陈卓  wrote:
> 
> HI
> The asynchronous IO of flink 1.4.2 will throw timeout exception when the 
> timeout setting is one second and the invoke time setting is greater than 
> twenty seconds. Unfortunately the timeout exception cannot be captured, which 
> leads to abnormal exit of the process. So my question is how to deal with 
> this situation to keep the jvm executing.
>  
> the exception info:
> 
>  
> code as follows
> 
>  
> 
> 
>  
>  
> 
>  
>  
> -- 
> Thanks
> zhuo chen



Re: Extending stream events with a an aggregate value

2018-06-07 Thread Piotr Nowojski
Hi,

Ńo worries :) You probably need to write your own process function to do 
exactly that, maybe something like this:

DataStream> test;

DataStream> max = test.keyBy(0)
  .process(new KeyedProcessFunction, 
Tuple3>() {
 public ValueState max;

 @Override
 public void open(Configuration parameters) throws Exception {
ValueStateDescriptor descriptor =
  new ValueStateDescriptor<>("max", TypeInformation.of(new 
TypeHint() {
  }));
sum = getRuntimeContext().getState(descriptor);
 }

 @Override
 public void processElement(Tuple2 value, Context ctx, 
Collector> out) throws Exception {
// ...
 }
  });

You need to store max on the state if you care about recovering from 
failures/restarts without loosing previous max value. Please check the online 
documentation for ProcessFunction and handling state in Flink :)

Piotrek

> On 6 Jun 2018, at 15:55, Nicholas Walton  wrote:
> 
> I’m sure I’m being a complete idiot, since this seems so trivial but if 
> someone could point me in the right direction I’d be very grateful.
> 
> I have a simple data stream [(Int, Double)] keyed on the Int. I can calculate 
> the running max of the stream no problem using “.max(2)”. But I want to 
> output the original input together with the running max value as [(Int, 
> Double, Double)]. I’ve hunted high and low for a means to do something so 
> trivial.
> 
> Nick Walton



Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
Hi,

General solution for state/schema migration is under development and it might 
be released with Flink 1.6.0.

Before that, you need to manually handle the state migration in your operator’s 
open method. Lets assume that your OperatorV1 has a state field “stateV1”. Your 
OperatorV2 defines field “stateV2”, which is incompatible with previous 
version. What you can do, is to add a logic in open method, to check:
1. If “stateV2” is non empty, do nothing
2. If there is no “stateV2”, iterate over all of the keys and manually migrate 
“stateV1” to “stateV2”

In your OperatorV3 you could drop the support for “stateV1”.

I have once implemented something like that here:

https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
 


Hope that helps!

Piotrek

> On 6 Jun 2018, at 17:04, TechnoMage  wrote:
> 
> We are still pretty new to Flink and I have a conceptual / DevOps question.
> 
> When a job is modified and we want to deploy the new version, what is the 
> preferred method?  Our jobs have a lot of keyed state.
> 
> If we use snapshots we have old state that may no longer apply to the new 
> pipeline.
> If we start a new job we can reprocess historical data from Kafka, but that 
> can be very resource heavy for a while.
> 
> Is there an option I am missing?  Are there facilities to “patch” or “purge” 
> selectively the keyed state?
> 
> Michael



Re: Odd job failure

2018-05-28 Thread Piotr Nowojski
Hi,

I think that’s unlikely to happen. As far as I know, the only way to actually 
unload the classes in JVM is when their class loader is garbage collected, 
which means all the references in the code to it must vanish. In other words, 
it should never happen that class is not found while anyone is still 
referencing it.

Most likely suspect is the standard java problem of some dependency convergence 
issue. Please check if you are not pulling in multiple Kafka versions into your 
class path. Especially your job shouldn’t pull any Kafka library except of the 
one that comes from flnk-connector-kafka-0.11 (which is 0.11.0.2).

Please also consider upgrading your cluster at least to Kafka 0.11.0.2. Kafka 
0.11.0.0 was pretty unstable release, and we do not support it. Our connector 
depend on Kafka 0.11.0.2 client and while I don’t assume that there is some 
incompatibility between 0.11.0.0 cluster and 0.11.0.2 client, it definitely 
wouldn’t hurt to upgrade the cluster.

Piotrek

> On 26 May 2018, at 17:58, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> 
> Piotr & Stephan,
> 
> Thanks for the replies.  Apologies for the late response.  I've been 
> traveling for the past month.
> 
> We've not observed this issue (spilling) again, but it is good to know that 
> 1.5 will use back-pressure based alignment.  I think for now we'll leave 
> task.checkpoint.alignment.max-size as is and work towards moving to 1.5 once 
> we confirm it is stable.
> 
> As for the java.lang.NoClassDefFoundError: 
> org/apache/kafka/clients/NetworkClient$1 error.  We see that one constantly 
> when jobs are canceled/restarted/upgraded.  We are using the 
> flink-connector-kafka-0.11 connector against a 0.11.0.0 cluster.  The error 
> indicates to me that the Kafka threads are not being fully shutdown and they 
> are trying to reload the NetworkClient class but failing, maybe because the 
> code is no longer accessible via the class loader or some other reason.  
> 
> It looks like others are observing the same error.  Alexander Smirnov 
> reported it here on the list last month as well.
> 
> 
> On Thu, May 3, 2018 at 1:22 AM, Stephan Ewen <se...@apache.org 
> <mailto:se...@apache.org>> wrote:
> Hi Elias!
> 
> Concerning the spilling of alignment data to disk:
> 
>   - In 1.4.x , you can set an upper limit via " 
> task.checkpoint.alignment.max-size ". See [1].
>   - In 1.5.x, the default is a back-pressure based alignment, which does not 
> spill any more.
> 
> Best,
> Stephan
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#task-checkpoint-alignment-max-size
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#task-checkpoint-alignment-max-size>
> 
> On Wed, May 2, 2018 at 1:37 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> It might be some Kafka issue. 
> 
> From what you described your reasoning seems sound. For some reason TM3 fails 
> and is unable to restart and process any data, thus forcing spilling on 
> checkpoint barriers on TM1 and TM2.
> 
> I don’t know the reason behind java.lang.NoClassDefFoundError: 
> org/apache/kafka/clients/NetworkClient$1 errors, but it doesn’t seem to be 
> important in this case.
> 
> 1. What Kafka version are you using? Have you looked for any known Kafka 
> issues with those symptoms?
> 2. Maybe the easiest thing will be to reformat/reinstall/recreate TM3 AWS 
> image? It might be some system issue.
> 
> Piotrek
> 
>> On 28 Apr 2018, at 01:54, Elias Levy <fearsome.lucid...@gmail.com 
>> <mailto:fearsome.lucid...@gmail.com>> wrote:
>> 
>> We had a job on a Flink 1.4.2 cluster with three TMs experience an odd 
>> failure the other day.  It seems that it started as some sort of network 
>> event.  
>> 
>> It began with the 3rd TM starting to warn every 30 seconds about socket 
>> timeouts while sending metrics to DataDog.  This latest for the whole outage.
>> 
>> Twelve minutes later, all TMs reported at nearly the same time that they had 
>> marked the Kafka coordinator as deed ("Marking the coordinator XXX (id: 
>> 2147482640 rack: null) dead for group ZZZ").  The job terminated and the 
>> system attempted to recover it.  Then things got into a weird state.
>> 
>> The following related for six or seven times for a period of about 40 
>> minutes: 
>> TM attempts to restart the job, but only the first and second TMs show signs 
>> of doing so.  
>> The disk begins to fill up on TMs 1 and 2.  
>> TMs 1 & 2 both report java.lang.NoClassDefFoundError: 
>> org/apache/kafka/clients/Netw

Re: Odd job failure

2018-05-02 Thread Piotr Nowojski
Hi,

It might be some Kafka issue. 

From what you described your reasoning seems sound. For some reason TM3 fails 
and is unable to restart and process any data, thus forcing spilling on 
checkpoint barriers on TM1 and TM2.

I don’t know the reason behind java.lang.NoClassDefFoundError: 
org/apache/kafka/clients/NetworkClient$1 errors, but it doesn’t seem to be 
important in this case.

1. What Kafka version are you using? Have you looked for any known Kafka issues 
with those symptoms?
2. Maybe the easiest thing will be to reformat/reinstall/recreate TM3 AWS 
image? It might be some system issue.

Piotrek

> On 28 Apr 2018, at 01:54, Elias Levy  wrote:
> 
> We had a job on a Flink 1.4.2 cluster with three TMs experience an odd 
> failure the other day.  It seems that it started as some sort of network 
> event.  
> 
> It began with the 3rd TM starting to warn every 30 seconds about socket 
> timeouts while sending metrics to DataDog.  This latest for the whole outage.
> 
> Twelve minutes later, all TMs reported at nearly the same time that they had 
> marked the Kafka coordinator as deed ("Marking the coordinator XXX (id: 
> 2147482640 rack: null) dead for group ZZZ").  The job terminated and the 
> system attempted to recover it.  Then things got into a weird state.
> 
> The following related for six or seven times for a period of about 40 
> minutes: 
> TM attempts to restart the job, but only the first and second TMs show signs 
> of doing so.  
> The disk begins to fill up on TMs 1 and 2.  
> TMs 1 & 2 both report java.lang.NoClassDefFoundError: 
> org/apache/kafka/clients/NetworkClient$1 errors.  These were mentioned on 
> this list earlier this month.  It is unclear if the are benign.
> The job dies when the disks finally fills up on 1 and 2.
> 
> Looking at the backtrace logged when the disk fills up, I gather that Flink 
> is buffering data coming from Kafka into one of my operators as a result of a 
> barrier.  The job has a two input operator, with one input the primary data, 
> and a secondary input for control commands.  It would appear that for 
> whatever reason the barrier for the control stream is not making it to the 
> operator, thus leading to the buffering and full disks.  Maybe Flink 
> scheduled the operator source of the control stream on the 3rd TM which seems 
> like it was not scheduling tasks?
> 
> Finally the JM records that it 13 late messages for already expired 
> checkpoints (could they be from the 3rd TM?), the job is restored one more 
> time and it works.  All TMs report nearly at the same time that they can now 
> find the Kafka coordinator.
> 
> 
> Seems like the 3rd TM has some connectivity issue, but then all TMs seems to 
> have a problem communicating with the Kafka coordinator at the same time and 
> recovered at the same time.  The TMs are hosted in AWS across AZs, so all of 
> them having connectivity issues at the same time is suspect.  The Kafka node 
> in question was up and other clients in our infrastructure seems to be able 
> to communicate with it without trouble.  Also, the Flink job itself seemed to 
> be talking to the Kafka cluster while restarting as it was spilling data to 
> disk coming from Kafka.  And the JM did not report any reduction on available 
> task slots, which would indicate connectivity issues between the JM and the 
> 3rd TM.  Yet, the logs in the 3rd TM do not show any record of trying to 
> restore the job during the intermediate attempts.
> 
> What do folks make of it?
> 
> 
> And a question for Flink devs, is there some reason why Flink does not stop 
> spilling messages to disk when the disk is going to fill up?  Seems like 
> there should be a configurable limit to how much data can be spilled before 
> back-pressure is applied to slow down or stop the source.



Re: Insert data into Cassandra without Flink Cassandra connection

2018-05-02 Thread Piotr Nowojski
Hi,

The only way that I can think of is if you keep your flatMap operator with 
parallelism 1, but that might defeat the purpose. Otherwise there is no way to 
open one single connection and share it across multiple TaskManagers (which can 
be running on different physical machines). Please rethink your 
solution/approach with respect to distributed nature of Flink.

However there are some valid use cases where one would like to have some part 
of his job graph distributed and some part(s) non distributed - like issuing 
one single commit after a distributed write, or processing a data in parallel 
but writing them to a relational database like MySQL via one single Sink 
operator.. 

Piotrek

> On 26 Apr 2018, at 15:23, Soheil Pourbafrani  wrote:
> 
> Here is my code 
> 
> stream.flatMap(new FlatMapFunction() {
> 
> @Override
> public void flatMap(byte[] value, Collector out) throws Exception {
> Parser.setInsert(true);
> CassandraConnection.connect();
> Parser.setInsert(true);
> System.out.println("\n*** New Message ***\n");
> System.out.println("Row Number : " + i ++ );
> System.out.println("Message: " + HexUtiles.bytesToHex(value));
> Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
> }
> });
> 
> 
> On Thu, Apr 26, 2018 at 5:22 PM, Soheil Pourbafrani  > wrote:
> I want to use Cassandra native connection (Not Flink Cassandra connection) to 
> insert some data into Cassandra. According to the design of the code, the 
> connection to Cassandra will open once at the start and all taskmanager use 
> it to write data.  It's ok running in local mode.
> 
> The problem is when I submit the code on YARN cluster, as each taskmanager 
> has it's own JVM, the connection to the Cassandra will not share and I should 
> open and close it for each taskmanager. Is there any way to have a connection 
> for all taskmanagers?
> 



Re: Fat jar fails deployment (streaming job too large)

2018-05-02 Thread Piotr Nowojski
Short answer: could be that your job is simply too big to be serialised, 
distributed and deserialised in the given time and you would have to increase 
timeouts even more.

Long answer: 

Do you have the same problem when you try to submit smaller job? Does your 
cluster work for simpler jobs? Try cutting down/simplifying your job up to the 
point it works. Maybe you will be able to pin down one single operator that’s 
causing the problem (one that have for example huge static data structure). If 
so, you might be able to optimise your operators in some way. Maybe some 
operator is doing some weird things and causing problems.

You could also try to approach this problem from other direction (as previously 
suggested by Fabian). Try to profile/find out what the cluster is doing, where 
is the problem. Job Manager? One Task Manager? All of the Task Managers? Is 
there high cpu usage somewhere? Maybe one thread somewhere is overloaded? High 
network usage? After identifying potential problematic JVM’s, you could attach 
a code profiler or print stack traces to further pin down the problem. 

Piotrek

> On 30 Apr 2018, at 21:53, Chan, Regina  wrote:
> 
> Any updates on this one? I'm seeing similar issues with 1.3.3 and the batch 
> api. 
> 
> Main difference is that I have even more operators ~850, mostly maps and 
> filters with one cogroup. I don't really want to set a akka.client.timeout 
> for anything more than 10 minutes seeing that it still fails with that 
> amount. The akka.framesize is already 500Mb... 
> 
> akka.framesize: 524288000b
> akka.ask.timeout: 10min
> akka.client.timeout: 10min
> akka.lookup.timeout: 10min
> 
> 
> Thanks,
> Regina
> 
> 
> 
> -Original Message-
> From: Niels [mailto:nielsdenis...@gmail.com ] 
> Sent: Tuesday, February 27, 2018 11:40 AM
> To: user@flink.apache.org 
> Subject: Re: Fat jar fails deployment (streaming job too large)
> 
> Hi Till,
> 
> I've just tried to set on the *client*:
> akka.client.timeout: 300s 
> 
> On the *cluster*:
> akka.ask.timeout: 30s
> akka.lookup.timeout: 30s
> akka.client.timeout: 300s
> akka.framesize: 104857600b #(10x the original of 10MB)
> akka.log.lifecycle.events: true
> 
> Still gives me the same issue, the fat jar isn't deployed. See the attached
> files for the logs of the jobmanager and the deployer. Let me know if I can
> provide you with any additional info. Thanks for your help!
> 
> Cheers,
> Niels
> 
> Flink_deploy_log.txt
>   
> 
>  >  
> flink_jobmanager_log.txt
>   
> 
>  >  
> 
> 
> 
> 
> 
> --
> Sent from: 
> https://urldefense.proofpoint.com/v2/url?u=http-3A__apache-2Dflink-2Duser-2Dmailing-2Dlist-2Darchive.2336050.n4.nabble.com_=DwICAg=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4=vus_2CMQfE0wKmJ4Q_gOWWsBmKlgzMeEwtqShIeKvak=p4nMsVlOWZXkIxtRMVt11ovf0gctuHFZJfzvDgpvyKk=yX4z6UV1AFsAQtJsVquzujhFio0CgYr-tAIoroUXP8E=
>  
> 


Re: Apache Flink - Flink Forward SF 2018 - Scaling stream data pipelines (source code)

2018-05-02 Thread Piotr Nowojski
Hi,

Till, do have this code somewhere?

M Singh: Till is out of the office and will be back on next week, so he will 
probably not be able to respond for couple of days.

Piotrek

> On 30 Apr 2018, at 13:51, M Singh  wrote:
> 
> Hi:
> 
> I was looking at the flink-forward sf 2018 presentations and wanted to find 
> out if there is a git repo where I can find the code for "Scaling stream data 
> pipelines" by Till Rohrmann & Flavio Junqueira ?
> 
> Thanks
> 



Re: ConnectedIterativeStreams and processing state 1.4.2

2018-05-02 Thread Piotr Nowojski
Hi,

Why can not you use simple CoProcessFunction and handle cache updates within 
it’s processElement1 or processElement2 method?

Piotrek

> On 1 May 2018, at 10:20, Lasse Nedergaard  wrote:
> 
> Hi.
> 
> I have a case where I have a input stream that I want to enrich with external 
> data. I want to cache some of the external lookup data to improve the overall 
> performances.
> To update my cache (a CoProcessFunction) I would use iteration to send the 
> external enriched information back to the cache and update a mapstate. I use 
> CoProcesFunction as the input stream and the enrich stream contains 2 
> diff.object types and I don't want to mix them. 
> Because I use a ConnectedIterativeStream I can't use state in my 
> CoProcessFunction because the ConnectedIterativeStream create a DataStream 
> based on the Feedback signature and not the stream I close the iteration with 
> and it is not possible to provide a keySelector in the withFeedbackType
> 
> Form Flink source
> public ConnectedIterativeStreams(DataStream input, TypeInformation 
> feedbackType, long waitTime) {
> super(input.getExecutionEnvironment(), input, new 
> DataStream(input.getExecutionEnvironment(), new 
> CoFeedbackTransformation(input.getParallelism(), feedbackType, waitTime)));
> }
> and both streams need to be keyed before state are assigned to the operator.
> Any ideas how to workaround this problem?
> 
> My sudo code is as below.
> 
> IterativeStream.ConnectedIterativeStreams iteration 
> = inputStream
> .keyBy(obj -> obj.getkey))
> 
> .iterate(maxWaitForIterations).withFeedbackType(TypeInformation.of(new 
> TypeHint() {}));
> 
> DataStream enrichedStream = iteration
> .process(new EnrichFromState());
> 
> DataStream notEnrichedOutput = enrichedStream
> .filter(obj -> obj.enriched);
> 
> EnrichService EnrichService = new EnrichService();
> DataStream enrichedFromApi = 
> EnrichService.parse(notEnrichedOutput);
> 
> DataStream newEnrich = enrichedFromApi
> .map(obj -> {
> 
> EnrichData newData =  new EnrichData();
> newData.xx = obj.xx();
> 
> return newData;
> })
> .keyBy(obj -> obj.getkey);
> 
> 
> iteration.closeWith(newAddresses);
> 



Re: KafkaProducer with generic (Avro) serialization schema

2018-05-02 Thread Piotr Nowojski
Hi,

My Scala knowledge is very limited (and my Scala's serialization knowledge is 
non existent), but one way or another you have to make your SerializationSchema 
serialisable. If indeed this is the problem, maybe a better place to ask this 
question is on Stack Overflow or some scala specific mailing list/board (unless 
someone else from the Flink's community can provide an answer to this problem)? 

Piotrek

> On 1 May 2018, at 16:30, Wouter Zorgdrager  wrote:
> 
> So, I'm still struggling with this issue. I dived a bit more into the problem 
> and I'm pretty sure that the problem is that I have to (implicitly) pass the 
> SchemaFor and RecordTo classes to my serialization schema (otherwise I can't 
> make it generic). However those class aren't serializable, but of course I 
> can't annotate them transient nor make it a lazy val which gives me the 
> current issue. 
> 
> I hope someone has some leads for me. 
> 
> Thanks!
> 
> Op do 26 apr. 2018 om 23:03 schreef Wouter Zorgdrager  >:
> Hi Bill,
> 
> Thanks for your answer. However this proposal isn't going to solve my issue, 
> since the problem here is that the context bounds I need to give in order to 
> serialize it to Avro (SchemaFor, ToRecord and FromRecord) aren't serializable 
> classes. This results in Flink not being able to serialize the KafkaProducer 
> failing the whole job. 
> 
> Thanks,
> Wouter
> 
> Op do 26 apr. 2018 om 00:42 schreef Nortman, Bill  >:
> The things I would try would first in you are you class Person and Address 
> have getters and setters and a no argument constructor.
> 
>  
> 
> From: Wouter Zorgdrager [mailto:zorgdrag...@gmail.com 
> ] 
> Sent: Wednesday, April 25, 2018 7:17 AM
> To: user@flink.apache.org 
> Subject: KafkaProducer with generic (Avro) serialization schema
> 
>  
> 
> Dear reader,
> 
>  
> 
> I'm currently working on writing a KafkaProducer which is able to serialize a 
> generic type using avro4s.
> 
> However this serialization schema is not serializable itself. Here is my code 
> for this:
> 
>  
> 
> The serialization schema:
> 
> class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] extends 
> SerializationSchema[IN] {
> 
>  
> 
>   override def serialize(element: IN): Array[Byte] = {
> 
> val byteArray = new ByteArrayOutputStream()
> 
> val avroSer = AvroOutputStream.binary[IN](byteArray)
> 
> avroSer.write(element)
> 
> avroSer.flush()
> 
> avroSer.close()
> 
>  
> 
> return byteArray.toByteArray
> 
>   }
> 
> }
> 
>  
> 
> The job code:
> 
> case class Person(name : String, age : Int, address : Address)
> 
> case class Address(city : String, street : String)
> 
>  
> 
> class SimpleJob {
> 
>  
> 
>   @transient
> 
>   private lazy val serSchema : AvroSerializationSchema[Person] = new 
> AvroSerializationSchema[Person]()
> 
>  
> 
>   def start() = {
> 
> val testPerson = Person("Test", 100, Address("Test", "Test"))
> 
>  
> 
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> 
>  
> 
> env.
> 
>   fromCollection(Seq(testPerson)).
> 
>   addSink(createKafkaSink())
> 
>  
> 
> env.execute("Flink sample job")
> 
>   }
> 
>  
> 
>  
> 
>   def createKafkaSink() : RichSinkFunction[Person] = {
> 
> //set some properties
> 
> val properties = new Properties()
> 
> properties.put("bootstrap.servers", "127.0.0.01:9092 
> ")
> 
> properties.put("zookeeper.connect", "127.0.0.1:2181 
> ")
> 
>  
> 
> new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
> 
>   }
> 
>  
> 
> }
> 
>  
> 
> The code does compile, however it gives the following error on runtime: 
> InvalidProgramException: 
> Objectorg.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d
>  
> 
>  is not serializable.
> 
>  
> 
> I assume this means that my custom SerializationSchema is not serializable 
> due to the use of SchemaFor, FromRecord and ToRecord. 
> 
> Anyone knows a solution or workaround?
> 
>  
> 
> Thanks in advance!
> 
> Wouter
> 
> This message contains confidential information and is intended only for the 
> individual named. If you are not the named addressee, you should not 
> disseminate, distribute, alter or copy this e-mail. Please 

Re: intentional back-pressure (or a poor man's side-input)

2018-05-03 Thread Piotr Nowojski
Maybe it could work with Flink’s 1.5 credit base flow control. But you would 
need a way to express state “block one input side of the CoProcessFunction”, 
pass this information up to the input gate and handle it probably similar to 
how `org.apache.flink.streaming.runtime.io.CachedBufferBlocker` blocks inputs 
in case of checkpoint barrier. You can not just block inside `processElement1` 
method.

However I haven’t thought it through and maybe there could be some issues 
regarding checkpointing (what should happen to checkpoint barriers if you are 
blocking one side of the input? Should this block checkpoint barrier as well? 
Should you cancel checkpoint?).

Piotrek

> On 2 May 2018, at 16:31, Derek VerLee  wrote:
> 
> 
> I was just thinking about about letting a coprocessfunction "block" or cause 
> back pressure on one of it's streams?
> Has this been discussed as an option?
> Does anyone know a way to effectively accomplish this?
> 
> I think I could get a lot of mileage out of something like that without 
> needing a full implementation of FLIP-17 (which I would eagerly await still). 
> 
> As mentioned on another thread, one could use a liststate to buffer the main 
> input until the "side input" was sufficiently processed.  However the 
> downside of this is that I have no way to control the size of those buffers, 
> whereas with backpressure, the system will naturally take care of it.



Re: java.lang.NoSuchMethodError and dependencies problem

2017-10-20 Thread Piotr Nowojski
But you said

> this seems to work as mvn dependency:tree -Ddetail=true only shows 1.14

To avoid this error that you describe I think that you have to ensure, that no 
1.14 commons-compress comes from your application, because it can conflict with 
1.4.1 used by flink cluster.

By shading I meant that you could shade/relocate 1.14 usages in your 
application, so that they don’t collide with Flink’s 1.4.1.

Piotrek

> On 19 Oct 2017, at 19:58, r. r. <rob...@abv.bg> wrote:
> 
> Thanks, Piotr
> but my app code is self-contained in a fat-jar with maven-shade, so why would 
> the class path affect this?
> 
> by shade commons-compress do you mean :
> 
> it doesn't have effect either
> 
> as a last resort i may try to rebuild Flink to use 1.14, but don't want to go 
> there yet =/
> 
> 
> Best regards
> 
> 
> 
> 
> 
> 
>>  Оригинално писмо 
> 
>> От: Piotr Nowojski pi...@data-artisans.com
> 
>> Относно: Re: java.lang.NoSuchMethodError and dependencies problem
> 
>> До: "r. r." <rob...@abv.bg>
> 
>> Изпратено на: 19.10.2017 20:04
> 
> 
> 
> 
>> I’m not 100% sure, so treat my answer with a grain of salt.
> 
>> 
> 
>> I think when you start the cluster this way, dependencies (some? all?) are 
>> being loaded to the class path before loading user’s application. At that 
>> point, it doesn’t matter whether you have excluded commons-compress 1.4.1 in 
>> yours application pom.xml. I’m not sure if this is solvable in some way, or 
>> not.
> 
>> 
> 
>> Maybe as a walk around, you could shade commons-compress usages in your 
>> pom.xml?
> 
>> 
> 
>> Piotr Nowojski
> 
>> 
> 
>>> On 19 Oct 2017, at 17:36, r. r. <rob...@abv.bg> wrote:
> 
>>> 
> 
>>> flink is started with bin/start-local.sh
> 
>>> 
> 
>>> there is no classpath variable in the environment; 
>>> flink/lib/flink-dist_2.11-1.3.2.jar contains commons-compress, still it 
>>> should be overridden by the dependencyManagement directive
> 
>>> 
> 
>>> here is the stacktrace:
> 
>>> 
> 
>>> The program finished with the following exception:
> 
>>> 
> 
>>> org.apache.flink.client.program.ProgramInvocationException: The program 
>>> execution failed: Job execution failed.
> 
>>>at 
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
> 
>>>at 
>>> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> 
>>>at 
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
> 
>>>at 
>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:73)
> 
>>>at com.foot.semantic.flink.PipelineJob.main(PipelineJob.java:73)
> 
>>>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 
>>>at 
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 
>>>at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 
>>>at java.lang.reflect.Method.invoke(Method.java:497)
> 
>>>at 
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> 
>>>at 
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
> 
>>>at 
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
> 
>>>at 
>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
> 
>>>at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
> 
>>>at 
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
> 
>>>at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
> 
>>>at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
> 
>>>at 
>>> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> 
>>>at java.security.AccessController.doPrivileged(Native Method)
> 
>>>at javax.security.auth.Subject.doAs(Subject.java:422)
> 
>>>at 
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> 
>>>at 
>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.

Re: Watermark on connected stream

2017-10-19 Thread Piotr Nowojski
Hi,

As you can see in 
org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark1
 it takes a minimum of both of the inputs.

Piotrek
 
> On 19 Oct 2017, at 14:06, Kien Truong  wrote:
> 
> Hi,
> 
> If I connect two stream with different watermark, how are the watermark of 
> the resulting stream determined ?
> 
> 
> Best regards,
> 
> Kien
> 



Re: java.lang.NoSuchMethodError and dependencies problem

2017-10-19 Thread Piotr Nowojski
Hi,

What is the full stack trace of the error?
Are you sure that there is no commons-compresss somewhere in the classpath 
(like in the lib directory)? How are you running your Flink cluster?

Piotrek

> On 19 Oct 2017, at 13:34, r. r.  wrote:
> 
> Hello
> I have a job that runs an Apache Tika pipeline and it fails with "Caused by: 
> java.lang.NoSuchMethodError: 
> org.apache.commons.compress.archivers.ArchiveStreamFactory.detect(Ljava/io/InputStream;)Ljava/lang/String;"
> 
> Flink includes commons-compress 1.4.1, while Tika needs 1.14. 
> I also have Apache Avro in the project with commons-compress at 1.8.1, so I 
> force 1.14 with 
> 
> 
> 
> 
> org.apache.commons
> commons-compress
> 1.14
> 
> 
> 
> 
> this seems to work as mvn dependency:tree -Ddetail=true only shows 1.14 and 
> after purge, the local maven repo also only contains 1.14
> 
> yet, after i deploy the job and it reads an Avro package from kafka and 
> passes it to Tika, it fails with the error above, which leads me to think it 
> somehow uses commons-compress at a version prior to 1.14, because method 
> 'detect' is not present in older versions
> 
> I excluded/included it from the fat-jar
> org.apache.commons:commons-compress
> still the same problem
> 
> thanks for any hints!
> 
> 



Re: Watermark on connected stream

2017-10-19 Thread Piotr Nowojski
With aitozi we have a hat trick oO
 
> On 19 Oct 2017, at 17:08, Tzu-Li (Gordon) Tai  wrote:
> 
> Ah, sorry for the duplicate answer, I didn’t see Piotr’s reply. Slight delay 
> on the mail client.
> 
> 
> On 19 October 2017 at 11:05:01 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org 
> ) wrote:
> 
>> Hi Kien,
>> 
>> The watermark of an operator with multiple inputs will be determined by the 
>> current minimum watermark across all inputs.
>> 
>> Cheers,
>> Gordon
>> 
>> 
>> On 19 October 2017 at 8:06:11 PM, Kien Truong (duckientru...@gmail.com 
>> ) wrote:
>> 
>>> Hi, 
>>> 
>>> If I connect two stream with different watermark, how are the watermark 
>>> of the resulting stream determined ? 
>>> 
>>> 
>>> Best regards, 
>>> 
>>> Kien



Re: java.lang.NoSuchMethodError and dependencies problem

2017-10-19 Thread Piotr Nowojski
I’m not 100% sure, so treat my answer with a grain of salt.

I think when you start the cluster this way, dependencies (some? all?) are 
being loaded to the class path before loading user’s application. At that 
point, it doesn’t matter whether you have excluded commons-compress 1.4.1 in 
yours application pom.xml. I’m not sure if this is solvable in some way, or not.

Maybe as a walk around, you could shade commons-compress usages in your pom.xml?

Piotr Nowojski

> On 19 Oct 2017, at 17:36, r. r. <rob...@abv.bg> wrote:
> 
> flink is started with bin/start-local.sh
> 
> there is no classpath variable in the environment; 
> flink/lib/flink-dist_2.11-1.3.2.jar contains commons-compress, still it 
> should be overridden by the dependencyManagement directive
> 
> here is the stacktrace:
> 
> The program finished with the following exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:73)
> at com.foot.semantic.flink.PipelineJob.main(PipelineJob.java:73)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
> at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
> at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:933)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:876)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:876)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NoSuchMethodError: 
> org.apache.commons.compress.archivers.ArchiveStreamFactory.detect(Ljava/io/InputStream;)Ljava/lang/String;
> at 
> org.apache.tika.parser.pkg.ZipContainerDetector.detectArchiveFormat(ZipContainerDetector.java:114)
> at 
> org.apache.tika.parser.pkg.ZipContainerDetector.detect(ZipContainerDetector.java:85)
> at 
> org.apache.tika.detect.CompositeDetector.detect(CompositeDetector.java:77)
> at 
> org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:115)

Re: java.lang.NoSuchMethodError and dependencies problem

2017-10-20 Thread Piotr Nowojski
That’s good to hear :)

Unfortunately at this moment dependencies can pollute class path in both ways 
(Flink’s can mess with user’s application and also the other way around).

Cheers, Piotrek

> On 20 Oct 2017, at 15:11, r. r. <rob...@abv.bg> wrote:
> 
> By Ali Baba's beard and his forty bandits, Piotrek, this worked!
> My understanding was that I have to prevent Flink from loading the older 
> compress.jar and force the newer one.
> One I shade-relocated org.apache.commons.compress for my project the problem 
> went away
> 
> Many thanks!
> 
> 
> 
> 
> 
> 
>> ---- Оригинално писмо 
> 
>> От: Piotr Nowojski pi...@data-artisans.com
> 
>> Относно: Re: java.lang.NoSuchMethodError and dependencies problem
> 
>> До: "r. r." <rob...@abv.bg>
> 
>> Изпратено на: 20.10.2017 14:46
> 
> 
> 
> 
>> But you said
> 
>> 
> 
>>> this seems to work as mvn dependency:tree -Ddetail=true only shows 1.14
> 
>> 
> 
>> To avoid this error that you describe I think that you have to ensure, that 
>> no 1.14 commons-compress comes from your application, because it can 
>> conflict with 1.4.1 used by flink cluster.
> 
>> 
> 
>> By shading I meant that you could shade/relocate 1.14 usages in your 
>> application, so that they don’t collide with Flink’s 1.4.1.
> 
>> 
> 
>> Piotrek
> 
>> 
> 
>>> On 19 Oct 2017, at 19:58, r. r. <rob...@abv.bg> wrote:
> 
>>> 
> 
>>> Thanks, Piotr
> 
>>> but my app code is self-contained in a fat-jar with maven-shade, so why 
>>> would the class path affect this?
> 
>>> 
> 
>>> by shade commons-compress do you mean :
> 
>>> 
> 
>>> it doesn't have effect either
> 
>>> 
> 
>>> as a last resort i may try to rebuild Flink to use 1.14, but don't want to 
>>> go there yet =/
> 
>>> 
> 
>>> 
> 
>>> Best regards
> 
>>> 
> 
>>> 
> 
>>> 
> 
>>> 
> 
>>> 
> 
>>> 
> 
>>>>  Оригинално писмо 
> 
>>> 
> 
>>>> От: Piotr Nowojski pi...@data-artisans.com
> 
>>> 
> 
>>>> Относно: Re: java.lang.NoSuchMethodError and dependencies problem
> 
>>> 
> 
>>>> До: "r. r." <rob...@abv.bg>
> 
>>> 
> 
>>>> Изпратено на: 19.10.2017 20:04
> 
>>> 
> 
>>> 
> 
>>> 
> 
>>> 
> 
>>>> I’m not 100% sure, so treat my answer with a grain of salt.
> 
>>> 
> 
>>>> 
> 
>>> 
> 
>>>> I think when you start the cluster this way, dependencies (some? all?) are 
>>>> being loaded to the class path before loading user’s application. At that 
>>>> point, it doesn’t matter whether you have excluded commons-compress 1.4.1 
>>>> in yours application pom.xml. I’m not sure if this is solvable in some 
>>>> way, or not.
> 
>>> 
> 
>>>> 
> 
>>> 
> 
>>>> Maybe as a walk around, you could shade commons-compress usages in your 
>>>> pom.xml?
> 
>>> 
> 
>>>> 
> 
>>> 
> 
>>>> Piotr Nowojski
> 
>>> 
> 
>>>> 
> 
>>> 
> 
>>>>> On 19 Oct 2017, at 17:36, r. r. <rob...@abv.bg> wrote:
> 
>>> 
> 
>>>>> 
> 
>>> 
> 
>>>>> flink is started with bin/start-local.sh
> 
>>> 
> 
>>>>> 
> 
>>> 
> 
>>>>> there is no classpath variable in the environment; 
>>>>> flink/lib/flink-dist_2.11-1.3.2.jar contains commons-compress, still it 
>>>>> should be overridden by the dependencyManagement directive
> 
>>> 
> 
>>>>> 
> 
>>> 
> 
>>>>> here is the stacktrace:
> 
>>> 
> 
>>>>> 
> 
>>> 
> 
>>>>> The program finished with the following exception:
> 
>>> 
> 
>>>>> 
> 
>>> 
> 
>>>>> org.apache.flink.client.program.ProgramInvocationException: The program 
>>>>> execution failed: Job execution failed.
> 
>>> 
> 
>>>>>   at 
>>>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
> 
>>> 
> 
>>>>>   at 
>>>>> org.apache.flink.client.program.StandaloneCl

Re: flink can't read hdfs namenode logical url

2017-10-20 Thread Piotr Nowojski
Hi,

Please double check the content of config files in YARN_CONF_DIR and 
HADOOP_CONF_DIR (the first one has a priority over the latter one) and that 
they are pointing to correct files.

Also check logs (WARN and INFO) for any relevant entries.

Piotrek

> On 20 Oct 2017, at 06:07, 邓俊华  wrote:
> 
> hi,
> 
> I start yarn-ssession.sh on yarn, but it can't read hdfs logical url. It 
> always connect to hdfs://master:8020, it should be 9000, my hdfs defaultfs is 
> hdfs://master.
> I have config the YARN_CONF_DIR and HADOOP_CONF_DIR, it didn't work.
> Is it a bug? i use flink-1.3.0-bin-hadoop27-scala_2.10
> 
> 2017-10-20 11:00:05,395 DEBUG org.apache.hadoop.ipc.Client
>   - IPC Client (1035144464) connection to 
> startdt/173.16.5.215:8020 from admin: closed
> 2017-10-20 11:00:05,398 ERROR 
> org.apache.flink.yarn.YarnApplicationMasterRunner - YARN 
> Application Master initialization failed
> java.net.ConnectException: Call From spark3/173.16.5.216 to master:8020 
> failed on connection exception: java.net.ConnectException: Connection 
> refused; For more details see:  
> http://wiki.apache.org/hadoop/ConnectionRefused
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>   at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
>   at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1479)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>   at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
>   at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 
> 



Re:

2017-10-20 Thread Piotr Nowojski
Hi,

Only batch API is using managed memory. If you are using streaming API, you can 
do two things:
- estimate max cache size based on for example fraction of max heap size 
- use WeakReference to implement your cache

In batch API, you could estimate max cache size based on:
- fraction of (heapSize - managedMemorySize)

managedMemorySize you can obtain from for example: 

getContainingTask().getEnvironment().getMemoryManager().getMemorySize();

But keep in mind, that any heap memory allocations in your code will contest 
for the same memory and as mentioned before, except of batch API, memory 
allocations are not accounted anywhere (because it difficult to calculate 
memory usage of a operator :( )

Piotrek

> On 20 Oct 2017, at 06:04, Navneeth Krishnan  wrote:
> 
> Hello All,
> 
> I have an in-memory cache created inside a user function and I need to assign 
> the max capacity for it. Since the program can be run on any hardware, I'm 
> thinking if I cloud assign based on flink's allocated managed memory. 
> 
> Is there a way to get the flink managed memory size inside a user function? 
> If not are there any other options?
> 
> Thanks,
> Navneeth



Re: HBase config settings go missing within Yarn.

2017-10-20 Thread Piotr Nowojski
Is this /etc/hbase/conf/hbase-site.xml file is present on all of the machines? 
If yes, could you share your code?

> On 20 Oct 2017, at 16:29, Niels Basjes <ni...@basjes.nl> wrote:
> 
> I look at the logfiles from the Hadoop Yarn webinterface. I.e. actually 
> looking in the jobmanager.log of the container running the Flink task.
> That is where I was able to find these messages .
> 
> I do the
>  hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
> in all places directly after the  HBaseConfiguration.create();
> That way I simply force the task to look on the actual Hadoop node for the 
> same file it already loaded locally.
> 
> The reason I'm suspecting Flink is because the clientside part of the Flink 
> application does have the right setting and the task/job actually running in 
> the cluster does not have the same settings.
> So it seems in the transition into the cluster the application does not copy 
> everything it has available locally for some reason.
> 
> There is a very high probability I did something wrong, I'm just not seeing 
> it at this moment.
> 
> Niels
> 
> 
> 
> On Fri, Oct 20, 2017 at 2:53 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> What do you mean by saying:
> 
>> When I open the logfiles on the Hadoop cluster I see this:
> 
> 
> The error doesn’t come from Flink? Where do you execute 
> 
> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
> 
> ?
> 
> To me it seems like it is a problem with misconfigured HBase and not 
> something related to Flink.
> 
> Piotrek
> 
>> On 20 Oct 2017, at 13:44, Niels Basjes <ni...@basjes.nl 
>> <mailto:ni...@basjes.nl>> wrote:
>> 
>> To facilitate you guys helping me I put this test project on github:
>> https://github.com/nielsbasjes/FlinkHBaseConnectProblem 
>> <https://github.com/nielsbasjes/FlinkHBaseConnectProblem>
>> 
>> Niels Basjes
>> 
>> On Fri, Oct 20, 2017 at 1:32 PM, Niels Basjes <ni...@basjes.nl 
>> <mailto:ni...@basjes.nl>> wrote:
>> Hi,
>> 
>> Ik have a Flink 1.3.2 application that I want to run on a Hadoop yarn 
>> cluster where I need to connect to HBase.
>> 
>> What I have:
>> 
>> In my environment:
>> HADOOP_CONF_DIR=/etc/hadoop/conf/
>> HBASE_CONF_DIR=/etc/hbase/conf/
>> HIVE_CONF_DIR=/etc/hive/conf/
>> YARN_CONF_DIR=/etc/hadoop/conf/
>> 
>> In /etc/hbase/conf/hbase-site.xml I have correctly defined the zookeeper 
>> hosts for HBase.
>> 
>> My test code is this:
>> public class Main {
>>   private static final Logger LOG = LoggerFactory.getLogger(Main.class);
>> 
>>   public static void main(String[] args) throws Exception {
>> printZookeeperConfig();
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>> env.createInput(new HBaseSource()).print();
>> env.execute("HBase config problem");
>>   }
>> 
>>   public static void printZookeeperConfig() {
>> String zookeeper = 
>> HBaseConfiguration.create().get("hbase.zookeeper.quorum");
>> LOG.info("> Loading HBaseConfiguration: Zookeeper = {}", zookeeper);
>>   }
>> 
>>   public static class HBaseSource extends AbstractTableInputFormat {
>> @Override
>> public void configure(org.apache.flink.configuration.Configuration 
>> parameters) {
>>   table = createTable();
>>   if (table != null) {
>> scan = getScanner();
>>   }
>> }
>> 
>> private HTable createTable() {
>>   LOG.info("Initializing HBaseConfiguration");
>>   // Uses files found in the classpath
>>   org.apache.hadoop.conf.Configuration hConf = 
>> HBaseConfiguration.create();
>>   printZookeeperConfig();
>> 
>>   try {
>> return new HTable(hConf, getTableName());
>>   } catch (Exception e) {
>> LOG.error("Error instantiating a new HTable instance", e);
>>   }
>>   return null;
>> }
>> 
>> @Override
>> public String getTableName() {
>>   return "bugs:flink";
>> }
>> 
>> @Override
>> protected String mapResultToOutType(Result result) {
>>   return new 
>> String(result.getFamilyMap("v".getBytes(UTF_8)).get("column".getBytes(UTF_8)));
>> }
>> 
>> 

Re: flink can't read hdfs namenode logical url

2017-10-23 Thread Piotr Nowojski
Hi,

Why in this new message there is a different host? Previously code was trying 
to connect to “master:8020” and now it is “startdt”? If you were able to change 
this host somehow between runs, I guess you should be also able to set it to 
correct one.

Piotrek

> On 23 Oct 2017, at 09:11, 邓俊华 <dengjun...@startdt.com> wrote:
> 
> Hi,
> 
> Thanks for your replay! I have been block in this for several days.
> And I have double checked that there are 
> hdfs-site.xml,core-site.xml,yarn-site.xml  in YARN_CONF_DIR. But it is still 
> can't read hdfs namenode logical url.
> 
> 2017-10-23 14:35:17,750 DEBUG 
> org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils - Cannot find 
> hdfs-default configuration file
> 2017-10-23 14:35:17,750 DEBUG 
> org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils - Cannot find 
> hdfs-site configuration file
> 2017-10-23 14:35:17,751 DEBUG 
> org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils - Adding 
> /data/hadoop-2.7.3/etc/hadoop/core-site.xml to hadoop configuration
> 2017-10-23 14:35:17,751 DEBUG 
> org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils - Adding 
> /data/hadoop-2.7.3/etc/hadoop/hdfs-site.xml to hadoop configuration
> 
> 2017-10-23 14:35:19,887 DEBUG org.apache.hadoop.hdfs.BlockReaderLocal 
>   - dfs.domain.socket.path = 
> 2017-10-23 14:35:19,952 ERROR 
> org.apache.flink.yarn.YarnApplicationMasterRunner - YARN 
> Application Master initialization failed
> java.lang.IllegalArgumentException: java.net.UnknownHostException: startdt
>   at 
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
>   at 
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
>   at 
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>   at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:678)
>   at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:619)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
> --
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2017年10月20日(星期五) 21:39
> 收件人:邓俊华 <dengjun...@startdt.com>
> 抄 送:user <user@flink.apache.org>
> 主 题:Re: flink can't read hdfs namenode logical url
> 
> Hi,
> 
> Please double check the content of config files in YARN_CONF_DIR and 
> HADOOP_CONF_DIR (the first one has a priority over the latter one) and that 
> they are pointing to correct files.
> 
> Also check logs (WARN and INFO) for any relevant entries.
> 
> Piotrek
> 
> On 20 Oct 2017, at 06:07, 邓俊华 <dengjun...@startdt.com 
> <mailto:dengjun...@startdt.com>> wrote:
> 
> hi,
> 
> I start yarn-ssession.sh on yarn, but it can't read hdfs logical url. It 
> always connect to hdfs://master:8020 , it should be 9000, 
> my hdfs defaultfs is hdfs://master .
> I have config the YARN_CONF_DIR and HADOOP_CONF_DIR, it didn't work.
> Is it a bug? i use flink-1.3.0-bin-hadoop27-scala_2.10
> 
> 2017-10-20 11:00:05,395 DEBUG org.apache.hadoop.ipc.Client
>   - IPC Client (1035144464) connection to 
> startdt/173.16.5.215:8020 from admin: closed2017-10-20 11:00:05,398 ERROR 
> org.apache.flink.yarn.YarnApplicationMasterRunner - YARN 
> Application Master initialization failedjava.net.ConnectException: Call From 
> spark3/173.16.5.216 to master:8020 failed on connection exception: 
> java.net.ConnectException: Connection refused; For more details see:  
> http://wiki.apache.org/hadoop/ConnectionRefused 
> <http://wiki.apache.org/hadoop/ConnectionRefused>at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:526)  at 
> org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)at 
> org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)  at 
> org.apache.hadoop.ipc.Client.call(Client.java:1479)  at 
> org.apache.hadoop.ipc.Client.call(Client.java:1412)  at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source)at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at 
> su

Re: SLF4j logging system gets clobbered?

2017-10-23 Thread Piotr Nowojski
Till could you take a look at this?

Piotrek

> On 18 Oct 2017, at 20:32, Jared Stehler  
> wrote:
> 
> I’m having an issue where I’ve got logging setup and functioning for my 
> flink-mesos deployment, and works fine up to a point (the same point every 
> time) where it seems to fall back to “defaults” and loses all of my 
> configured filtering.
> 
> 2017-10-11 21:37:17.454 [flink-akka.actor.default-dispatcher-17] INFO  
> o.a.f.m.runtime.clusterframework.MesosFlinkResourceManager  - TaskManager 
> taskmanager-8 has started.
> 2017-10-11 21:37:17.454 [flink-akka.actor.default-dispatcher-16] INFO  
> org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager 
> at ip-10-80-54-201 
> (akka.tcp://fl...@ip-10-80-54-201.us-west-2.compute.internal:31014/user/taskmanager
>  
> )
>  as 697add78bd00fe7dc6a7aa60bc8d75fb. Current number of registered hosts is 
> 39. Current number of alive task slots is 39.
> 2017-10-11 21:37:18.820 [flink-akka.actor.default-dispatcher-17] INFO  
> org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager 
> at ip-10-80-54-201 
> (akka.tcp://fl...@ip-10-80-54-201.us-west-2.compute.internal:31018/user/taskmanager
>  
> )
>  as a6cff0f18d71aabfb3b112f5e2c36c2b. Current number of registered hosts is 
> 40. Current number of alive task slots is 40.
> 2017-10-11 21:37:18.821 [flink-akka.actor.default-dispatcher-17] INFO  
> o.a.f.m.runtime.clusterframework.MesosFlinkResourceManager  - TaskManager 
> taskmanager-00010 has started.
> 2017-10-11 
> 21:39:04,371:6171(0x7f67fe9cd700):ZOO_WARN@zookeeper_interest@1570: Exceeded 
> deadline by 13ms
> 
> — here is where it turns over into default pattern layout ---
> 21:39:05.616 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.blob.BlobClient 
> - Blob client connecting to akka://flink/user/jobmanager 
> 
> 
> 21:39:09.322 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.client.JobClient 
> - Checking and uploading JAR files
> 21:39:09.322 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.blob.BlobClient 
> - Blob client connecting to akka://flink/user/jobmanager 
> 
> 21:39:09.788 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.m.r.c.MesosJobManager - Submitting job 005b570ff2866023aa905f2bc850f7a3 
> (Sa-As-2b-Submission-Join-V3 := demos-demo500--data-canvas-2-sa-qs-as-v3).
> 21:39:09.789 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.m.r.c.MesosJobManager - Using restart strategy 
> FailureRateRestartStrategy(failuresInterval=12 msdelayInterval=1000 
> msmaxFailuresPerInterval=3) for 005b570ff2866023aa905f2bc850f7a3.
> 21:39:09.789 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.r.e.ExecutionGraph - Job recovers via failover strategy: full graph 
> restart
> 21:39:09.790 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.m.r.c.MesosJobManager - Running initialization on master for job 
> Sa-As-2b-Submission-Join-V3 := demos-demo500--data-canvas-2-sa-qs-as-v3 
> (005b570ff2866023aa905f2bc850f7a3).
> 21:39:09.790 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.m.r.c.MesosJobManager - Successfully ran initialization on master in 0 
> ms.
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] WARN  
> o.a.f.configuration.Configuration - Config uses deprecated configuration key 
> 'high-availability.zookeeper.storageDir' instead of proper key 
> 'high-availability.storageDir'
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.c.GlobalConfiguration - Loading configuration property: 
> mesos.failover-timeout, 60
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.c.GlobalConfiguration - Loading configuration property: 
> mesos.initial-tasks, 1
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.c.GlobalConfiguration - Loading configuration property: 
> mesos.maximum-failed-tasks, -1
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.c.GlobalConfiguration - Loading configuration property: 
> mesos.resourcemanager.framework.role, '*'
> 
> The reason this is a vexing issue is that the app master then proceeds to 
> dump megabytes of " o.a.f.c.GlobalConfiguration - Loading configuration 
> property:” messages into the log, and I’m unable to filter them out.
> 
> My logback config is:
> 
> 
> 
> 
> 
> %d{-MM-dd HH:mm:ss.SSS} [%thread] %-5level 
> %logger{60} %X{sourceThread} - %msg%n
> 
> 
> 
> 
> 
> ERROR
> 
> 
> 
>  level="OFF" />
> 
>  name="org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler" 
> level="OFF" />
>  name="org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase"
>  level="OFF" />
> 
>  level="WARN" />
>  level="WARN" />
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703
> 
> 
> 



Re: HBase config settings go missing within Yarn.

2017-10-23 Thread Piotr Nowojski
Till do you have some idea what is going on? I do not see any meaningful 
difference between Niels code and HBaseWriteStreamExample.java. There is also a 
very similar issue on mailing list as well: “Flink can't read hdfs namenode 
logical url” 

Piotrek

> On 22 Oct 2017, at 12:56, Niels Basjes <ni...@basjes.nl> wrote:
> 
> HI,
> 
> Yes, on all nodes the the same /etc/hbase/conf/hbase-site.xml that contains 
> the correct settings for hbase to find zookeeper.
> That is why adding that files as an additional resource to the configuration 
> works.
> I have created a very simple project that reproduces the problem on my setup:
> https://github.com/nielsbasjes/FlinkHBaseConnectProblem 
> <https://github.com/nielsbasjes/FlinkHBaseConnectProblem>
> 
> Niels Basjes
> 
> 
> On Fri, Oct 20, 2017 at 6:54 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Is this /etc/hbase/conf/hbase-site.xml file is present on all of the 
> machines? If yes, could you share your code?
> 
>> On 20 Oct 2017, at 16:29, Niels Basjes <ni...@basjes.nl 
>> <mailto:ni...@basjes.nl>> wrote:
>> 
>> I look at the logfiles from the Hadoop Yarn webinterface. I.e. actually 
>> looking in the jobmanager.log of the container running the Flink task.
>> That is where I was able to find these messages .
>> 
>> I do the
>>  hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>> in all places directly after the  HBaseConfiguration.create();
>> That way I simply force the task to look on the actual Hadoop node for the 
>> same file it already loaded locally.
>> 
>> The reason I'm suspecting Flink is because the clientside part of the Flink 
>> application does have the right setting and the task/job actually running in 
>> the cluster does not have the same settings.
>> So it seems in the transition into the cluster the application does not copy 
>> everything it has available locally for some reason.
>> 
>> There is a very high probability I did something wrong, I'm just not seeing 
>> it at this moment.
>> 
>> Niels
>> 
>> 
>> 
>> On Fri, Oct 20, 2017 at 2:53 PM, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> Hi,
>> 
>> What do you mean by saying:
>> 
>>> When I open the logfiles on the Hadoop cluster I see this:
>> 
>> 
>> The error doesn’t come from Flink? Where do you execute 
>> 
>> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>> 
>> ?
>> 
>> To me it seems like it is a problem with misconfigured HBase and not 
>> something related to Flink.
>> 
>> Piotrek
>> 
>>> On 20 Oct 2017, at 13:44, Niels Basjes <ni...@basjes.nl 
>>> <mailto:ni...@basjes.nl>> wrote:
>>> 
>>> To facilitate you guys helping me I put this test project on github:
>>> https://github.com/nielsbasjes/FlinkHBaseConnectProblem 
>>> <https://github.com/nielsbasjes/FlinkHBaseConnectProblem>
>>> 
>>> Niels Basjes
>>> 
>>> On Fri, Oct 20, 2017 at 1:32 PM, Niels Basjes <ni...@basjes.nl 
>>> <mailto:ni...@basjes.nl>> wrote:
>>> Hi,
>>> 
>>> Ik have a Flink 1.3.2 application that I want to run on a Hadoop yarn 
>>> cluster where I need to connect to HBase.
>>> 
>>> What I have:
>>> 
>>> In my environment:
>>> HADOOP_CONF_DIR=/etc/hadoop/conf/
>>> HBASE_CONF_DIR=/etc/hbase/conf/
>>> HIVE_CONF_DIR=/etc/hive/conf/
>>> YARN_CONF_DIR=/etc/hadoop/conf/
>>> 
>>> In /etc/hbase/conf/hbase-site.xml I have correctly defined the zookeeper 
>>> hosts for HBase.
>>> 
>>> My test code is this:
>>> public class Main {
>>>   private static final Logger LOG = LoggerFactory.getLogger(Main.class);
>>> 
>>>   public static void main(String[] args) throws Exception {
>>> printZookeeperConfig();
>>> final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>>> env.createInput(new HBaseSource()).print();
>>> env.execute("HBase config problem");
>>>   }
>>> 
>>>   public static void printZookeeperConfig() {
>>> String zookeeper = 
>>> HBaseConfiguration.create().get("hbase.zookeeper.quorum");
>>> LOG.info("> Loading HBaseConfiguration: Zookeeper = {}", zoo

Re: Datastream broadcast with KeyBy

2018-01-10 Thread Piotr Nowojski
Hi,

Could you elaborate what is the problem that you are having? What is the 
exception(s) that you are getting? I have tested such simple example and it’s 
seems to be working as expected:

DataStreamSource input = env.fromElements(1, 2, 3, 4, 5, 1, 2, 3);

DataStreamSource confStream = env.fromElements(42);

input.keyBy(new MyKeySelector()).connect(confStream.broadcast()).process(new 
MyCoProcessFunction()).print();

Thanks, Piotrek

> On 10 Jan 2018, at 10:01, anujk  wrote:
> 
> Currently we have an Flink pipeline running with Data-Src —> KeyBy —>
> ProcessFunction.  State Management (with RocksDB) and Timers are working
> well.
> Now we have to extend this by having another Config Stream which we want to
> broadcast to all process operators. So wanted to connect the Data Stream
> with Config Stream (with Config Stream being broadcast) and use
> CoProcessFunction to handle both streams.
> 
> KeyBy uses Hash based partitioning and also if we write CustomPartitioner it
> can return only one partition (Array of SelectedChannel option as in
> BroadcastPartitioner is not allowed).
> Would have liked this to work —
> dataStream.keyBy().connect(confStream.broadcast()).process(…RichCoProcessFunction()…)
> but it says both stream must be keyed.
> 
> Is there any way to make this work?
> 
> dataStream.connect(confStream.broadcast()).flatMap(...
> RichCoFlatMapFunction() …) ==> broadcast works. But we really want KeyBy and
> processFunction functionality.
> 
> Thanks,
> Anuj
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Stream job failed after increasing number retained checkpoints

2018-01-10 Thread Piotr Nowojski
/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Sink: Discarded events (4/4) 
> (50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects 
> from JobManager 
> akka.tcp://fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager
>  
> <http://fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager>: 
> JobManager is no longer reachable
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1095)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:311)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:120)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at 
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
>   at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
>   at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:486)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2018-01-08 22:26:43,069 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Triggering cancellation of task code Sink: Discarded events 
> (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,087 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Sink: CounterSink (async 
> call completed) (3/4) (b9f2b35e1f9822320cded759c2daea1e).
> 
> 
> José Miguel Tejedor Fernández
> Server developer
> jose.fernan...@rovio.com <mailto:jose.fernan...@rovio.com>
> Rovio Entertainment Ltd.
> Keilaranta 7, FIN - 02150 Espoo, Finland
> www.rovio.com <http://www.rovio.com/>
> 
> 
> On Wed, Jan 10, 2018 at 10:50 AM, Stefan Richter <s.rich...@data-artisans.com 
> <mailto:s.rich...@data-artisans.com>> wrote:
> Hi,
> 
> there is no known limitation in the strict sense, but you might run out of 
> dfs space or job manager memory if you keep around a huge number checkpoints. 
> I wonder what reason you might have that you ever want such a huge number of 
> retained checkpoints? Usually keeping one checkpoint should do the job, maybe 
> a couple more if you are very afraid about corruption that goes beyond your 
> DFSs capabilities to handle it. Is there any reason for that or maybe a 
> misconception about increasing the number of retained checkpoints is good for?
> 
> Best,
> Stefan 
> 
> 
>> Am 10.01.2018 um 08:54 schrieb Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>>:
>> 
>> Hi,
>> 
>> Increasing akka’s timeouts is rarely a solution for any problems - it either 
>> do not help, or just mask the issue making it less visible. But yes, it is 
>&g

Re: What's the meaning of "Registered `TaskManager` at akka://flink/deadLetters " ?

2018-01-19 Thread Piotr Nowojski
Hi,

It seems like you have not opened some of the ports. As I pointed out in the 
first mail, please go through all of the config options regarding 
hostnames/ports (not only those that appear in the log files, maybe something 
is not being logged) 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#jobmanager-amp-taskmanager
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#jobmanager-amp-taskmanager>

jobmanager.rpc.port
taskmanager.rpc.port
taskmanager.data.port
blob.server.port 

And double check that they are accessible from appropriate machines, best by 
using some external tool like telnet and ncat. You network can be configured to 
accept some connections only from specific hosts (like localhost). For example 
in the case for which you attached the log files, did you check that the job 
manager host, can open a connection to the `stage_dbq_1:33633` (task manager 
host and it’s rpc port - rpc port by default is random).

Also make sure that the configurations on the task manager and job manager are 
consistent.

Piotrek

> On 18 Jan 2018, at 08:41, Reza Samee <reza.sa...@gmail.com> wrote:
> 
> Hi, 
> 
> I attached log file,
> 
> Thanks
> 
> On Mon, Jan 15, 2018 at 3:36 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Could you post full job manager and task manager logs from startup until the 
> first signs of the problem?
> 
> Thanks, Piotrek
> 
> 
>> On 15 Jan 2018, at 11:21, Reza Samee <reza.sa...@gmail.com 
>> <mailto:reza.sa...@gmail.com>> wrote:
>> 
>> Thanks for response; 
>> And sorry the passed time.
>> 
>> The JobManager & TaskManager logged ports are open!
>> 
>> 
>> Is this log OK?
>> 2018-01-15 13:40:03,455 INFO  
>> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
>> reachable under akka.tcp://flink@172.16.20.18:6123/user/jobmanager:null 
>> <http://flink@172.16.20.18:6123/user/jobmanager:null>.
>> 
>> When I kill task-manger, the jobmanager logs:
>> 2018-01-15 13:32:41,419 WARN  akka.remote.ReliableDeliverySupervisor 
>>- Association with remote system 
>> [akka.tcp://flink@stage_dbq_1:45532 <>] has failed, address is now gated for 
>> [5000] ms. Reason: [Disassociated] 
>> 
>> But it will not decrement the number of available task-managers!
>> and when I start my signle task-manager again, it logs:
>> 
>> 2018-01-15 13:32:52,753 INFO  
>> org.apache.flink.runtime.instance.InstanceManager - Registered 
>> TaskManager at ??? (akka://flink/deadLetters <>) as 
>> 626846ae27a833cb094eeeb047a6a72c. Current number of registered hosts is 2. 
>> Current number of alive task slots is 40.
>> 
>> 
>> On Wed, Jan 10, 2018 at 11:36 AM, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> Hi,
>> 
>> Search both job manager and task manager logs for ip address(es) and port(s) 
>> that have timeouted. First of all make sure that nodes are visible to each 
>> other using some simple ping. Afterwards please check that those timeouted 
>> ports are opened and not blocked by some firewall (telnet).
>> 
>> You can search the documentation for the configuration parameters with 
>> “port” in name:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html>
>> But note that many of them are random by default.
>> 
>> Piotrek
>> 
>>> On 9 Jan 2018, at 17:56, Reza Samee <reza.sa...@gmail.com 
>>> <mailto:reza.sa...@gmail.com>> wrote:
>>> 
>>> 
>>> I'm running a flink-cluster (a mini one with just one node); but the 
>>> problem is that my TaskManager can't reach to my JobManager!
>>> 
>>> Here are logs from TaskManager
>>> ...
>>> Trying to register at JobManager 
>>> akka.tcp://flink@MY_PRIV_IP/user/jobmanager <> (attempt 20, timeout: 30 
>>> seconds)
>>> Trying to register at JobManager 
>>> akka.tcp://flink@MY_PRIV_IP/user/jobmanager <> (attempt 21, timeout: 30 
>>> seconds)
>>> Trying to register at JobManager 
>>> akka.tcp://flink@MY_PRIV_IP/user/jobmanager <> (attempt 22, timeout: 30 
>>> seconds)
>>> Trying to register at JobManager 
>>> akka.tcp://flink@MY_PRIV_IP/user/jobmanager <> (attempt 23, timeout: 30 
>>> seconds)
>>> Trying to re

Re: What's the meaning of "Registered `TaskManager` at akka://flink/deadLetters " ?

2018-01-15 Thread Piotr Nowojski
Hi,

Could you post full job manager and task manager logs from startup until the 
first signs of the problem?

Thanks, Piotrek

> On 15 Jan 2018, at 11:21, Reza Samee <reza.sa...@gmail.com> wrote:
> 
> Thanks for response; 
> And sorry the passed time.
> 
> The JobManager & TaskManager logged ports are open!
> 
> 
> Is this log OK?
> 2018-01-15 13:40:03,455 INFO  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
> reachable under akka.tcp://flink@172.16.20.18:6123/user/jobmanager:null 
> <http://flink@172.16.20.18:6123/user/jobmanager:null>.
> 
> When I kill task-manger, the jobmanager logs:
> 2018-01-15 13:32:41,419 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@stage_dbq_1:45532] has failed, address is now gated for 
> [5000] ms. Reason: [Disassociated] 
> 
> But it will not decrement the number of available task-managers!
> and when I start my signle task-manager again, it logs:
> 
> 2018-01-15 13:32:52,753 INFO  
> org.apache.flink.runtime.instance.InstanceManager - Registered 
> TaskManager at ??? (akka://flink/deadLetters) as 
> 626846ae27a833cb094eeeb047a6a72c. Current number of registered hosts is 2. 
> Current number of alive task slots is 40.
> 
> 
> On Wed, Jan 10, 2018 at 11:36 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Search both job manager and task manager logs for ip address(es) and port(s) 
> that have timeouted. First of all make sure that nodes are visible to each 
> other using some simple ping. Afterwards please check that those timeouted 
> ports are opened and not blocked by some firewall (telnet).
> 
> You can search the documentation for the configuration parameters with “port” 
> in name:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html>
> But note that many of them are random by default.
> 
> Piotrek
> 
>> On 9 Jan 2018, at 17:56, Reza Samee <reza.sa...@gmail.com 
>> <mailto:reza.sa...@gmail.com>> wrote:
>> 
>> 
>> I'm running a flink-cluster (a mini one with just one node); but the problem 
>> is that my TaskManager can't reach to my JobManager!
>> 
>> Here are logs from TaskManager
>> ...
>> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
>> <> (attempt 20, timeout: 30 seconds)
>> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
>> <> (attempt 21, timeout: 30 seconds)
>> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
>> <> (attempt 22, timeout: 30 seconds)
>> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
>> <> (attempt 23, timeout: 30 seconds)
>> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
>> <> (attempt 24, timeout: 30 seconds)
>> ...
>> 
>> My "JobManager UI" shows my TaskManager with this Path & ID: 
>> "akka://flink/deadLetters <>" ( in TaskManagers tab)
>> And I found these lines in my JobManger stdout:
>> 
>> Resource Manager associating with leading JobManager 
>> Actor[akka://flink/user/jobmanager#-275619168 <>] - leader session null
>> TaskManager ResourceID{resourceId='1132cbdaf2d8204e5e42e321e8592754'} has 
>> started.
>> Registered TaskManager at MY_PRIV_IP (akka://flink/deadLetters <>) as 
>> 7d9568445b4557a74d05a0771a08ad9c. Current number of registered hosts is 1. 
>> Current number of alive task slots is 20.
>> 
>> 
>> What's the meaning of these lines? Where should I look for the solution?
>> 
>> 
>> 
>> 
>> -- 
>> رضا سامعی / http://samee.blog.ir <http://samee.blog.ir/>
> 
> 
> 
> -- 
> رضا سامعی / http://samee.blog.ir <http://samee.blog.ir/>


Re: Send ACK when all records of file are processed

2018-01-25 Thread Piotr Nowojski
Hi,

If an operator has multiple inputs, it’s watermark will be the minimum of all 
of the inputs. Thus your hypothetical “ACK Operator” will get 
Watermark(Long.MAX_VALUE) only when of the preceding operators report 
Watermark(Long.MAX_VALUE). 

Yes, instead of simply adding sink, you would have to use something like 
`flatMap`, that doesn’t emit anything, only passes the watermark (default 
implementation are doing exactly that).

To access watermark, you can use DataStream.transform function and pass your 
own implementation of an operator extending from AbstractStreamOperator. 
Probably you would only need to override processWatermark() method and there 
you could do the ACK operation once you get 
org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK.

Piotrek

> On 25 Jan 2018, at 17:56, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> 
> Hi Piotrek,
> 
> Thank you for your detailed answer.
> 
> Yes, I want to generate the ack when all the records of the file are written 
> to DB.
> 
> So to understand what you are saying , we will receive a single EOF watermark 
> value at the ack operator when all the downstream operator process all the 
> records of the file. But what I understand regarding the watermark is each 
> parallel instance of the operator will emit the watermark, so how do I ensure 
> that the EOF is reached  or will I receive only one watermark at the ack 
> operator ?
> 
> 
> So the pipeline topology will look like 
> 
> DataStream  readFileStream = env.readFile()
> 
> readFileStream
>  .transform(// ContrinousFileReaderOperator)
>  .key(0)
>  .map(// encrichment)
>   .addSink(// DB)
> 
>  instead of add sink, should it be a  simple map operator which writes to DB 
> so that we can have a next ack operator which will generate the response.
> 
> Also, how do I get/access the Watermark value in the ack operator ? It will 
> be a simple  map operator, right ?
> 
> 
> 
> 
> 
> Regards,
> Vinay Patil
> 
> On Thu, Jan 25, 2018 at 4:50 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> As you figured out, some dummy EOF record is one solution, however you might 
> try to achieve it also by wrapping an existing CSV function. Your wrapper 
> could emit this dummy EOF record. Another (probably better) idea is to use 
> Watermark(Long.MAX_VALUE) for the EOF marker. Stream source and/or 
> ContrinousFileReaderOperator will do that for you, so you would just need to 
> handle the Watermark.
> 
> The question is, do you need to perform the ACK operation AFTER all of the DB 
> writes, or just after reading the CSV file? If the latter one, you could add 
> some custom ACK operator with parallelism one just after the CSV source that 
> waits for the EOF Watermark. 
> 
> If it is the first one (some kind of committing the DB writes), you would 
> need to to wait until the EOF passes through all of your operators. You would 
> need something like that:
> 
> parallelism 1 for source -> default parallelism for keyBy/enrichment/db 
> writes -> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)
> 
> I hope this helps,
> Piotrek
> 
>> On 24 Jan 2018, at 23:19, Vinay Patil <vinay18.pa...@gmail.com 
>> <mailto:vinay18.pa...@gmail.com>> wrote:
>> 
>> Hi Guys,
>> 
>> Following is how my pipeline looks (DataStream API) :
>> 
>> [1] Read the data from the csv file
>> [2] KeyBy it by some id
>> [3] Do the enrichment and write it to DB
>> 
>> [1] reads the data in sequence as it has single parallelism and then I have 
>> default parallelism for the other operators.
>> 
>> I want to generate a response (ack) when all the data of the file is 
>> processed. How can I achieve this ?
>> 
>> One solution I can think of is to have EOF dummy record in a file and a 
>> unique field for all the records in that file. Doing a keyBy on this field 
>> will make sure that all records are sent to a single slot. So, when EOF  
>> dummy records is read I can generate a response/ack.
>> 
>> Is there a better way I can deal with this ?
>> 
>> 
>> Regards,
>> Vinay Patil
> 
> 



Re: Data exchange between tasks (operators/sources) at streaming api runtime

2018-01-25 Thread Piotr Nowojski
Hi,

As far as I know there is currently no simple way to do this:
Join stream with static data in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
 

and
https://issues.apache.org/jira/browse/FLINK-6131 


One walk around might be to buffer on the state the Kafka input in your 
TwoInput operator until all of the broadcasted messages have arrived.
Another option might be to dynamically start your application. First run some 
computation to determine the fixed lists of ids and start the flink application 
with those values hardcoded in/passed via command line arguments.

Piotrek 

> On 25 Jan 2018, at 04:10, Ishwara Varnasi  wrote:
> 
> Hello,
> I have a scenario where I've two sources, one of them is source of fixed list 
> of ids for preloading (caching certain info which is slow) and second one is 
> the kafka consumer. I need to run Kafka after first one completes. I need a 
> mechanism to let the Kafka consumer know that it can start consuming 
> messages. How can I achieve this?
> thanks
> Ishwara Varnasi



Re: Avoiding deadlock with iterations

2018-01-25 Thread Piotr Nowojski
Hi,

This is a known problem and I don’t think there is an easy solution to this. 
Please refer to the:
http://mail-archives.apache.org/mod_mbox/flink-user/201704.mbox/%3c5486a7fd-41c3-4131-5100-272825088...@gaborhermann.com%3E
 

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132 


Thanks,
Piotrek

> On 25 Jan 2018, at 05:36, Ken Krugler  wrote:
> 
> Hi all,
> 
> We’ve run into deadlocks with two different streaming workflows that have 
> iterations.
> 
> In both cases, the issue is with fan-out; if any operation in the loop can 
> emit more records than consumed, eventually a network buffer fills up, and 
> then everyone in the iteration loop is blocked.
> 
> One pattern we can use, when the operator that’s causing the fan-out has the 
> ability to decide how much to emit, is to have it behave as an async 
> function, emitting from a queue with multiple threads. If threads start 
> blocking because of back pressure, then the queue begins to fill up, and the 
> function can throttle back how much data it queues up. So this gives us a 
> small (carefully managed) data reservoir we can use to avoid the deadlock.
> 
> Is there a better approach? I didn’t see any way to determine how “full” the 
> various network buffers are, and use that for throttling. Plus there’s the 
> issue of partitioning, where it would be impossible in many cases to know the 
> impact of a record being emitted. So even if we could monitor buffers, I 
> don’t think it’s a viable solution.
> 
> Thanks,
> 
> — Ken
> 
> 
> http://about.me/kkrugler 
> +1 530-210-6378
> 



Re: Send ACK when all records of file are processed

2018-01-25 Thread Piotr Nowojski
Hi,

As you figured out, some dummy EOF record is one solution, however you might 
try to achieve it also by wrapping an existing CSV function. Your wrapper could 
emit this dummy EOF record. Another (probably better) idea is to use 
Watermark(Long.MAX_VALUE) for the EOF marker. Stream source and/or 
ContrinousFileReaderOperator will do that for you, so you would just need to 
handle the Watermark.

The question is, do you need to perform the ACK operation AFTER all of the DB 
writes, or just after reading the CSV file? If the latter one, you could add 
some custom ACK operator with parallelism one just after the CSV source that 
waits for the EOF Watermark. 

If it is the first one (some kind of committing the DB writes), you would need 
to to wait until the EOF passes through all of your operators. You would need 
something like that:

parallelism 1 for source -> default parallelism for keyBy/enrichment/db writes 
-> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)

I hope this helps,
Piotrek

> On 24 Jan 2018, at 23:19, Vinay Patil  wrote:
> 
> Hi Guys,
> 
> Following is how my pipeline looks (DataStream API) :
> 
> [1] Read the data from the csv file
> [2] KeyBy it by some id
> [3] Do the enrichment and write it to DB
> 
> [1] reads the data in sequence as it has single parallelism and then I have 
> default parallelism for the other operators.
> 
> I want to generate a response (ack) when all the data of the file is 
> processed. How can I achieve this ?
> 
> One solution I can think of is to have EOF dummy record in a file and a 
> unique field for all the records in that file. Doing a keyBy on this field 
> will make sure that all records are sent to a single slot. So, when EOF  
> dummy records is read I can generate a response/ack.
> 
> Is there a better way I can deal with this ?
> 
> 
> Regards,
> Vinay Patil



Re: Data exchange between tasks (operators/sources) at streaming api runtime

2018-01-25 Thread Piotr Nowojski
If you want to go this way, you could:
- as you proposed use some busy waiting with reading some file from a 
distributed file system
- wait for some network message (opening your own socket)
- use some other external system for this purpose: Kafka? Zookeeper?  

Although all of them seems hacky and I would prefer (as I proposed before) to 
pre compute those ids before running/starting the main Flink application. 
Probably would be simpler and easier to maintain.

Piotrek

> On 25 Jan 2018, at 13:47, Ishwara Varnasi <ivarn...@gmail.com> wrote:
> 
> The FLIP-17 is promising. Until it’s available I’m planning to do this: 
> extend Kafka consumer and add logic to hold consuming until other source 
> (fixed set) completes sending and those messages are processed by the 
> application. However the question is to how to let the Kafka consumer know 
> that it should now start consuming messages. What is the correct way to 
> broadcast messages to other tasks at runtime? I’d success with the 
> distributed cache (ie write status to a file in one task and other looks for 
> status in this file), but doesn’t look like good solution although works. 
> Thanks for the pointers.
> Ishwara Varnasi 
> 
> Sent from my iPhone
> 
> On Jan 25, 2018, at 4:03 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> 
>> Hi,
>> 
>> As far as I know there is currently no simple way to do this:
>> Join stream with static data in 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>>  
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API>
>> and
>> https://issues.apache.org/jira/browse/FLINK-6131 
>> <https://issues.apache.org/jira/browse/FLINK-6131>
>> 
>> One walk around might be to buffer on the state the Kafka input in your 
>> TwoInput operator until all of the broadcasted messages have arrived.
>> Another option might be to dynamically start your application. First run 
>> some computation to determine the fixed lists of ids and start the flink 
>> application with those values hardcoded in/passed via command line arguments.
>> 
>> Piotrek 
>> 
>>> On 25 Jan 2018, at 04:10, Ishwara Varnasi <ivarn...@gmail.com 
>>> <mailto:ivarn...@gmail.com>> wrote:
>>> 
>>> Hello,
>>> I have a scenario where I've two sources, one of them is source of fixed 
>>> list of ids for preloading (caching certain info which is slow) and second 
>>> one is the kafka consumer. I need to run Kafka after first one completes. I 
>>> need a mechanism to let the Kafka consumer know that it can start consuming 
>>> messages. How can I achieve this?
>>> thanks
>>> Ishwara Varnasi
>> 



Re: Stream job failed after increasing number retained checkpoints

2018-01-09 Thread Piotr Nowojski
Hi,

Increasing akka’s timeouts is rarely a solution for any problems - it either do 
not help, or just mask the issue making it less visible. But yes, it is 
possible to bump the limits: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka
 


I don’t think that state.checkpoints.num-retained was thought to handle such 
large numbers of retained checkpoint so maybe there are some known/unknown 
limitations. Stefan, do you know something in this regard?

Parallel thing to do is that like for any other akka timeout, you should track 
down the root cause of it. This one warning line doesn’t tell much. From where 
does it come from? Client log? Job manager log? Task manager log? Please search 
on the opposite side of the time outing connection for possible root cause of 
the timeout including:
- possible error/exceptions/warnings
- long GC pauses or other blocking operations (possibly long unnatural gaps in 
the logs)
- machine health (CPU usage, disks usage, network connections)

Piotrek

> On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez 
>  wrote:
> 
> Hello,
> 
> I have several stream jobs running (v. 1.3.1 ) in production which always 
> fails after a fixed period of around 30h after being executing. That's the 
> WARN trace before failing:
> 
> Association with remote system 
> [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876 
> ] has failed, 
> address is now gated for [5000] ms. Reason: [Association failed with 
> [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876 
> ]] Caused by: [No 
> response from remote for outbound association. Handshake timed out after 
> [2 ms].
> 
> The main change done in the job configuration was to increase the 
> state.checkpoints.num-retained from 1 to 2880. I am using asynchronous 
> RocksDB to persists to snapshot the state. (I attach some screenshots with 
> the  checkpoint conf from webUI)
> 
> May my assumption be correct that the increase of checkpoints.num-retained is 
> causing the problem? Any known issue regarding this?
> Besides, Is there any way to increase the Akka handshake timeout from the 
> current 2 ms to a higher value? I considered that it may be convenient to 
> increase the timeout to 1 minute instead.
> 
> BR
> 
> 
>  17.35.18.png>



Re: Custom Partitioning for Keyed Streams

2018-01-10 Thread Piotr Nowojski
Hi,

I don’t think it is possible to enforce scheduling of two keys to different 
nodes, since all of that is based on hashes.

For some cases, doing the pre-aggregation step (initial aggregation done before 
keyBy, which is followed by final aggregation after the keyBy) can be the 
solution for handling a data skew. With pre aggregation, some (most?) of the 
work can be distributed and be done on the source node instead of doing all of 
the heavy lifting on the destination node. It has not been yet merged to the 
Flink code, but it’s entirely a user space code, which you could copy paste 
(and adjust) into your project. Pull request containing pre aggregation is here:
https://github.com/apache/flink/pull/4626 

Please pay attention at the limitations of this code (documented in the java 
doc).

If above code doesn’t work for you for whatever reason, you can also try to 
implement some custom tailored pre aggregation. Like having two keyBy steps, 
where in first you can artificially split A and B keys into couple of smaller 
ones and the second keyBy could merge/squash the results.

Piotrek

> On 9 Jan 2018, at 21:55, Martin, Nick  wrote:
> 
> Have a set of stateful operators that rely on keyed state. There is 
> substantial skew between keys (i.e. there will be 100 messages on keys A and 
> B, and 10 messages each on keys C-J), and key selection assignment is 
> dictated by the needs of my application such that I can’t choose keys in a 
> way that will eliminate the skew. The skew is somewhat predictable (i.e. I 
> know keys A and B will usually get roughly 10x as many messages as the rest) 
> and fairly consistent on different timescales (i.e. counting the messages on 
> each key for 30 seconds would provide a reasonably good guess as to the 
> distribution of messages that will be received over the next 10-20 minutes).
>  
> The problem I’m having is that often the high volume keys (A and B in the 
> example) end up on the same task slot and slow it down, while the low volume 
> ones are distributed across the other operators, leaving them underloaded. I 
> looked into the available physical partitioning functions, but it looks like 
> that functionality is generally incompatible with keyed streams, and I need 
> access to keyed state to do my actual processing. Is there any way I can get 
> better load balancing while using keyed state?
> 
> Notice: This e-mail is intended solely for use of the individual or entity to 
> which it is addressed and may contain information that is proprietary, 
> privileged and/or exempt from disclosure under applicable law. If the reader 
> is not the intended recipient or agent responsible for delivering the message 
> to the intended recipient, you are hereby notified that any dissemination, 
> distribution or copying of this communication is strictly prohibited. This 
> communication may also contain data subject to U.S. export laws. If so, data 
> subject to the International Traffic in Arms Regulation cannot be 
> disseminated, distributed, transferred, or copied, whether incorporated or in 
> its original form, to foreign nationals residing in the U.S. or abroad, 
> absent the express prior approval of the U.S. Department of State. Data 
> subject to the Export Administration Act may not be disseminated, 
> distributed, transferred or copied contrary to U. S. Department of Commerce 
> regulations. If you have received this communication in error, please notify 
> the sender by reply e-mail and destroy the e-mail message and any physical 
> copies made of the communication.
>  Thank you. 
> *



Re: Send ACK when all records of file are processed

2018-01-30 Thread Piotr Nowojski
In case of reading from input files, at the EOF event, readers will send 
Watermark(Long.MAX_VALUE) on all of the output edges and those watermarks will 
be propagated accordingly. So your ACK operator will get 
Watermark(Long.MAX_VALUE) only when it gets it from ALL of it’s input edges.

When reading from Kafka, you do not have an EOF event, so you it would not be 
possible to use this Watermark(Long.MAX_VALUE). In that case you would need to 
emit some dummy EOF record, containing some meta information like filename 
alongside with correctly set event time to a value greater then original even 
read from Kafka which contained the filename to process. You would have to pass 
this EOF dummy record to your EOF operator. There you you would need to create 
some kind of mapping 

fileName -> event time marking EOF

And each time you process EOF record, you add new entry to this mapping. Now 
whenever you process watermarks, you can check for which fileNames does this 
watermark guarantees that file has been processed completely.

However this is more complicated and you would have to handle thins like:
- cleaning up the mapping (avoiding OutOfMemory)
- making sure that watermarks are generated without unnecessary latencies (when 
reading from file, EOF immediately emits Watermark(Long.MAX_VALUE), which might 
not always be the case for Kafka: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission>)

Piotrek

> On 30 Jan 2018, at 15:17, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> 
> Yeh, so this is the current implementation.
> 
> One question regarding the Watermark, since watermark is chosen as minimum 
> value of all of input streams, only one input  stream will have watermark 
> value to LONG.MAX_VALUE which denotes the EOF processing whereas the other 
> streams will not have this value , is my understanding right ? So in this 
> case LONG.MAX_VALUE will always be a greater value than it's input streams. 
> Or the LONG.MAX_VALUE watermark will flow from each input stream ?
> 
> 
> I was thinking of directly reading from Kafka as source in Flink in order to 
> remove the middle layer of independent Kafka Consumer which is triggering 
> Flink job.
> 
> So, the pipeline will be 1. readFrom Kafka -> take the File location -> read 
> using FileReaderOperator
> 
> But in this case how do I determine for which File I have received the 
> LONG.MAX_VALUE, it will get complicated.
> 
> 
> 
> Regards,
> Vinay Patil
> 
> On Tue, Jan 30, 2018 at 1:57 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Thanks for the clarification :)
> 
> Since you have one Job per an ACK, you can just relay on 
> Watermark(Long.MAX_VALUE) to mark the end of the processing.
> 
> More complicated solution (compared to what I proposed before) would be 
> needed if you had one long living job (for example multiple weeks) and it 
> would need to produce multiple ACKs in different point of time.
> 
> Piotrek
> 
> 
>> On 29 Jan 2018, at 15:43, Vinay Patil <vinay18.pa...@gmail.com 
>> <mailto:vinay18.pa...@gmail.com>> wrote:
>> 
>> Sure, here is the complete design that we have :
>> 
>> File metadata (NFS location of file) is stored in kafka , we are having a 
>> Kafka Consumer (not flink one) which will read from each partition and 
>> trigger a Flink job on cluster. 
>> 
>> The Flink job will then read from a file and do the processing as I 
>> mentioned earlier.
>> 
>> The requirement here is we need to trigger a ACK if the validations for all 
>> the records in a file are successful.
>> 
>> P.S I know we are not using Kafka to its full potential and are just using 
>> it for storing metadata :) 
>> 
>> Regards,
>> Vinay Patil
>> 
>> On Thu, Jan 25, 2018 at 11:57 AM, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> Could you rephrase what is your concern? 
>> 
>> Thanks, Piotrek
>> 
>> 
>>> On 25 Jan 2018, at 18:54, Vinay Patil <vinay18.pa...@gmail.com 
>>> <mailto:vinay18.pa...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> No, to clarify I need to send the ack for each file when it gets processed 
>>> completely and there are multiple files that I am going to read from the 
>>> shared location.
>>> 
>>> Regards,
>>> Vinay Patil
>>> 
>>> On Thu, Ja

Re: Latest version of Kafka

2018-02-02 Thread Piotr Nowojski
Hi,

Flink as for now provides only a connector for Kafka 0.11, which is using 
KafkaClient in 0.11.x version. However you should be able to use it for reading 
to/writing from Kafka 1.0 - Kafka claims (and as far as I know it’s true) that 
Kafka 1.0 is backward compatible with 0.11. 

Piotrek

> On 1 Feb 2018, at 14:46, Marchant, Hayden  wrote:
> 
> What is the newest version of Kafka that is compatible with Flink 1.4.0? I 
> see the last version of Kafka supported is 0.11 , from documentation, but has 
> any testing been done with Kafka 1.0?
> 
> 
> Hayden Marchant
> 



Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
It seems so - but I’m saying this only basing on a annotations when this method 
was added (in the last couple of months). I’m not that much familiar with those 
code parts.

Piotrek

> On 5 Feb 2018, at 10:51, mingleizhang <zml13856086...@163.com> wrote:
> 
>  Makes sense to me now. Is it a new design at FLIP6 ?
> 
> Rice.
> 
> 
> 
> 
> 
> At 2018-02-05 17:49:05, "Piotr Nowojski" <pi...@data-artisans.com> wrote:
> I might be wrong but I think it is other way around and the naming of this 
> method is correct - it does exactly what it says. TaskManager comes with some 
> predefined task slots and it is the one that is offering them to a 
> JobManager. JobManager can use those slots offers to (later!) schedule tasks. 
> (#offerSlotsToJobManager() is being called during TaskManager initialisation).
> 
> Piotrek
> 
>> On 5 Feb 2018, at 10:44, mingleizhang <zml13856086...@163.com 
>> <mailto:zml13856086...@163.com>> wrote:
>> 
>> Yes. Thanks Piotrek. Of course. So, TaskExecutor#offerSlotsToJobManager 
>> sounds confuse to me. It might be better to rename it to 
>> requestSlotsFromJobManager. I dont know whether it is sounds OKay for that. 
>> I just feel like offerSlotToJobManager sounds strange.. What do you think of 
>> this ?
>> 
>> Rice.
>> 
>> 
>> 
>> 
>> 
>> At 2018-02-05 17:30:32, "Piotr Nowojski" <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> org.apache.flink.runtime.jobmaster.JobMaster#offerSlots is a receiver side 
>> of an RPC call that is being initiated on the sender side: 
>> org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager.
>> 
>> In other words, JobMasterGateway.offerSlots is called by a TaskManager and 
>> it is a way how TaskManager is advertising his slots to a JobManager.
>> 
>> Piotrek
>> 
>>> On 5 Feb 2018, at 08:38, mingleizhang <zml13856086...@163.com 
>>> <mailto:zml13856086...@163.com>> wrote:
>>> 
>>> I find some codes in flink does not make sense to me. Like in some classes 
>>> below
>>> 
>>> JobMasterGateway.java has a offerSlots method which means Offers the given 
>>> slots to the job manager. I was wondering why a jobmanager running should 
>>> need slots ?
>>> TaskExecutor.java has a offerSlotsToJobManager method which means offer 
>>> slots to jobmanager.
>>> 
>>> Above both are confuse me. I just know that Task running needs slots which 
>>> support by a taskManager. Does anyone let me why what does jobmanager needs 
>>> slots mean ?
>>> 
>>> Thanks in advance.
>>> Rice.
>>> 
>>>  
>>> 
>>> 
>>>  
>> 
>> 
>> 
>>  
> 
> 
> 
>  



Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
Yes, but this issue is still a part of the FLIP-6 work.

Piotrek

> On 5 Feb 2018, at 11:01, mingleizhang <zml13856086...@163.com> wrote:
> 
> I found a website: https://issues.apache.org/jira/browse/FLINK-4360 
> <https://issues.apache.org/jira/browse/FLINK-4360> implemented this before.
> 
> Rice.
> 
> 
> 
> 
> 
> At 2018-02-05 17:56:49, "Piotr Nowojski" <pi...@data-artisans.com> wrote:
> It seems so - but I’m saying this only basing on a annotations when this 
> method was added (in the last couple of months). I’m not that much familiar 
> with those code parts.
> 
> Piotrek
> 
>> On 5 Feb 2018, at 10:51, mingleizhang <zml13856086...@163.com 
>> <mailto:zml13856086...@163.com>> wrote:
>> 
>>  Makes sense to me now. Is it a new design at FLIP6 ?
>> 
>> Rice.
>> 
>> 
>> 
>> 
>> 
>> At 2018-02-05 17:49:05, "Piotr Nowojski" <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> I might be wrong but I think it is other way around and the naming of this 
>> method is correct - it does exactly what it says. TaskManager comes with 
>> some predefined task slots and it is the one that is offering them to a 
>> JobManager. JobManager can use those slots offers to (later!) schedule 
>> tasks. (#offerSlotsToJobManager() is being called during TaskManager 
>> initialisation).
>> 
>> Piotrek
>> 
>>> On 5 Feb 2018, at 10:44, mingleizhang <zml13856086...@163.com 
>>> <mailto:zml13856086...@163.com>> wrote:
>>> 
>>> Yes. Thanks Piotrek. Of course. So, TaskExecutor#offerSlotsToJobManager 
>>> sounds confuse to me. It might be better to rename it to 
>>> requestSlotsFromJobManager. I dont know whether it is sounds OKay for that. 
>>> I just feel like offerSlotToJobManager sounds strange.. What do you think 
>>> of this ?
>>> 
>>> Rice.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> At 2018-02-05 17:30:32, "Piotr Nowojski" <pi...@data-artisans.com 
>>> <mailto:pi...@data-artisans.com>> wrote:
>>> org.apache.flink.runtime.jobmaster.JobMaster#offerSlots is a receiver side 
>>> of an RPC call that is being initiated on the sender side: 
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager.
>>> 
>>> In other words, JobMasterGateway.offerSlots is called by a TaskManager and 
>>> it is a way how TaskManager is advertising his slots to a JobManager.
>>> 
>>> Piotrek
>>> 
>>>> On 5 Feb 2018, at 08:38, mingleizhang <zml13856086...@163.com 
>>>> <mailto:zml13856086...@163.com>> wrote:
>>>> 
>>>> I find some codes in flink does not make sense to me. Like in some classes 
>>>> below
>>>> 
>>>> JobMasterGateway.java has a offerSlots method which means Offers the given 
>>>> slots to the job manager. I was wondering why a jobmanager running should 
>>>> need slots ?
>>>> TaskExecutor.java has a offerSlotsToJobManager method which means offer 
>>>> slots to jobmanager.
>>>> 
>>>> Above both are confuse me. I just know that Task running needs slots which 
>>>> support by a taskManager. Does anyone let me why what does jobmanager 
>>>> needs slots mean ?
>>>> 
>>>> Thanks in advance.
>>>> Rice.
>>>> 
>>>>  
>>>> 
>>>> 
>>>>  
>>> 
>>> 
>>> 
>>>  
>> 
>> 
>> 
>>  
> 
> 
> 
>  



Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
I might be wrong but I think it is other way around and the naming of this 
method is correct - it does exactly what it says. TaskManager comes with some 
predefined task slots and it is the one that is offering them to a JobManager. 
JobManager can use those slots offers to (later!) schedule tasks. 
(#offerSlotsToJobManager() is being called during TaskManager initialisation).

Piotrek

> On 5 Feb 2018, at 10:44, mingleizhang <zml13856086...@163.com> wrote:
> 
> Yes. Thanks Piotrek. Of course. So, TaskExecutor#offerSlotsToJobManager 
> sounds confuse to me. It might be better to rename it to 
> requestSlotsFromJobManager. I dont know whether it is sounds OKay for that. I 
> just feel like offerSlotToJobManager sounds strange.. What do you think of 
> this ?
> 
> Rice.
> 
> 
> 
> 
> 
> At 2018-02-05 17:30:32, "Piotr Nowojski" <pi...@data-artisans.com> wrote:
> org.apache.flink.runtime.jobmaster.JobMaster#offerSlots is a receiver side of 
> an RPC call that is being initiated on the sender side: 
> org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager.
> 
> In other words, JobMasterGateway.offerSlots is called by a TaskManager and it 
> is a way how TaskManager is advertising his slots to a JobManager.
> 
> Piotrek
> 
>> On 5 Feb 2018, at 08:38, mingleizhang <zml13856086...@163.com 
>> <mailto:zml13856086...@163.com>> wrote:
>> 
>> I find some codes in flink does not make sense to me. Like in some classes 
>> below
>> 
>> JobMasterGateway.java has a offerSlots method which means Offers the given 
>> slots to the job manager. I was wondering why a jobmanager running should 
>> need slots ?
>> TaskExecutor.java has a offerSlotsToJobManager method which means offer 
>> slots to jobmanager.
>> 
>> Above both are confuse me. I just know that Task running needs slots which 
>> support by a taskManager. Does anyone let me why what does jobmanager needs 
>> slots mean ?
>> 
>> Thanks in advance.
>> Rice.
>> 
>>  
>> 
>> 
>>  
> 
> 
> 
>  



Re: Flink not writing last few elements to disk

2018-02-05 Thread Piotr Nowojski
Hi,

FileProcessMode.PROCESS_CONTINUOUSLY processes the file continuously - the 
stream will not end. 

Simple `writeAsCsv(…)` on the other hand only flushes the output file on a 
stream end (see `OutputFormatSinkFunction`).

You can either use `PROCESS_ONCE` mode or use more advanced data sink:
- BucketingSink
- re-use `writeAsCsv(…)` code by extending OutputFormatSinkFunction and 
implementing `CheckpointedFunction` to flush on snapshots (for at-least-once)
- write your own sink by extending `TwoPhaseCommitSinkFunction` (to support 
`exactly-once`)

Piotrek

> On 2 Feb 2018, at 18:32, geoff halmo  wrote:
> 
> Hi Flink community:
> 
> I am testing Flink but can't write the final(18 or so elements out to disk)
> 
> Setup:
> Using NYC yellow taxi from data 2017-09.csv, I sorted the data on
> pickup_datetime in bash. I am working in event time.
> 
> Skeleton program:
> val ds = senv.readFile(input_format, input_path,
> FileProcessMode.PROCESS_CONTINUOUSLY, 1000)
> 
> ds.flatMap(row => parse(row)
> .assignAscendingTimestamps( _.datetime)
> .timeWindowAll(Time.hours(1))
> .process( new MyProcessAllWIndowFunction() )
> .writeCsv
> 
> Issue:
> The last line is a half line:
> tail -n1 output.csv
> 150655320,2017-09-27T:19:00-4:00[user@computer]
> 
> When I use .print instead of .writeCsv, the last line on console is
> 150682680,2017-09-30T23:00-400[America/New_York],21353



Re: Reduce parallelism without network transfer.

2018-02-05 Thread Piotr Nowojski
Hi,

It should work like this out of the box if you use rescale method:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html#physical-partitioning
 


If it will not work, please let us know.

Piotrek

> On 3 Feb 2018, at 04:39, Kien Truong  wrote:
> 
> Hi,
> 
> Assuming that I have a streaming job, using 30 task managers with 4 slot 
> each. I want to change the parallelism of 1 operator from 120 to 30. Are 
> there anyway so that each subtask of this operator get data from 4 upstream 
> subtasks running in the same task manager, thus avoiding network completely ?
> 
> Best regards, 
> Kien
> 
> Sent from TypeApp 


Re: Global window keyBy

2018-02-05 Thread Piotr Nowojski
Hi,

FIRE_AND_PURGE triggers `org.apache.flink.api.common.state.State#clear()` call 
and it "Removes the value mapped under the current key.”. So other keys should 
remain unmodified. 

I hope this solves your problem/question?

Piotrek

> On 4 Feb 2018, at 15:39, miki haiat  wrote:
> 
> Im using trigger   and a  guid in order to key stream .
> 
> I have  some problem to understand how to clear the window .
> 
> FIRE_AND_PURGE   in trigger  will remove the keyd data only ?
> if fire and purge is removing all the data then i need to implement it more 
> like this  example
> 
> https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/windows/DrivingSegments.java
>  
> 
> 
> Evictor is used in order to clear the data by time stamp  but how can i clear 
> the data  by the key  also ?
> 
> 
> thanks ,
> 
> Miki



Re: Getting Key from keyBy() in ProcessFunction

2018-02-05 Thread Piotr Nowojski
I think now it’s not easily possible, however it might be a valid suggestion to 
add `OnTimerContext#getCurrentKey()` method. 

Besides using ValueState as you discussed before, as a some kind of a walk 
around you could copy and modify KeyedProcessOperator to suits your needs, but 
this would be more complicated.

Piotrek

> On 4 Feb 2018, at 20:36, Ken Krugler  wrote:
> 
> Hi Jürgen,
> 
> That makes sense to me.
> 
> Anyone from the Flink team want to comment on (a) if there is a way to get 
> the current key in the timer callback without using an explicit ValueState 
> that’s maintained in the processElement() method, and (b) if not, whether 
> that could be added to the context?
> 
> Thanks,
> 
> — Ken
> 
> 
>> On Feb 4, 2018, at 6:14 AM, Jürgen Thomann > > wrote:
>> 
>> Hi Ken,
>> 
>> thanks for your answer. You're right and I'm doing it already that way. I 
>> just hoped that I could avoid the ValueState (I'm using a MapState as well 
>> already, which does not store the key) and get the key from the provided 
>> Context of the ProcessFunction. This would avoid having the ValueState and 
>> setting it in the processElement just to know the key in the onTimer 
>> function. 
>> In the current way I have to check the ValueState for every element if the 
>> key is already set or just set it every time again the processElement method 
>> is invoked.
>> 
>> Best,
>> Jürgen
>> 
>> On 02.02.2018 18:37, Ken Krugler wrote:
>>> Hi Jürgen,
>>> 
 On Feb 2, 2018, at 6:24 AM, Jürgen Thomann > wrote:
 
 Hi,
 
 I'm currently using a ProcessFunction after a keyBy() and can't find a way 
 to get the key.
>>> 
>>> Doesn’t your keyBy() take a field (position, or name) to use as the key?
>>> 
>>> So then that same field contains the key in the 
>>> ProcessFunction.processElement(in, …) parameter, yes?
>>> 
 I'm currently storing it in a ValueState within processElement
>>> 
>>> If you’re using a ValueState, then there’s one of those for each unique 
>>> key, not one for the operation.
>>> 
>>> I.e. the ValueState for key = “one” is separate from the ValueState for key 
>>> = “two”.
>>> 
>>> You typically store the key in the state so it’s accessible in the onTimer 
>>> method.
>>> 
 and set it all the time, so that I can access it in onTimer(). Is there a 
 better way to get the key? We are using Flink 1.3 at the moment.
>>> 
>>> The ValueState (what you used in processElement) that you’re accessing in 
>>> the onTimer() method is also scoped by the current key.
>>> 
>>> So assuming you stored the key in the state inside of your processElement() 
>>> call, then you should have everything you need.
>>> 
>>> — Ken
>>> 
>>> PS - Check out 
>>> https://www.slideshare.net/dataArtisans/apache-flink-training-datastream-api-processfunction
>>>  
>>> 
> --
> Ken Krugler
> http://www.scaleunlimited.com 
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr



Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
org.apache.flink.runtime.jobmaster.JobMaster#offerSlots is a receiver side of 
an RPC call that is being initiated on the sender side: 
org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager.

In other words, JobMasterGateway.offerSlots is called by a TaskManager and it 
is a way how TaskManager is advertising his slots to a JobManager.

Piotrek

> On 5 Feb 2018, at 08:38, mingleizhang  wrote:
> 
> I find some codes in flink does not make sense to me. Like in some classes 
> below
> 
> JobMasterGateway.java has a offerSlots method which means Offers the given 
> slots to the job manager. I was wondering why a jobmanager running should 
> need slots ?
> TaskExecutor.java has a offerSlotsToJobManager method which means offer slots 
> to jobmanager.
> 
> Above both are confuse me. I just know that Task running needs slots which 
> support by a taskManager. Does anyone let me why what does jobmanager needs 
> slots mean ?
> 
> Thanks in advance.
> Rice.
> 
>  
> 
> 
>  



Re: Python and Scala

2018-02-13 Thread Piotr Nowojski
Hi,

1. Flink’s Python Batch API is not complete and it’s not on pair with Scala.
2. As for know there is no Python API for Flink Streaming, however there is 
some ongoing work with that: https://issues.apache.org/jira/browse/FLINK-5886 

3. CEP doesn’t work with Flink Batch, you have to use Flink Streaming for that: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#dataset-and-datastream
 


Piotrek

> On 13 Feb 2018, at 13:21, Esa Heikkinen  wrote:
> 
>  
> What the difference is to use Python and Scala in Flink ?
> Can I do all the same things with Python and Scala ? For examples CEP with 
> files.



Re: Ceph configuration for checkpoints?

2018-02-13 Thread Piotr Nowojski
Hi,

Have you tried to refer to ceph documentation? 
http://docs.ceph.com/docs/jewel/cephfs/hadoop/ 
 It claims to be:

> a drop-in replacement for the Hadoop File System (HDFS)

So I would first try to configure ceph according to their documentation and try 
to use Flink’s built in Hadoop connector.

Piotrek

> On 12 Feb 2018, at 20:11, Julio Biason  wrote:
> 
> Hello people,
> 
> I'm looking for the configuration options required to use Ceph as a backend 
> for checkpoints.
> 
> So far, I can see that `state.backend.fs.checkpointdir` and 
> `state.checkpoints.dir` should be something like 
> `ceph:///my-checkpoint-bucket`. But where do I define server and acess and 
> secret keys? I couldn't find anything remotely related to that in the docs...
> 
> -- 
> Julio Biason, Sofware Engineer
> AZION  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51  
> 99907 0554



Re: Python and Scala

2018-02-14 Thread Piotr Nowojski
Hi

Scala REPL uses the same code as compiled library so they should work the same.

Piotrek

> On 13 Feb 2018, at 18:32, Esa Heikkinen <heikk...@student.tut.fi> wrote:
> 
> Hi
> 
> And what about the differences between Scala REPL and Scala (compiled) ?
> Esa
> 
> Piotr Nowojski kirjoitti 13.2.2018 klo 15:14:
>> Hi,
>> 
>> 1. Flink’s Python Batch API is not complete and it’s not on pair with Scala.
>> 2. As for know there is no Python API for Flink Streaming, however there is 
>> some ongoing work with that: 
>> https://issues.apache.org/jira/browse/FLINK-5886 
>> <https://issues.apache.org/jira/browse/FLINK-5886>
>> 3. CEP doesn’t work with Flink Batch, you have to use Flink Streaming for 
>> that: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#dataset-and-datastream
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#dataset-and-datastream>
>> 
>> Piotrek
>> 
>>> On 13 Feb 2018, at 13:21, Esa Heikkinen <esa.heikki...@student.tut.fi 
>>> <mailto:esa.heikki...@student.tut.fi>> wrote:
>>> 
>>>  
>>> What the difference is to use Python and Scala in Flink ?
>>> Can I do all the same things with Python and Scala ? For examples CEP with 
>>> files.
>> 
> 



Re: Python and Scala

2018-02-14 Thread Piotr Nowojski
Hi,

I have never used it before, but it’s described in the documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/scala_shell.html
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/scala_shell.html>

the "Adding external dependencies” part. It should answer to this question.

Piotrek

> On 14 Feb 2018, at 10:15, Esa Heikkinen <esa.heikki...@student.tut.fi> wrote:
> 
> Hi
>  
> Good news. Is it way to supply Scala-code from file to REPL ?
>  
> It seems the compiling is too complicated operation.. Actually I don’t get it 
> to work yet.
>  
> Esa
>  
> From: Piotr Nowojski [mailto:pi...@data-artisans.com] 
> Sent: Wednesday, February 14, 2018 10:55 AM
> To: Esa Heikkinen <esa.heikki...@student.tut.fi>
> Cc: Esa Heikkinen <esa.heikki...@student.tut.fi>; user@flink.apache.org
> Subject: Re: Python and Scala
>  
> Hi
>  
> Scala REPL uses the same code as compiled library so they should work the 
> same.
>  
> Piotrek
> 
> 
> On 13 Feb 2018, at 18:32, Esa Heikkinen <heikk...@student.tut.fi 
> <mailto:heikk...@student.tut.fi>> wrote:
>  
> Hi
> And what about the differences between Scala REPL and Scala (compiled) ?
> Esa
> 
> Piotr Nowojski kirjoitti 13.2.2018 klo 15:14:
> Hi, 
>  
> 1. Flink’s Python Batch API is not complete and it’s not on pair with Scala.
> 2. As for know there is no Python API for Flink Streaming, however there is 
> some ongoing work with that: https://issues.apache.org/jira/browse/FLINK-5886 
> <https://issues.apache.org/jira/browse/FLINK-5886>
> 3. CEP doesn’t work with Flink Batch, you have to use Flink Streaming for 
> that: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#dataset-and-datastream
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#dataset-and-datastream>
>  
> Piotrek
> 
> 
> On 13 Feb 2018, at 13:21, Esa Heikkinen <esa.heikki...@student.tut.fi 
> <mailto:esa.heikki...@student.tut.fi>> wrote:
>  
>  
> What the difference is to use Python and Scala in Flink ?
> Can I do all the same things with Python and Scala ? For examples CEP with 
> files.



Re: Reduce parallelism without network transfer.

2018-02-06 Thread Piotr Nowojski
Hi,

Rebalance is more safe default setting that protects against data skew. And 
even the smallest data skew can create a bottleneck much larger then the 
serialisation/network transfer cost. Especially if one changes the parallelism 
to a value that’s not a result of multiplication or division (like N down to 
N-1). And data skew can be arbitrarily large, while rebalance overhead compare 
to rescale is limited.

Piotrek


> On 6 Feb 2018, at 04:32, Kien Truong <duckientru...@gmail.com> wrote:
> 
> Thanks Piotr, it works.
> May I ask why default behavior when reducing parallelism is rebalance, and 
> not rescale ?
> 
> Regards,
> Kien
> 
> Sent from TypeApp <http://www.typeapp.com/r?b=11979>
> On Feb 5, 2018, at 15:28, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> It should work like this out of the box if you use rescale method:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html#physical-partitioning
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html#physical-partitioning>
> 
> If it will not work, please let us know.
> 
> Piotrek 
> 
>> On 3 Feb 2018, at 04:39, Kien Truong < duckientru...@gmail.com 
>> <mailto:duckientru...@gmail.com>> wrote:
>> 
>> Hi, 
>> 
>> Assuming that I have a streaming job, using 30 task managers with 4 slot 
>> each. I want to change the parallelism of 1 operator from 120 to 30. Are 
>> there anyway so that each subtask of this operator get data from 4 upstream 
>> subtasks running in the same task manager, thus avoiding network completely 
>> ? 
>> 
>> Best regards, 
>> Kien 
>> 
>> Sent from TypeApp <http://www.typeapp.com/r?b=11979>



Re: Rebalance to subtasks in same TaskManager instance

2018-02-06 Thread Piotr Nowojski
Hi,

Unfortunately I don’t think it’s currently possible in the Flink. Please feel 
free to submit a feature request for it on our JIRA 
https://issues.apache.org/jira/projects/FLINK/summary 


Have you tried out the setup using rebalance? In most cases overhead of 
rebalance over rescale is not that high as one might think.

Piotrek

> On 5 Feb 2018, at 15:16, johannes.barn...@clarivate.com wrote:
> 
> Hi,
> 
> I have a streaming topology with source parallelism of M and a target
> operator parallelism of N.
> For optimum performance I have found that I need to choose M and N
> independently.
> Also, the source subtasks do not all produce the same number of records and
> therefor I have to rebalance to the target operator to get optimum
> throughput.
> 
> The record sizes vary a lot (up to 10MB) but are about 200kB on average.
> 
> Through experimentation using the rescale() operator I have found that
> maximum throughput can be significantly increased if I restrict this
> rebalancing to target subtasks within the same TaskManager instances.
> 
> However I cannot use rescale for this purpose as it does not do a
> rebalancing to all target subtasks in the instance.
> 
> I was hoping to use a custom Partitioner to achieve this but it is not clear
> to me which partition would correspond to which subTask.
> 
> Is there any way currently to achieve this with Flink? 
> 
> If it helps I believe the feature I am hoping to achieve is similar to
> Storm's "Local or shuffle grouping".
> 
> Any help or suggestions will be appreciated.
> Hans
> 
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: periodic trigger

2017-12-22 Thread Piotr Nowojski
Ok, I think now I understand your problem. 

Wouldn’t it be enough, if you change last global window to something like this:

lastUserSession
.timeWindowAll(Time.seconds(10))
.aggregate(new AverageSessionLengthAcrossAllUsers())
.print();

(As a side note, maybe you should use ContinousEventTimeTrigger in the first 
window). This way it will aggregate and calculate average session length of 
only last “preview results” of the 60 seconds user windows (emitted every 10 
seconds from the first aggregation).

Piotrek

> On 21 Dec 2017, at 15:18, Plamen Paskov <plamen.pas...@next-stream.com> wrote:
> 
> Imagine a case where i want to run a computation every X seconds for 1 day 
> window. I want the calculate average session length for current day every X 
> seconds. Is there an easy way to achieve that?
> 
> On 21.12.2017 16:06, Piotr Nowojski wrote:
>> Hi,
>> 
>> You defined a tumbling window 
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows>)
>>  of 60 seconds, triggered every 10 seconds. This means that each input 
>> element can be processed/averaged up to 6 times (there is no other way if 
>> you trigger each window multiple times).
>> 
>> I am not sure what are you trying to achieve, but please refer to the 
>> documentation about different window types (tumbling, sliding, session) 
>> maybe it will clarify things for you:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners>
>> 
>> If you want to avoid duplicated processing, use either tumbling window with 
>> default trigger (triggering at the end of the window), or use session 
>> windows.
>> 
>> Piotrek
>> 
>> 
>>> On 21 Dec 2017, at 13:29, Plamen Paskov <plamen.pas...@next-stream.com 
>>> <mailto:plamen.pas...@next-stream.com>> wrote:
>>> 
>>> Hi guys,
>>> I have the following code:
>>> 
>>> SingleOutputStreamOperator lastUserSession = env
>>> .socketTextStream("localhost", 9000, "\n")
>>> .map(new MapFunction<String, Event>() {
>>> @Override
>>> public Event map(String value) throws Exception {
>>> String[] row = value.split(",");
>>> return new Event(Long.valueOf(row[0]), row[1], 
>>> Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>>> }
>>> })
>>> .assignTimestampsAndWatermarks(new 
>>> BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) {
>>> @Override
>>> public long extractTimestamp(Event element) {
>>> return element.timestamp;
>>> }
>>> })
>>> .keyBy("userId", "sessionId")
>>> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>>> .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
>>> .maxBy("length", false);
>>> 
>>> lastUserSession
>>> .timeWindowAll(Time.seconds(60))
>>> .aggregate(new AverageSessionLengthAcrossAllUsers())
>>> .print();
>>> 
>>> What i'm trying to achieve is to calculate the average session length every 
>>> 10 seconds. The problem is that once the window length is 60 seconds and a 
>>> computation is triggered
>>> every 10 seconds i will receive duplicate events in my average calculation 
>>> method so the average will not be correct. If i move 
>>> ContinuousProcessingTimeTrigger down before 
>>> AverageSessionLengthAcrossAllUsers() then it's not triggering every 10 
>>> seconds.
>>> Any other suggestions how to workaround this?
>>> 
>>> Thanks
>> 
> 



Re: entrypoint for executing job in task manager

2017-12-22 Thread Piotr Nowojski
I don’t think there is such hook in the Flink code now. You will have to walk 
around this issue somehow in user space. 

Maybe you could make a contract that every operator before touching Guice, 
should call static synchronized method `initializeGuiceContext`. This method 
could search the classpath for classes with some specific annotations, for 
example `@MyInitializationHook` and install/add all of such hooks before 
actually using Guice?

Piotrek

> On 21 Dec 2017, at 17:49, Steven Wu <stevenz...@gmail.com> wrote:
> 
> We use Guice for dependency injection. We need to install additional Guice 
> modules (for bindings) when setting up this static context of Guice injector.
> 
> Calling the static initializer from operator open method won't really help. 
> Not all operators are implemented by app developer who want to install 
> additional Guice modules. E.g. kafka source operator is implemented/provided 
> by our platform. I think the source operator will open first, which means app 
> operator won't get a chance to initialize the static context. What would 
> really help if there is a entry hook (at task manager) that is executed 
> before any operator opening.
> 
> On Thu, Dec 21, 2017 at 12:27 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Open method is called just before any elements are processed. You can hook in 
> any initialisation logic there, including initialisation of a static context. 
> However keep in mind, that since this context is static, it will be shared 
> between multiple operators (if you are running parallelism > number of task 
> managers), so accesses to it must be synchronized (including initialisation). 
> Another thing to consider is that managing the life cycle of static context 
> can be tricky (when to close it and release it’s resources).
> 
> The questions is, whether you really need a static context?
> 
> Thanks,
> Piotrek
> 
> 
> > On 21 Dec 2017, at 07:53, Steven Wu <stevenz...@gmail.com 
> > <mailto:stevenz...@gmail.com>> wrote:
> >
> > Here is my understanding of how job submission works in Flink. When 
> > submitting a job to job manager via REST API, we provide a entry class. Job 
> > manager then evaluate job graph and ship serialized operators to task 
> > manager. Task manager then open operators and run tasks.
> >
> > My app would typically requires some initialization phase to setup my own 
> > running context in task manager (e.g. calling a static method of some 
> > class). Does Flink provide any entry hook in task manager when executing a 
> > job (and tasks)? As for job manager, the entry class provides such hook 
> > where I can initialize my static context.
> >
> > Thanks,
> > Steven
> 
> 



Re: periodic trigger

2017-12-21 Thread Piotr Nowojski
Hi,

You defined a tumbling window 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows
 
)
 of 60 seconds, triggered every 10 seconds. This means that each input element 
can be processed/averaged up to 6 times (there is no other way if you trigger 
each window multiple times).

I am not sure what are you trying to achieve, but please refer to the 
documentation about different window types (tumbling, sliding, session) maybe 
it will clarify things for you:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
 


If you want to avoid duplicated processing, use either tumbling window with 
default trigger (triggering at the end of the window), or use session windows.

Piotrek


> On 21 Dec 2017, at 13:29, Plamen Paskov  wrote:
> 
> Hi guys,
> I have the following code:
> 
> SingleOutputStreamOperator lastUserSession = env
> .socketTextStream("localhost", 9000, "\n")
> .map(new MapFunction() {
> @Override
> public Event map(String value) throws Exception {
> String[] row = value.split(",");
> return new Event(Long.valueOf(row[0]), row[1], 
> Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
> }
> })
> .assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) {
> @Override
> public long extractTimestamp(Event element) {
> return element.timestamp;
> }
> })
> .keyBy("userId", "sessionId")
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
> .maxBy("length", false);
> 
> lastUserSession
> .timeWindowAll(Time.seconds(60))
> .aggregate(new AverageSessionLengthAcrossAllUsers())
> .print();
> 
> What i'm trying to achieve is to calculate the average session length every 
> 10 seconds. The problem is that once the window length is 60 seconds and a 
> computation is triggered
> every 10 seconds i will receive duplicate events in my average calculation 
> method so the average will not be correct. If i move 
> ContinuousProcessingTimeTrigger down before 
> AverageSessionLengthAcrossAllUsers() then it's not triggering every 10 
> seconds.
> Any other suggestions how to workaround this?
> 
> Thanks



Re: periodic trigger

2018-01-03 Thread Piotr Nowojski
Hi,

Sorry for late response (because of the holiday period).

You didn’t mention lateness previously, that’s why I proposed such solution. 

Another approach would be to calculate max session length per user on the first 
aggregation level and at the same time remember what was the previously 
emitted/triggered value. Now, whenever you recalculate your window because of 
firing a trigger, you could check what is the new value and what was the 
previously emitted value. If they are the same, you do not have to emit 
anything. If they are different, you would have to issue an “update”/“diff” to 
the global aggregation on the second level. In case of simple average of 
session length, first level on update could emit “-old_value” and “+new_value” 
(negative old_value and positive new_value). 

To do this in Flink you could use 

org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction

and store the previously emitted values in

org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context#windowState()

For efficiency reasons best to combine it with a reduce function using this call

org.apache.flink.streaming.api.datastream.WindowedStream#reduce(org.apache.flink.api.common.functions.ReduceFunction,
 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<T,R,K,W>)

Reduce will ensure that your ProcessWindowFunction will not have to process all 
events belonging to the window each time it is triggered, but only it will have 
to process the single reduced element.

In your case:

.keyBy("userId", "sessionId")
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
.reduce(MAX_BY_LENGTH_REDUCE_FUNCTION, 
MY_FANCY_PROCESS_WINDOW_FUNCTION_WITH_UPDATES).

Of course second level global aggregation would have to understand and take 
those “update”/“diffs” into account, but that would be simple to implement by 
some custom reduce function/aggregate. You would like the sequence of values 1, 
1, 10, -10, 1 produce average of 1 and not 0.6 - negative values have to 
decrease counters in calculating the average value (to aggregate average value 
you always need to keep sum of values and a counter).

.timeWindowAll(Time.seconds(60))
.aggregate(FANCY_AGGREGATE_THAT_HANDLES_UPDATES)
.print();

Hope that helps.

Piotrek

> On 22 Dec 2017, at 14:10, Plamen Paskov <plamen.pas...@next-stream.com> wrote:
> 
> I think it will not solve the problem as if i set ContinuousEventTimeTrigger 
> to 10 seconds and allowedLateness(Time.seconds(60)) as i don't want to 
> discard events from different users received later then i might receive more 
> than one row for a single user based on the number of windows created by the 
> events of this user. That will make the the average computations wrong.
> 
> On 22.12.2017 12:10, Piotr Nowojski wrote:
>> Ok, I think now I understand your problem. 
>> 
>> Wouldn’t it be enough, if you change last global window to something like 
>> this:
>> 
>> lastUserSession
>> .timeWindowAll(Time.seconds(10))
>> .aggregate(new AverageSessionLengthAcrossAllUsers())
>> .print();
>> 
>> (As a side note, maybe you should use ContinousEventTimeTrigger in the first 
>> window). This way it will aggregate and calculate average session length of 
>> only last “preview results” of the 60 seconds user windows (emitted every 10 
>> seconds from the first aggregation).
>> 
>> Piotrek
>> 
>>> On 21 Dec 2017, at 15:18, Plamen Paskov <plamen.pas...@next-stream.com 
>>> <mailto:plamen.pas...@next-stream.com>> wrote:
>>> 
>>> Imagine a case where i want to run a computation every X seconds for 1 day 
>>> window. I want the calculate average session length for current day every X 
>>> seconds. Is there an easy way to achieve that?
>>> 
>>> On 21.12.2017 16:06, Piotr Nowojski wrote:
>>>> Hi,
>>>> 
>>>> You defined a tumbling window 
>>>> (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows
>>>>  
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows>)
>>>>  of 60 seconds, triggered every 10 seconds. This means that each input 
>>>> element can be processed/averaged up to 6 times (there is no other way if 
>>>> you trigger each window multiple times).
>>>> 
>>>> I am not sure what are you trying to achieve, but please refer to the 
>>>> documentation about different window types (tumbling, sliding, session) 
>>>> maybe it will clarify things for y

Re: Heap Problem with Checkpoints

2018-06-19 Thread Piotr Nowojski
Hi,

Can you search the logs/std err/std output for log entries like:

log.warn("Failed to locally delete blob “ …) ?

I see in the code, that if file deletion fails for whatever the reason, 
TransientBlobCleanupTask can loop indefinitely trying to remove it over and 
over again. That might be ok, however it’s doing it without any back off time 
as fast as possible.

To confirm this, could you take couple of thread dumps and check whether some 
thread is spinning in 
org.apache.flink.runtime.blob.TransientBlobCleanupTask#run ?

If that’s indeed a case, the question would be why file deletion fails?

Piotrek

> On 18 Jun 2018, at 15:48, Fabian Wollert  wrote:
> 
> Hi Piotrek, thx a lot for your answer and sry for the late response. I was 
> running some more tests, but i still got the same problem. I was analyzing a 
> heap dump already with VisualVM, and thats how i got to the intention that it 
> was some S3 logging, but seems like i was wrong. on the newer tests, the heap 
> dump says the following (this time i used Eclipse MemoryAnalyzer): 
> 
> 
> 
> 
> Are you aware of problems with the BlobServer not cleaning up properly? I 
> tried also using a bigger instance, but this never stabilizes, it just keeps 
> increasing (gave it already 10GB+ Heap) ...
> 
> Cheers
> 
> --
> 
> Fabian Wollert
> Zalando SE
> 
> E-Mail: fabian.woll...@zalando.de <mailto:fabian.woll...@zalando.de>
> 
> 
> 
> Am Mo., 11. Juni 2018 um 10:46 Uhr schrieb Piotr Nowojski 
> mailto:pi...@data-artisans.com>>:
> Hi,
> 
> What kind of messages are those “logs about S3 operations”? Did you try to 
> google search them? Maybe it’s a known S3 issue?
> 
> Another approach is please use some heap space analyser from which you can 
> backtrack classes that are referencing those “memory leaks” and again try to 
> google any known memory issues.
> 
> It also could just mean, that it’s not a memory leak, but you just need to 
> allocate more heap space for your JVM (and memory consumption will stabilise 
> at some point).
> 
> Piotrek
> 
>> On 8 Jun 2018, at 18:32, Fabian Wollert > <mailto:fab...@zalando.de>> wrote:
>> 
>> Hi, in this email thread 
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-and-AWS-S3-integration-java-lang-NullPointerException-null-uri-host-td20413.html>
>>  here, i tried to set up S3 as a filesystem backend for checkpoints. Now 
>> everything is working (Flink V1.5.0), but the JobMaster is accumulating Heap 
>> space, with eventually killing itself with HeapSpace OOM after several 
>> hours. If I don't enable Checkpointing, then everything is fine. I'm using 
>> the Flink S3 Shaded Libs (tried both the Hadoop and the Presto lib, no 
>> difference in this regard) from the tutorial. my checkpoint settings are 
>> this (job level):
>> 
>> env.enableCheckpointing(1000);
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
>> env.getCheckpointConfig().setCheckpointTimeout(6);
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>> 
>> Another clue why i suspect the S3 Checkpointing is that the heapspace dump 
>> contains a lot of char[] objects with some logs about S3 operations.
>> 
>> anyone has an idea where to look further on this?
>> 
>> Cheers
>> 
>> --
>> 
>> Fabian Wollert
>> Zalando SE
>> 
>> E-Mail: fabian.woll...@zalando.de
>>  <mailto:fabian.woll...@zalando.de>
>> 
>> Tamara-Danz-Straße 1
>> 10243 Berlin
>> Fax: +49 (0)30 2759 46 93
>> E-mail: legalnot...@zalando.co.uk <mailto:legalnot...@zalando.co.uk>
>> Notifications of major holdings (Sec. 33, 38, 39 WpHG):  +49 (0)30 2000889349
>> 
>> Management Board:
>> Robert Gentz, David Schneider, Rubin Ritter
>> 
>> Chairman of the Supervisory Board:
>> Lothar Lanz
>> 
>> Person responsible for providing the contents of Zalando SE acc. to Art. 55 
>> RStV [Interstate Broadcasting Agreement]: Rubin Ritter
>> Registered at the Local Court Charlottenburg Berlin, HRB 158855 B
>> VAT registration number: DE 260543043
> 



Re: How to submit a job with dependency jars by flink cli in Flink 1.4.2?

2018-08-03 Thread Piotr Nowojski
Hi,

Are those paths:
file: '/home/work/xxx/lib/commons-math3-3.5.jar' (missing)
file: '/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar' 
(missing)

accessible from the inside of your container? 

bin/flink run --help
(…)
 -C,--classpath  Adds a URL to each user code
  classloader  on all nodes in the
  cluster. The paths must specify a
  protocol (e.g. file://) and be
  accessible on all nodes (e.g. by means
  of a NFS share). You can use this
  option multiple times for specifying
  more than one URL. The protocol must
  be supported by the {@link
  java.net.URLClassLoader}.

Other nit, maybe the problem is with single slash after “file:”. You have 
file:/home/...
While it might need to be
file://home/ ...

Piotrek

> On 3 Aug 2018, at 13:03, Joshua Fan  wrote:
> 
> Hi,
> 
> I'd like to submit a job with dependency jars by flink run, but it failed.
> 
> Here is the script,
> 
> /usr/bin/hadoop/software/flink-1.4.2/bin/flink run \
> -m yarn-cluster -yn 1 -ys 8 -yjm 2148 -ytm 4096 -ynm jarsTest \
> -c StreamExample \
> -C file:/home/work/xxx/lib/commons-math3-3.5.jar \
> -C file:/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar \
> ...
> xxx-1.0.jar
> 
> As described in 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/cli.html#usage
>  
> 
>  , "-C" means to provide the dependency jar.
> 
> After I execute the command, the job succeed to submit, but can not run in 
> flink cluster on yarn. Exceptions is like below:
> 
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load 
> user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
> ClassLoader info: URL ClassLoader:
> file: '/home/work/xxx/lib/commons-math3-3.5.jar' (missing)
> file: '/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar' 
> (missing)
> ...
> Class not resolvable through given classloader.
>   at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:95)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:745)
> 
> It appears that the two dependency jar cannot be found in TaskManager, so I 
> dig into the source code, from CliFrontend to PackagedProgram to 
> ClusterClient to JobGraph. It seems like the dependency jars is put in 
> classpath and userCodeClassLoader in PackagedProgram, but never upload to the 
> BlobServer in JobGraph where the xxx-1.0.jar is uploaded.
> 
> Am I missing something? In Flink 1.4.2, dependency jar is not supported?
> 
> Hope someone can give me some hint.
> 
> Appreciate it very mush.
> 
> 
> Yours Sincerely
> 
> Joshua
> 
> 
> 
> 



Re: How to submit a job with dependency jars by flink cli in Flink 1.4.2?

2018-08-03 Thread Piotr Nowojski
Hi,

 -yt,--yarnship  Ship files in the specified directory
  (t for transfer)

I guess that you even got a warning in your log files:

LOG.warn("Ship directory is not a directory. Ignoring it.”);

I’m not sure, but maybe with `-yt` you do not even need to specify `-C`, just 
`-yt /home/work/xxx/lib/` should suffice:
https://stackoverflow.com/a/47412643/8149051 
<https://stackoverflow.com/a/47412643/8149051>

Piotrek

> On 3 Aug 2018, at 14:41, Joshua Fan  wrote:
> 
> hi Piotr
> 
> I give up to use big c to do such a thing. Big c requires the value to be a 
> java URL, but the java URL only supports  
> file,ftp,gopher,http,https,jar,mailto,netdoc. That's why I can not do it with 
> a hdfs location.
> 
> For yt option, I think I should do something more.
> 
> Yours
> Joshua
> 
> On Fri, Aug 3, 2018 at 8:11 PM, Joshua Fan  <mailto:joshuafat...@gmail.com>> wrote:
> Hi Piotr
> 
> I just tried the yt option, like your suggestion, change -C  
> file:/home/work/xxx/lib/commons-math3-3.5.jar to -yt  
> file:/home/work/xxx/lib/commons-math3-3.5.jar, but it even fails to submit, 
> reporting exception "Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)".
> 
> big c can submit the job but the job can not run in cluster on yarn, yt just 
> can not submit.
> 
> I am trying to change the  "-C  
> file:/home/work/xxx/lib/commons-math3-3.5.jar" to  "-C  
> hdfs://namenode1/home/work/xxx/lib/commons-math3-3.5.jar", but Clifrontend 
> error was caught.
> I am still on it now, will report it later.
> 
> Yours
> Joshua
> 
> On Fri, Aug 3, 2018 at 7:58 PM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi Joshua,
> 
> Please try (as Paul suggested) using:
> 
>  -yt,--yarnship  Ship files in the specified 
> directory
>   (t for transfer)
> 
> I guess `-yt /home/work/xxx` should solve your problem :)
> 
> Piotrek
> 
>> On 3 Aug 2018, at 13:54, Joshua Fan > <mailto:joshuafat...@gmail.com>> wrote:
>> 
>> Hi Piotr
>> 
>> Thank you for your advice. I submit the dependency jar from local machine, 
>> they does not exist in yarn container machine. Maybe I misunderstand the 
>> option big c, it can not do such a thing.
>> 
>> Joshua  
>> 
>> On Fri, Aug 3, 2018 at 7:17 PM, Piotr Nowojski > <mailto:pi...@data-artisans.com>> wrote:
>> Hi,
>> 
>> Are those paths:
>> file: '/home/work/xxx/lib/commons-math3-3.5.jar' (missing)
>> file: '/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar' 
>> (missing)
>> 
>> accessible from the inside of your container? 
>> 
>> bin/flink run --help
>> (…)
>>  -C,--classpath  Adds a URL to each user code
>>   classloader  on all nodes in the
>>   cluster. The paths must specify a
>>   protocol (e.g. file://) and be
>>   accessible on all nodes (e.g. by 
>> means
>>   of a NFS share). You can use this
>>   option multiple times for 
>> specifying
>>   more than one URL. The protocol 
>> must
>>   be supported by the {@link
>>   java.net.URLClassLoader}.
>> 
>> Other nit, maybe the problem is with single slash after “file:”. You have 
>> file:/home/...
>> While it might need to be
>> file://home/ <>...
>> 
>> Piotrek
>> 
>>> On 3 Aug 2018, at 13:03, Joshua Fan >> <mailto:joshuafat...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> I'd like to submit a job with dependency jars by flink run, but it failed.
>>> 
>>> Here is the script,
>>> 
>>> /usr/bin/hadoop/software/flink-1.4.2/bin/flink run \
>>> -m yarn-cluster -yn 1 -ys 8 -yjm 2148 -ytm 4096 -ynm jarsTest \
>>> -c StreamExample \
>>> -C file:/home/work/xxx/lib/commons-math3-3.5.jar \
>>> -C file:/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar \
>>> ...
>>> xxx-1.0.jar
>>> 
>>> As described in 
>>> https://ci.apache.org/projects/flink/flink-doc

Re: How to submit a job with dependency jars by flink cli in Flink 1.4.2?

2018-08-06 Thread Piotr Nowojski
Hi,

I’m glad that you have found a solution to your problem :)

To shorten feedback you can/should test as much logic as possible using smaller 
unit tests and some small scale integration tests: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html>
 . Usually there is no need for starting up full Flink cluster and submitting 
your WIP job during development. You can do such end to end tests only once 
before committing/pushing/merging/deploying. 

Piotrek 

> On 6 Aug 2018, at 10:03, Joshua Fan  wrote:
> 
> Hi Piotr
> 
> Thank you for your kindly suggestion.
> 
> Yes, there was surely a warning when a path like file:// is set. I later set 
> the -yt to a directory, and the jars in the directory was uploaded to TM, but 
> the flink run command failed to submit the job because of 
> ClassNotFoundException.
> 
> I finally realize that flink just want the user to use a fat jar to submit 
> the jar and its dependency but not a dynamic way to upload dependency when 
> submitting.
> 
> It's right when I submit a job in production environment, but in test 
> environment, users may change the business logic many times, they do not want 
> to wait a long time(to make the fat jar using maven,to transfer it to a flink 
> client node, to run it, I have to admit it is a long time.) to test it in 
> flink.
> 
> It seems I have to find a way to shorten the time my users cost.
> 
> Yours Sincerely
> 
> Joshua
> 
> On Fri, Aug 3, 2018 at 9:08 PM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
>  -yt,--yarnship  Ship files in the specified directory
>   (t for transfer)
> 
> I guess that you even got a warning in your log files:
> 
> LOG.warn("Ship directory is not a directory. Ignoring it.”);
> 
> I’m not sure, but maybe with `-yt` you do not even need to specify `-C`, just 
> `-yt /home/work/xxx/lib/` should suffice:
> https://stackoverflow.com/a/47412643/8149051 
> <https://stackoverflow.com/a/47412643/8149051>
> 
> Piotrek
> 
> 
>> On 3 Aug 2018, at 14:41, Joshua Fan > <mailto:joshuafat...@gmail.com>> wrote:
>> 
>> hi Piotr
>> 
>> I give up to use big c to do such a thing. Big c requires the value to be a 
>> java URL, but the java URL only supports  
>> file,ftp,gopher,http,https,jar,mailto,netdoc. That's why I can not do it 
>> with a hdfs location.
>> 
>> For yt option, I think I should do something more.
>> 
>> Yours
>> Joshua
>> 
>> On Fri, Aug 3, 2018 at 8:11 PM, Joshua Fan > <mailto:joshuafat...@gmail.com>> wrote:
>> Hi Piotr
>> 
>> I just tried the yt option, like your suggestion, change -C  
>> file:/home/work/xxx/lib/commons-math3-3.5.jar to -yt  
>> file:/home/work/xxx/lib/commons-math3-3.5.jar, but it even fails to submit, 
>> reporting exception "Caused by: java.lang.ClassNotFoundException: 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)".
>> 
>> big c can submit the job but the job can not run in cluster on yarn, yt just 
>> can not submit.
>> 
>> I am trying to change the  "-C  
>> file:/home/work/xxx/lib/commons-math3-3.5.jar" to  "-C  
>> hdfs://namenode1/home/ <>work/xxx/lib/commons-math3-3.5.jar", but 
>> Clifrontend error was caught.
>> I am still on it now, will report it later.
>> 
>> Yours
>> Joshua
>> 
>> On Fri, Aug 3, 2018 at 7:58 PM, Piotr Nowojski > <mailto:pi...@data-artisans.com>> wrote:
>> Hi Joshua,
>> 
>> Please try (as Paul suggested) using:
>> 
>>  -yt,--yarnship  Ship files in the specified 
>> directory
>>   (t for transfer)
>> 
>> I guess `-yt /home/work/xxx` should solve your problem :)
>> 
>> Piotrek
>> 
>>> On 3 Aug 2018, at 13:54, Joshua Fan >> <mailto:joshuafat...@gmail.com>> wrote:
>>> 
>>> Hi Piotr
>>> 
>>> Thank you for your advice. I submit the dependency jar from local machine, 
>>> they does not exist in yarn container machine. Maybe I misunderstand the 
>>> option big c, it can not do such a thing.
>>> 
>>> Joshua  
>>> 
>>> On Fri, Aug 3, 2018 at 7:17 PM, Piotr Nowojski >> <mailto:pi...@data-artisans.com>> wrote:
>>> Hi,
>>> 
>>> Are those paths:
>>> fi

Re: Heap Problem with Checkpoints

2018-08-09 Thread Piotr Nowojski
Hi,

Thanks for getting back with more information.

Apparently this is a known bug of JDK since 2003 and is still not resolved:
https://bugs.java.com/view_bug.do?bug_id=4872014 

https://bugs.java.com/view_bug.do?bug_id=6664633 


Code that is using this `deleteOnExit` is not part of a Flink, but an external 
library that we are using (hadoop-aws:2.8.x), so we can not fix it for them and 
this bug should be reported/forwarded to them (I have already done just that 
). More interesting 
S3AOutputStream is already manually deleting those files when they are not 
needed in `org.apache.hadoop.fs.s3a.S3AOutputStream#close`’s finally block:

} finally {
  if (!backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: {}", backupFile);
  }
  super.close();
}

But this doesn’t remove the entry from DeleteOnExitHook. 

From what I see in the code, flink-s3-fs-presto filesystem implantation that we 
provide doesn’t use deleteOnExit, so if you can switch to this filesystem it 
would solve the problem for you.

Piotrek

> On 9 Aug 2018, at 12:09, Ayush Verma  wrote:
> 
> Hello Piotr, I work with Fabian and have been investigating the memory leak
> associated with issues mentioned in this thread. I took a heap dump of our
> master node and noticed that there was >1gb (and growing) worth of entries
> in the set, /files/, in class *java.io.DeleteOnExitHook*.
> Almost all the strings in this set look like,
> /tmp/hadoop-root/s3a/output-*.tmp.
> 
> This means that the checkpointing code, which uploads the data to s3,
> maintains it in a temporary local file, which is supposed to be deleted on
> exit of the JVM. In our case, the checkpointing is quite heavy and because
> we have a long running flink cluster, it causes this /set/ to grow
> unbounded, eventually cause an OOM. Please see these images:
> 
>  
> 
>  
> 
> The culprit seems to be *org.apache.hadoop.fs.s3a.S3AOutputStream*, which
> in-turn, calls
> *org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite()*. If we
> follow the method call chain from there, we end up at
> *org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite()*, where we
> can see the temp file being created and the method deleteOnExit() being
> called.
> 
> Maybe instead of relying on *deleteOnExit()* we can keep track of these tmp
> files, and as soon as they are no longer required, delete them ourself.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Scala 2.12 Support

2018-08-16 Thread Piotr Nowojski
Hi,

Scala 2.12 support is high on our priority list and we hope to have it included 
for the 1.7 release (as you can see in the ticket itself), which should happen 
later this year.

Piotrek

> On 15 Aug 2018, at 17:59, Aaron Levin  wrote:
> 
> Hello!
> 
> I'm wondering if there is anywhere I can see Flink's roadmap for Scala 2.12 
> support. The last email I can find on the list for this was back in January, 
> and the FLINK-7811[0], the ticket asking for Scala 2.12 support, hasn't been 
> updated in a few months.
> 
> Recently Spark fixed the ClosureCleaner code to support Scala 2.12[1], and 
> from what I can gather this was one of the main barrier for Flink supporting 
> Scala 2.12. Given this has been fixed, is there work in progress to support 
> Scala 2.12? Any updates on FLINK-7811?
> 
> Thanks for your help!
> 
> [0] https://issues.apache.org/jira/browse/FLINK-7811 
> 
> [1] https://issues.apache.org/jira/browse/SPARK-14540 
> 
> 
> Best,
> 
> Aaron Levin



Re: How to compare two window ?

2018-08-16 Thread Piotr Nowojski
Hi,

Could you rephrase your question? Maybe by posting some code examples?

Piotrek

> On 16 Aug 2018, at 08:26, 苗元君  wrote:
> 
> Hi, Flink guys, 
> U really to a quick release, it's fantastic ! 
> 
> I'v got a situation , 
> window 1 is time driven, slice is 1min, trigger is 1 count
> window 2 is count driven, slice is 3 count, trigger is 1count
> 
> 1. Then element is out of window1 and just right into window2. 
> For example if there is only 2 element, window2 will have none element.  
> how to build window like this ? 
>I try to use window1 by structure (window trigger evictor) then window2 
> structure(trigger evictor)
>I got element calculate just in window1 and window2 in the same time
> 
> 2.  I try to find ways to use SQL on AllWindowedStream but seem not working. 
> Can SQL Query use on a WINDOW ?
> 3.  How to compare these SQL result ?
> 
> 
> 
> 
> Thank U so much.
> 
> -- 
> Yuanjun Miao
> 



Re: OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

2018-08-16 Thread Piotr Nowojski
Hi,

You made a small mistake when restoring from state using test harness, that I 
myself have also done in the past. Problem is with an ordering of those calls:

result.open();
if (savedState != null) {
result.initializeState(savedState);
}

Open is supposed to be called after initializeState, and if you look into the 
code of AbstractStreamOperatorTestHarness#open, if it is called before 
initialize, it will initialize harness without any state.

Unfortunate is that this is implicit behaviour that doesn’t throw any error 
(test harness is not part of a Flink’s public api). I will try to fix this: 
https://issues.apache.org/jira/browse/FLINK-10159 


Piotrek

> On 16 Aug 2018, at 00:24, Ken Krugler  wrote:
> 
> Hi all,
> 
> It looks to me like the OperatorSubtaskState returned from 
> OneInputStreamOperatorTestHarness.snapshot fails to include any timers that 
> had been registered via registerProcessingTimeTimer but had not yet fired 
> when the snapshot was saved.
> 
> Is this a known limitation of OneInputStreamOperatorTestHarness?
> 
> If not, is there anything special I need to do when setting up the test 
> harness to ensure that timers are saved?
> 
> Below is the unit test, which shows how the test harness is being set up and 
> run.
> 
> The TimerFunction used in this test does seem to be doing the right thing, as 
> using it in a simple job on a local Flink cluster works as expected when 
> creating & then restarting from a savepoint.
> 
> Thanks,
> 
> — Ken
> 
> ==
> TimerTest.java
> ==
> package com.scaleunlimited.flinkcrawler.functions;
> 
> import static org.junit.Assert.assertTrue;
> 
> import java.util.ArrayList;
> import java.util.List;
> 
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
> import org.junit.Before;
> import org.junit.Test;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
> 
> public class TimerTest {
> public static final Logger LOGGER = 
> LoggerFactory.getLogger(TimerTest.class);
> 
> private List _firedTimers = new ArrayList();
> 
> @Before
> public void setUp() throws Exception {
> }
> 
> @Test
> public void testTimerSaving() throws Throwable {
> 
> // This operator doesn't really do much at all, but the first element
> // it processes will create a timer for (timestamp+1).
> // Whenever that timer fires, it will create another timer for 
> // (timestamp+1).
> KeyedProcessOperator operator = 
> new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
> 
> // Create a test harness from scratch
> OneInputStreamOperatorTestHarness testHarness = 
> makeTestHarness(operator, null);
> 
> // We begin at time zero
> testHarness.setProcessingTime(0);
> 
> // Process some elements, which should also create a timer for time 1.
> int inputs[] = new int[] {1, 2, 3};
> for (int input : inputs) {
> testHarness.processElement(new StreamRecord<>(input));
> }
> 
> // Force some time to pass, which should keep moving the timer ahead,
> // finally leaving it set for time 10.
> for (long i = 1; i < 10; i++) {
> testHarness.setProcessingTime(i);
> }
> 
> // Save the state, which we assume should include the timer we set for
> // time 10.
> OperatorSubtaskState savedState = 
> testHarness.snapshot(0L, testHarness.getProcessingTime());
> 
> // Close the first test harness
> testHarness.close();
> 
> // Create a new test harness using the saved state (which we assume
> // includes the timer for time 10).
> testHarness = makeTestHarness(operator, savedState);
> 
> // Force more time to pass, which should keep moving the timer ahead.
> for (long i = 10; i < 20; i++) {
> testHarness.setProcessingTime(i);
> }
> 
> // Close the second test harness and make sure all the timers we 
> expect
> // actually fired.
> testHarness.close();
> for (long i = 1; i < 20; i++) {
> 
> // TODO This expectation currently fails, since Timers 

Re: How to submit flink job on yarn by java code

2018-08-16 Thread Piotr Nowojski
Hi,

Is this path accessible on the container? If not, use some distributed file 
system, nfs or -yt —yarnship option of the cli.

Please also take a look at 
https://lists.apache.org/thread.html/%3CCAF=1nJ8GONoqux7czxpUxAf7L3p=-E_ePSTHk0uWa=GRyG=2...@mail.gmail.com%3E
 


Piotrek

> On 16 Aug 2018, at 11:05, spoon_lz <971066...@qq.com> wrote:
> 
> Sorry, I don't know why the code and error are not visible.
> The error is :
> The program finished with the following exception:
> 
> /org.apache.flink.client.deployment.ClusterDeploymentException: Could not
> deploy Yarn job cluster.
>   at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>   at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
>   at flink.SubmitDemo.submit(SubmitDemo.java:75)
>   at flink.SubmitDemo.main(SubmitDemo.java:50)
> Caused by:
> org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
> The YARN application unexpectedly switched to state FAILED during
> deployment. 
> Diagnostics from YARN: Application application_1526888270443_0090 failed 2
> times due to AM Container for appattempt_1526888270443_0090_02 exited
> with  exitCode: -1000
> For more detailed output, check application tracking
> page:http://cluster1:8088/cluster/app/application_1526888270443_0090Then,
> click on links to logs of each attempt.
> Diagnostics: File
> file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
> does not exist
> java.io.FileNotFoundException: File
> file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
> does not exist
>   at
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
>   at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
>   at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
>   at
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
>   at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
>   at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
>   at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
>   at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>   at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
>   at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 
> Failing this attempt. Failing the application.
> If log aggregation is enabled on your cluster, use this command to further
> investigate the issue:
> yarn logs -applicationId application_1526888270443_0090
>   at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
>   at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
>   at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
>   ... 5 more/
> 
> and my code like :
> 
> /public class SubmitDemo {
> 
> 
>private final String ENV_CONF = "/usr/ndp/current/yarn_client/conf";
>private final String FLINK_CONF = "/home/demo/flink-1.5.1/conf";
>private static final String JAR_FILE =
> "/home/demo/lz/flink1.5_demo-1.16-SNAPSHOT-jar-with-dependencies.jar";
> 
> 
>public static void main(String[] args) {
> 
>SubmitDemo demo = new SubmitDemo();
>demo.before();
>List parameters = new ArrayList<>();
>parameters.add("run");
>parameters.add("-d");
>parameters.add("-m");
>parameters.add("yarn-cluster");
>parameters.add("-ynm");
>parameters.add("lz_test_alone");
>parameters.add("-yn");
>parameters.add("4");
>parameters.add("-ytm");
>parameters.add("4096");
> 

Re: Standalone cluster instability

2018-08-16 Thread Piotr Nowojski
Hi,

I’m not aware of such rules of thumb. Memory consumption is highly application 
and workload specific. It depends on how much things you allocate in your user 
code and how much memory do you keep on state (in case of heap state backend). 
Basically just as with most java applications, you have to use trial and error 
method.

One good practice is to before any deployment, test your Flink application on a 
testing cluster, that is identical to production cluster, by (re)processing 
some of the production workload/backlog/data (in parallel to production 
cluster).

Piotrek 

> On 16 Aug 2018, at 13:23, Shailesh Jain  wrote:
> 
> Thank you for your help Piotrek.
> 
> I think it was a combination of a. other processes taking up available memory 
> and b. flink processes consuming all the memory allocated to them, that 
> resulted in kernel running out of memory.
> 
> Are there any heuristics or best practices which you (or anyone in the 
> community) recommend to benchmark memory requirements of a particular flink 
> job?
> 
> Thanks,
> Shailesh
> 
> 
> On Tue, Aug 14, 2018 at 6:08 PM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Good that we are more or less on track with this problem :) But the problem 
> here is not that heap size is too small, bot that your kernel is running out 
> of memory and starts killing processes. Either:
> 
> 1. some other process is using the available memory 
> 2. Increase memory allocation on your machine/virtual machine/container/cgroup
> 3. Decrease the heap size of Flink’s JVM or non heap size (decrease network 
> memory buffer pool). Of course for any given job/state 
> size/configuration/cluster size there is some minimal reasonable memory size 
> that you have to assign to Flink, otherwise you will have poor performance 
> and/or constant garbage collections and/or you will start getting OOM errors 
> from JVM (don’t confuse those with OS/kernel's OOM errors - those two are on 
> a different level).
> 
> Piotrek
> 
> 
>> On 14 Aug 2018, at 07:36, Shailesh Jain > <mailto:shailesh.j...@stellapps.com>> wrote:
>> 
>> Hi Piotrek,
>> 
>> Thanks for your reply. I checked through the syslogs for that time, and I 
>> see this:
>> 
>> Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill 
>> process 2305 (java) score 468 or sacrifice child
>> Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305 
>> (java) total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB
>> 
>> As you pointed out, kernel killed the task manager process.
>> 
>> If I had already set the max heap size for the JVM (to 3GB in this case), 
>> and the memory usage stats showed 2329MB being used 90 seconds earlier, it 
>> seems a bit unlikely for operators to consume 700 MB heap space in that 
>> short time, because our events ingestion rate is not that high (close to 10 
>> events per minute).
>> 
>> 2018-08-08 13:19:23,341 INFO  
>> org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage 
>> stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB 
>> (used/committed/max)]
>> 
>> Is it possible to log individual operator's memory consumption? This would 
>> help in narrowing down on the root cause. There were around 50 operators 
>> running (~8 kafka source/sink, ~8 Window operators, and the rest CEP 
>> operators).
>> 
>> Thanks,
>> Shailesh
>> 
>> On Fri, Aug 10, 2018 at 4:48 PM, Piotr Nowojski > <mailto:pi...@data-artisans.com>> wrote:
>> Hi,
>> 
>> Please post full TaskManager logs, including stderr and stdout. (Have you 
>> checked the stderr/stdout for some messages?)
>> 
>> I could think of couple reasons:
>> 1. process segfault
>> 2. process killed by OS
>> 3. OS failure
>> 
>> 1. Should be visible by some message in stderr/stdout file and can be caused 
>> by for example JVM, RocksDB or some other native library/code bug. 
>> 2. Is your system maybe running out of memory? Kernel might kill process if 
>> that’s happening. You can also check system (linux?) logs for errors that 
>> correlate in time. Where are those logs depend on your OS. 
>> 3. This might be tricky, but I have seen kernel failures that prevented any 
>> messages from being logged for example. Besides this TaskManager failure is 
>> your machine operating normally without any other problems/crashes/restarts?
>> 
>> Piotrek
>> 
>>> On 10 Aug 2018, at 06:59, Shailesh Jain >> <mailto:shailesh.j...@stellapps.com>> wrote:
>>> 
>>> Hi,
>>> 
>&

Re: OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

2018-08-17 Thread Piotr Nowojski
No problem :) You motivated me to do a fix for that, since I stumbled across 
this bug/issue myself before and also took me some time in the debugger to find 
the cause.

Piotrek

> On 16 Aug 2018, at 20:05, Ken Krugler  wrote:
> 
> Hi Piotr,
> 
> Thanks, and darn it that’s something I should have noticed.
> 
> — Ken
> 
> 
>> On Aug 16, 2018, at 4:37 AM, Piotr Nowojski > <mailto:pi...@data-artisans.com>> wrote:
>> 
>> Hi,
>> 
>> You made a small mistake when restoring from state using test harness, that 
>> I myself have also done in the past. Problem is with an ordering of those 
>> calls:
>> 
>> result.open();
>> if (savedState != null) {
>> result.initializeState(savedState);
>> }
>> 
>> Open is supposed to be called after initializeState, and if you look into 
>> the code of AbstractStreamOperatorTestHarness#open, if it is called before 
>> initialize, it will initialize harness without any state.
>> 
>> Unfortunate is that this is implicit behaviour that doesn’t throw any error 
>> (test harness is not part of a Flink’s public api). I will try to fix this: 
>> https://issues.apache.org/jira/browse/FLINK-10159 
>> <https://issues.apache.org/jira/browse/FLINK-10159>
>> 
>> Piotrek
>> 
>>> On 16 Aug 2018, at 00:24, Ken Krugler >> <mailto:kkrugler_li...@transpac.com>> wrote:
>>> 
>>> Hi all,
>>> 
>>> It looks to me like the OperatorSubtaskState returned from 
>>> OneInputStreamOperatorTestHarness.snapshot fails to include any timers that 
>>> had been registered via registerProcessingTimeTimer but had not yet fired 
>>> when the snapshot was saved.
>>> 
>>> Is this a known limitation of OneInputStreamOperatorTestHarness?
>>> 
>>> If not, is there anything special I need to do when setting up the test 
>>> harness to ensure that timers are saved?
>>> 
>>> Below is the unit test, which shows how the test harness is being set up 
>>> and run.
>>> 
>>> The TimerFunction used in this test does seem to be doing the right thing, 
>>> as using it in a simple job on a local Flink cluster works as expected when 
>>> creating & then restarting from a savepoint.
>>> 
>>> Thanks,
>>> 
>>> — Ken
>>> 
>>> ==
>>> TimerTest.java
>>> ==
>>> package com.scaleunlimited.flinkcrawler.functions;
>>> 
>>> import static org.junit.Assert.assertTrue;
>>> 
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> 
>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
>>> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
>>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>>> import 
>>> org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
>>> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
>>> import org.junit.Before;
>>> import org.junit.Test;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>> 
>>> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
>>> 
>>> public class TimerTest {
>>> public static final Logger LOGGER = 
>>> LoggerFactory.getLogger(TimerTest.class);
>>> 
>>> private List _firedTimers = new ArrayList();
>>> 
>>> @Before
>>> public void setUp() throws Exception {
>>> }
>>> 
>>> @Test
>>> public void testTimerSaving() throws Throwable {
>>> 
>>> // This operator doesn't really do much at all, but the first 
>>> element
>>> // it processes will create a timer for (timestamp+1).
>>> // Whenever that timer fires, it will create another timer for 
>>> // (timestamp+1).
>>> KeyedProcessOperator operator = 
>>> new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
>>> 
>>> // Create a test harness from scratch
>>> OneInputStreamOperatorTestHarness testHarness = 
>>> makeTestHarness(operator

Re: Standalone cluster instability

2018-08-10 Thread Piotr Nowojski
ersion 1.4.2.
> 
> Thanks,
> Shailesh
> 
> On Mon, Mar 26, 2018 at 4:18 PM, Alexander Smirnov 
> mailto:alexander.smirn...@gmail.com>> wrote:
> Hi Piotr,
> 
> I didn't find anything special in the logs before the failure. 
> Here are the logs, please take a look:
> 
> https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_x7QV59?usp=sharing
>  
> <https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_x7QV59?usp=sharing>
> 
> The configuration is:
> 
> 3 task managers:
> qafdsflinkw011.scl 
> qafdsflinkw012.scl 
> qafdsflinkw013.scl - lost connection
> 
> 3 job  managers:
> qafdsflinkm011.scl - the leader
> qafdsflinkm012.scl 
> qafdsflinkm013.scl 
> 
> 3 zookeepers:
> qafdsflinkzk011.scl
> qafdsflinkzk012.scl
> qafdsflinkzk013.scl
> 
> Thank you,
> Alex
> 
> 
> 
> On Wed, Mar 21, 2018 at 6:23 PM Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Does the issue really happen after 48 hours? 
> Is there some indication of a failure in TaskManager log?
> 
> If you will be still unable to solve the problem, please provide full 
> TaskManager and JobManager logs.
> 
> Piotrek
> 
>> On 21 Mar 2018, at 16:00, Alexander Smirnov > <mailto:alexander.smirn...@gmail.com>> wrote:
>> 
>> One more question - I see a lot of line like the following in the logs
>> 
>> [2018-03-21 00:30:35,975] ERROR Association to 
>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:35320 
>> <http://fl...@qafdsflinkw811.nn.five9lab.com:35320/>] with UID [1500204560] 
>> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,208] WARN Association to 
>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:41068 
>> <http://fl...@qafdsflinkw811.nn.five9lab.com:41068/>] with unknown UID is 
>> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
>> gating instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,235] WARN Association to 
>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:40677 
>> <http://fl...@qafdsflinkw811.nn.five9lab.com:40677/>] with unknown UID is 
>> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
>> gating instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,256] WARN Association to 
>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:40382 
>> <http://fl...@qafdsflinkw811.nn.five9lab.com:40382/>] with unknown UID is 
>> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
>> gating instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,256] WARN Association to 
>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:44744 
>> <http://fl...@qafdsflinkw811.nn.five9lab.com:44744/>] with unknown UID is 
>> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
>> gating instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,266] WARN Association to 
>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:42413 
>> <http://fl...@qafdsflinkw811.nn.five9lab.com:42413/>] with unknown UID is 
>> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
>> gating instead for 5000 ms. (akka.remote.Remoting)
>> 
>> 
>> The host is available, but I don't understand where port number comes from. 
>> Task Manager uses another port (which is printed in logs on startup)
>> Could you please help to understand why it happens?
>> 
>> Thank you,
>> Alex
>> 
>> 
>> On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov 
>> mailto:alexander.smirn...@gmail.com>> wrote:
>> Hello,
>> 
>> I've assembled a standalone cluster of 3 task managers and 3 job 
>> managers(and 3 ZK) following the instructions at 
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html>
>>  and 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html>
>> 
>> It works ok, but randomly, task managers becomes unavailable. JobManager has 
>> exception like below in logs:
>> 
>> 
>> [2018-03-19 00:33:10,211] WARN Association with remote system 
>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:42413 
>> <http://fl...@qa

Re: Standalone cluster instability

2018-08-14 Thread Piotr Nowojski
Hi,

Good that we are more or less on track with this problem :) But the problem 
here is not that heap size is too small, bot that your kernel is running out of 
memory and starts killing processes. Either:

1. some other process is using the available memory 
2. Increase memory allocation on your machine/virtual machine/container/cgroup
3. Decrease the heap size of Flink’s JVM or non heap size (decrease network 
memory buffer pool). Of course for any given job/state 
size/configuration/cluster size there is some minimal reasonable memory size 
that you have to assign to Flink, otherwise you will have poor performance 
and/or constant garbage collections and/or you will start getting OOM errors 
from JVM (don’t confuse those with OS/kernel's OOM errors - those two are on a 
different level).

Piotrek

> On 14 Aug 2018, at 07:36, Shailesh Jain  wrote:
> 
> Hi Piotrek,
> 
> Thanks for your reply. I checked through the syslogs for that time, and I see 
> this:
> 
> Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill 
> process 2305 (java) score 468 or sacrifice child
> Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305 (java) 
> total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB
> 
> As you pointed out, kernel killed the task manager process.
> 
> If I had already set the max heap size for the JVM (to 3GB in this case), and 
> the memory usage stats showed 2329MB being used 90 seconds earlier, it seems 
> a bit unlikely for operators to consume 700 MB heap space in that short time, 
> because our events ingestion rate is not that high (close to 10 events per 
> minute).
> 
> 2018-08-08 13:19:23,341 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage 
> stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
> 
> Is it possible to log individual operator's memory consumption? This would 
> help in narrowing down on the root cause. There were around 50 operators 
> running (~8 kafka source/sink, ~8 Window operators, and the rest CEP 
> operators).
> 
> Thanks,
> Shailesh
> 
> On Fri, Aug 10, 2018 at 4:48 PM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Please post full TaskManager logs, including stderr and stdout. (Have you 
> checked the stderr/stdout for some messages?)
> 
> I could think of couple reasons:
> 1. process segfault
> 2. process killed by OS
> 3. OS failure
> 
> 1. Should be visible by some message in stderr/stdout file and can be caused 
> by for example JVM, RocksDB or some other native library/code bug. 
> 2. Is your system maybe running out of memory? Kernel might kill process if 
> that’s happening. You can also check system (linux?) logs for errors that 
> correlate in time. Where are those logs depend on your OS. 
> 3. This might be tricky, but I have seen kernel failures that prevented any 
> messages from being logged for example. Besides this TaskManager failure is 
> your machine operating normally without any other problems/crashes/restarts?
> 
> Piotrek
> 
>> On 10 Aug 2018, at 06:59, Shailesh Jain > <mailto:shailesh.j...@stellapps.com>> wrote:
>> 
>> Hi,
>> 
>> I hit a similar issue yesterday, the task manager died suspiciously, no 
>> error logs in the task manager logs, but I see the following exceptions in 
>> the job manager logs:
>> 
>> 2018-08-05 18:03:28,322 ERROR akka.remote.Remoting   
>>- Association to [akka.tcp://flink@localhost:34483 <>] with 
>> UID [328996232] irrecoverably failed. Quarantining address.
>> java.util.concurrent.TimeoutException: Remote system has been silent for too 
>> long. (more than 48.0 hours)
>> at 
>> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at 
>> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at 
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at 
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at 
>> scala.concurrent.fork

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-17 Thread Piotr Nowojski
  - locked <0x7f4b5488f2b8> (a java.lang.Object)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)

> On 16 Jul 2018, at 17:03, Gerard Garcia  <mailto:ger...@talaia.io>> wrote:
> 
> Hi Piotr,
> 
> I attach the GC pauses logged a while back when the task stopped processing 
> during several hours (it stopped at about 20:05) and a jstack dump from the 
> last time the task hanged. 
> 
> Thanks,
> 
> Gerard
> 
> On Mon, Jul 16, 2018 at 4:12 PM Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi Gerard,
> 
> I second to what Zhijiang wrote. Please check GC pauses, either via GC 
> logging, 3rd party tool like jconsole (or some memory profiler) or via 
> enabling resource logging in Flink. 
> 
> After confirming that this is not the issue next time this happens, instead 
> of cancelling the job, please collect thread dumps on a process that is stuck.
> 
> Piotrek  
> 
>> On 16 Jul 2018, at 13:53, Fabian Hueske > <mailto:fhue...@gmail.com>> wrote:
>> 
>> Hi Gerard,
>> 
>> Thanks for reporting this issue. I'm pulling in Nico and Piotr who have been 
>> working on the networking stack lately and might have some ideas regarding 
>> your issue.
>> 
>> Best, Fabian
>> 
>> 2018-07-13 13:00 GMT+02:00 Zhijiang(wangzhijiang999) 
>> mailto:wangzhijiang...@aliyun.com>>:
>> Hi Gerard,
>> 
>> I thought the failed task triggers cancel process before, now I am clear 
>> that you cancel the task when it stops processing data.
>> I think you can jstack the process to find where task thread is blocked 
>> instead of canceling it, then we may find some hints.
>> 
>> In addition, the following stack "DataOutputSerializer.resize" indicates the 
>> task is serializing the record and there will be overhead byte buffers in 
>> the serializer for copying data temporarily. And if your record is too 
>> large, it may cause OOM in this process and this overhead memory is not 
>> managed by flink framework. Also you can monitor the gc status to check the 
>> full gc delay.
>> 
>> Best,
>> Zhijiang
>> --
>> 发件人:Gerard Garcia mailto:ger...@talaia.io>>
>> 发送时间:2018年7月13日(星期五) 16:22
>> 收件人:wangzhijiang999 > <mailto:wangzhijiang...@aliyun.com>>
>> 抄 送:user mailto:user@flink.apache.org>>
>> 主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)
>> 
>> Hi Zhijiang,
>> 
>> The problem is that no other task failed first. We have a task that 
>> sometimes just stops processing data, and when we cancel it, we see the logs 
>> messages  saying:
>> 
>> " Task (...) did not react to cancelling signal for 30 seconds, but is stuck 
>> in method: 
>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>>  
>> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
>>  org.apache.flink.types.StringValue.writeString(StringValue.java:802)
>> (...)"
>> 
>> That is why we suspect that it hangs forever at that point and that is why 
>> it stops processing data. I don;t see any increase in memory use in the heap 
>> (I guess because these buffers are managed by Flink) so I'm not sure if that 
>> is really the problem.
>> 
>> Gerard
>> 
>> On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) 
>> mailto:wangzhijiang...@aliyun.com>> wrote:
>> Hi Gerard,
>> 
>> I think you can check the job manager log to find which task failed at 
>> first, and then trace the task manager log containing the failed task to 
>> find the initial reason.
>> The failed task will trigger canceling all the other tasks, and during 
>> canceling process, the blocked task that is waiting for output buffer can 
>> not be interrupted by the
>> canceler thread which is shown in your description. So I think the cancel 
>> process is not the key point and is in expectation. Maybe it did not cause 
>> OOM at all.
>> If the taskduring canceling, the task manager process will be exited finally 
>> to trigger restarting the job.
>> 
>> Zhijiang
>> --
>> 发件人:Gerard Garcia mailto:ger...@talaia.io>>
>> 发送时间:201

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-16 Thread Piotr Nowojski
Hi Gerard,

I second to what Zhijiang wrote. Please check GC pauses, either via GC logging, 
3rd party tool like jconsole (or some memory profiler) or via enabling resource 
logging in Flink. 

After confirming that this is not the issue next time this happens, instead of 
cancelling the job, please collect thread dumps on a process that is stuck.

Piotrek  

> On 16 Jul 2018, at 13:53, Fabian Hueske  wrote:
> 
> Hi Gerard,
> 
> Thanks for reporting this issue. I'm pulling in Nico and Piotr who have been 
> working on the networking stack lately and might have some ideas regarding 
> your issue.
> 
> Best, Fabian
> 
> 2018-07-13 13:00 GMT+02:00 Zhijiang(wangzhijiang999) 
> mailto:wangzhijiang...@aliyun.com>>:
> Hi Gerard,
> 
> I thought the failed task triggers cancel process before, now I am clear that 
> you cancel the task when it stops processing data.
> I think you can jstack the process to find where task thread is blocked 
> instead of canceling it, then we may find some hints.
> 
> In addition, the following stack "DataOutputSerializer.resize" indicates the 
> task is serializing the record and there will be overhead byte buffers in the 
> serializer for copying data temporarily. And if your record is too large, it 
> may cause OOM in this process and this overhead memory is not managed by 
> flink framework. Also you can monitor the gc status to check the full gc 
> delay.
> 
> Best,
> Zhijiang
> --
> 发件人:Gerard Garcia mailto:ger...@talaia.io>>
> 发送时间:2018年7月13日(星期五) 16:22
> 收件人:wangzhijiang999  >
> 抄 送:user mailto:user@flink.apache.org>>
> 主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)
> 
> Hi Zhijiang,
> 
> The problem is that no other task failed first. We have a task that sometimes 
> just stops processing data, and when we cancel it, we see the logs messages  
> saying:
> 
> " Task (...) did not react to cancelling signal for 30 seconds, but is stuck 
> in method: 
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>  
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
>  org.apache.flink.types.StringValue.writeString(StringValue.java:802)
> (...)"
> 
> That is why we suspect that it hangs forever at that point and that is why it 
> stops processing data. I don;t see any increase in memory use in the heap (I 
> guess because these buffers are managed by Flink) so I'm not sure if that is 
> really the problem.
> 
> Gerard
> 
> On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) 
> mailto:wangzhijiang...@aliyun.com>> wrote:
> Hi Gerard,
> 
> I think you can check the job manager log to find which task failed at first, 
> and then trace the task manager log containing the failed task to find the 
> initial reason.
> The failed task will trigger canceling all the other tasks, and during 
> canceling process, the blocked task that is waiting for output buffer can not 
> be interrupted by the
> canceler thread which is shown in your description. So I think the cancel 
> process is not the key point and is in expectation. Maybe it did not cause 
> OOM at all.
> If the taskduring canceling, the task manager process will be exited finally 
> to trigger restarting the job.
> 
> Zhijiang
> --
> 发件人:Gerard Garcia mailto:ger...@talaia.io>>
> 发送时间:2018年7月2日(星期一) 18:29
> 收件人:wangzhijiang999  >
> 抄 送:user mailto:user@flink.apache.org>>
> 主 题:Re: Flink job hangs/deadlocks (possibly related to out of memory)
> 
> Thanks Zhijiang,
> 
> We haven't found any other relevant log messages anywhere. These traces 
> belong to the unresponsive task, that is why we suspect that at some point it 
> did not have enough memory to serialize the message and it blocked. I've also 
> found that when it hanged several output buffers were full (see attached 
> image buffers.outPoolUsage.png) so I guess the traces just reflect that.
> 
> Probably the task hanged for some other reason and that is what filled the 
> output buffers previous to the blocked operator. I'll have to continue 
> investigating to find the real cause.
> 
> Gerard
> 
> 
> 
> 
> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) 
> mailto:wangzhijiang...@aliyun.com>> wrote:
>  Hi Gerard,
> 
> From the below stack, it can only indicate the task is canceled that may 
> be triggered by job manager becuase of other task failure. If the task can 
> not be interrupted within timeout config, the task managerprocess will be 
> exited. Do you see any OutOfMemory messages from the task manager log?  
> Normally the ouput serialization buffer is managed by task manager framework 
> and will not cause OOM, and on the input desearialization side, there will be 
> a temp bytes array on each channel for holding partial records which is not 
> managed by framework. I 

Re: Extremely large job serialization produced by union operator

2018-03-09 Thread Piotr Nowojski
Hi,

Could you provide more details about your queries and setup? Logs could be 
helpful as well.

Piotrek

> On 9 Mar 2018, at 11:00, 杨力  wrote:
> 
> I wrote a flink-sql app with following topography.
> 
> KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
> KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
> ...
> KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
> 
> I have a dozen of TableSources And tens of SQLs. As a result, the number of 
> JDBCAppendTableSink times parallelism, that is the number of concurrent 
> connections to database, is too large for the database server to handle. So I 
> tried union DataStreams before connecting them to the TableSink.
> 
> KafkaJsonTableSource -> SQL -> toAppendStream -> Map
> \
> KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union -> 
> JDBCAppendTableSink
> ... /
> KafkaJsonTableSource -> SQL -> toAppendStream -> Map
> 
> With this strategy, job submission failed with an OversizedPayloadException 
> of 104 MB. Increasing akka.framesize helps to avoid this exception, but job 
> submission hangs and times out.
> 
> I can't understand why a simple union operator would serialize to such a 
> large message. Can I avoid this problem?
> Or can I change some configuration to fix the submission time out?
> 
> Regards,
> Bill



Re: Slow Flink program

2018-03-01 Thread Piotr Nowojski
Hi,

First of all learn about what’s going with your job: check the status of the 
machines, cpu/network usage on the cluster. If CPU is not ~100%, analyse what 
is preventing the machines to work faster (network bottleneck, locking, 
blocking operations etc). If CPU is ~100%, profile the TaskManagers to see what 
can you speed up.

In your example couple of questions:
- you create CollectiveData instances with size 128000 by default. Doesn’t it 
mean that your records are gigantic? I can not tell, since you didn’t provide 
full code.
- you are mapping the data to new Tuple2(0, s);  and 
then keying by the first field, which is always 0. Probably all of the records 
are ending up on one single machine 

Piotrek

> On 28 Feb 2018, at 17:20, Supun Kamburugamuve  wrote:
> 
> Hi, 
> 
> I'm trying to run a simple benchmark on Flink streaming reduce. It seems it 
> is very slow. Could you let me know if I'm doing something wrong.
> 
> Here is the program. I'm running this on 32 nodes with 20 tasks in each node. 
> So the parallelism is at 640.
> 
> public class StreamingReduce {
>   int size;
>   int iterations;
>   StreamExecutionEnvironment env;
>   String outFile;
> 
>   public StreamingReduce(int size, int iterations, StreamExecutionEnvironment 
> env, String outFile) {
> this.size = size;
> this.iterations = iterations;
> this.env = env;
> this.outFile = outFile;
>   }
> 
>   public void execute() {
> DataStream stringStream = env.addSource(new 
> RichParallelSourceFunction() {
>   int i = 1;
>   int count = 0;
>   int size = 0;
>   int iterations = 1;
> 
>   @Override
>   public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> ParameterTool p = (ParameterTool)
> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> size = p.getInt("size", 128000);
> iterations = p.getInt("itr", 1);
> System.out.println(" iterations: " + iterations + " size: " + 
> size);
>   }
> 
>   @Override
>   public void run(SourceContext sourceContext) throws 
> Exception {
> while (count < iterations) {
>   CollectiveData i = new CollectiveData(size);
>   sourceContext.collect(i);
>   count++;
> }
>   }
> 
>   @Override
>   public void cancel() {
>   }
> });
> 
> stringStream.map(new RichMapFunction CollectiveData>>() {
>   @Override
>   public Tuple2 map(CollectiveData s) throws 
> Exception {
> return new Tuple2(0, s);
>   }
> }).keyBy(0).reduce(new ReduceFunction>() {
>   @Override
>   public Tuple2 reduce(Tuple2 CollectiveData> c1,
> Tuple2 CollectiveData> c2) throws Exception {
> return new Tuple2(0, add(c1.f1, c2.f1));
>   }
> }).addSink(new RichSinkFunction>() {
>   long start;
>   int count = 0;
>   int iterations;
> 
>   @Override
>   public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> ParameterTool p = (ParameterTool)
> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> iterations = p.getInt("itr", 1);
> System.out.println(" iterations: " + iterations);
>   }
> 
>   @Override
>   public void invoke(Tuple2 integerStringTuple2) 
> throws Exception {
> if (count == 0) {
>   start = System.nanoTime();
> }
> count++;
> if (count >= iterations) {
>   System.out.println("Final: " + count + " " + (System.nanoTime() - 
> start) / 100 + " " + (integerStringTuple2.f1));
> }
>   }
> });
> 
>   }
> 
>   private static CollectiveData add(CollectiveData i, CollectiveData j) {
> List r= new ArrayList<>();
> for (int k = 0; k < i.getList().size(); k++) {
>   r.add((i.getList().get(k) + j.getList().get(k)));
> }
> return new CollectiveData(r);
>   }
> }
> Thanks,
> Supun..
> 
> 



Re: Hi Flink Team

2018-03-01 Thread Piotr Nowojski
Hi,

timeWindowAll is a non parallel operation, since it gathers all of the elements 
and process them together:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#timeWindowAll-org.apache.flink.streaming.api.windowing.time.Time-org.apache.flink.streaming.api.windowing.time.Time-
 


Note that it’s defined in DataStream, not in the KeyedStream.

In your keyBy example keyBy() is just a NoOp. Didn’t you mean to use 
KeyedStream#timeWindows method?

Piotrek

> On 1 Mar 2018, at 09:21, Ashish Attarde  wrote:
> 
> Hi,
> 
> I am new to Flink and in general data processing using stream processors.
> 
> I am using flink to do real time correlation between multiple records which 
> are coming as part of same stream. I am doing is "apply" operation on 
> TimeWindowed stream. When I submit job with parallelism factor of 4, I am 
> still seeing apply operation is applied with parallelism factor of 1.
> 
> Here is the peice of code :
> 
> parsedInput.keyBy("mflowHash")
> .timeWindowAll(Time.milliseconds(1000),Time.milliseconds(200))
> .allowedLateness(Time.seconds(10))
> .apply(new CRWindow());
> 
> I am trying to correlate 2 streams, what is the right way to do it? I tried 
> the CEP library and experienced the worst performance. It is taking ~4 
> minutes to do the correlation. The corelation logic is very simple and not 
> compute intensive.
> 
> 
> -- 
> 
> Thanks
> -Ashish Attarde
> 
> 
> 
> -- 
> 
> Thanks
> -Ashish Attarde



Re: Too many open files on Bucketing sink

2018-03-15 Thread Piotr Nowojski
Hi,

There is an open similar issue: 
https://issues.apache.org/jira/browse/FLINK-8707 


It’s still under investigation and it would be helpful if you could follow up 
the discussion there, run same diagnostics commands as Alexander Gardner did 
(mainly if you could attach output of lsof command for TaskManagers).

Last time I was looking into it, most of the open files came from loading 
dependency jars for the operators. It seemed like each task/task slot was 
executed in separate class loader so the same dependency was being loaded 
multiple times over and over again.

Thanks, Piotrek

> On 14 Mar 2018, at 19:52, Felix Cheung  wrote:
> 
> I have seen this before as well.
> 
> My workaround was to limit the number of parallelism but it is the 
> unfortunate effect of limiting the number of processing tasks also (and so 
> slowing things down)
> 
> Another alternative is to have bigger buckets (and smaller number of buckets)
> 
> Not sure if there is a good solution.
> 
> From: galantaa 
> Sent: Tuesday, March 13, 2018 7:08:01 AM
> To: user@flink.apache.org
> Subject: Too many open files on Bucketing sink
>  
> Hey all,
> I'm using bucketing sink with a bucketer that creates partition per customer
> per day.
> I sink the files to s3.
> it suppose to work on around 500 files at the same time (according to my
> partitioning).
> 
> I have a critical problem of 'Too many open files'.
> I've upload two taskmanagers, each with 16 slots. I've checked how many open
> files (or file descriptors) exist with 'lsof | wc -l' and it had reached
> over a million files on each taskmanager!
> 
> after that, I'd decreased the num of taskSlots to 8 (4 in each taskmanager),
> and the concurrency dropped.
> checking 'lsof | wc -l' gave around 250k file on each machine. 
> I also checked how many actual files exist in my tmp dir (it works on the
> files there before uploading them to s3) - around 3000.
> 
> I think that each taskSlot works with several threads (maybe 16?), and each
> thread holds a fd for the actual file, and thats how the numbers get so
> high.
> 
> Is that a know problem? is there anything I can do?
> by now, I filter just 10 customers and it works great, but I have to find a
> real solution so I can stream all the data.
> Maybe I can also work with a single task Slot per machine but I'm not sure
> this is a good idea.
> 
> Thank you very much,
> Alon 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 


Re: Emulate Tumbling window in Event Time Space

2018-03-09 Thread Piotr Nowojski
Hi,

As Xingcan responded, you could use already built in operator for that. 

If you really want to implement something on your own (need custom feature? For 
fun?), you would have to implement some variation of a InternalTimerService 
from Flink (you can browse the code for an inspiration). On each processed 
element you have to keep updating state of your in memory/in state windows with 
timestamps marking when they should be triggered. Then on each processed 
watermark in your operator you need to trigger/fire windows matching to the 
processed watermark.

Piotrek

> On 9 Mar 2018, at 07:50, Xingcan Cui  wrote:
> 
> Hi Dhruv,
> 
> there’s no need to implement the window logic with the low-level 
> `ProcessFunction` yourself. Flink has provided built-in window operators and 
> you just need to implement the `WindowFunction` for that [1].
> 
> Best,
> Xingcan
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#window-functions
>  
> 
> 
>> On 9 Mar 2018, at 1:51 PM, Dhruv Kumar > > wrote:
>> 
>> Hi
>> 
>> I was trying to emulate tumbling window in event time space. Here 
>> 
>>  is the link to my code.
>> I am using the process function to do the custom processing which I want to 
>> do within every window. I am having an issue of how to emit results at the 
>> end of every window since my watermark only gets emitted at every incoming 
>> event (incoming event will mostly not intersect with the end time of any 
>> window). Seems like I need to add a trigger somewhere which fires at the end 
>> of every window. Could any one here help me? Sorry, if I am not clear in 
>> anything. I am quite new to Flink. 
>> 
>> Thanks
>> Dhruv
> 



Re: "Close()" aborts last transaction in TwoPhaseCommitSinkFunction

2018-03-09 Thread Piotr Nowojski
Hi,

Short answer is: no, at the moment clean shutdown is not implemented for the 
streaming, but it’s on our to do list for the future.

Hacky answer: you could implement some custom code, that would wait for at 
least one completed checkpoint after the last input data. But that would 
require modifying a source function or at least wrapping it and there might be 
some corner cases that I haven’t thought about.

Piotrek

> On 9 Mar 2018, at 14:49, Niels van Kaam  wrote:
> 
> Hi,
> 
> I'm working on a custom implementation of a sink which I would like to use 
> with exactly once semantics. Therefore I have implemented the 
> TwoPhaseCommitSinkFunction class as mentioned in this recent post: 
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>  
> 
> 
> I have some integration tests which run jobs using the custom sink with a 
> finite dataset (A RichSourceFunction with a "finite" run method). The tests 
> fail because of missing data. I noticed that is due to the last transaction 
> being aborted.
> 
> When looking into the source code that makes sense because the close() 
> implementation of TwoPhaseCommitSinkFunction calls abort on the current 
> transaction: 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
>  
> 
>  
> 
> I could override this behaviour and perform a commit, but then I would 
> perform a commit without getting the checkpoint completed notification, thus 
> not properly maintaining exactly once guarantees
> 
> Is (and how is) it possible to have end-to-end exactly once guarantees when 
> dealing with (sometimes) finite jobs?
> 
> Thanks!
> Niels
> 



Re: Restart strategy defined in flink-conf.yaml is ignored

2018-04-05 Thread Piotr Nowojski
gt; 2018-04-05 22:37:29,545 INFO  
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>   - Trying to associate with JobManager leader 
> akka.tcp://flink@localhost:6123/user/jobmanager
> 2018-04-05 22:37:29,552 INFO  
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>   - Resource Manager associating with leading JobManager 
> Actor[akka://flink/user/jobmanager#-853250886] - leader session 
> ----
> 2018-04-05 22:37:30,495 INFO  
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>   - TaskManager f0b0370186ab3c865db63fe60ca68e08 has started.
> 2018-04-05 22:37:30,497 INFO  
> org.apache.flink.runtime.instance.InstanceManager - Registered 
> TaskManager at 192.168.0.26 
> (akka.tcp://flink@mb-sr-asmirnov.local:60696/user/taskmanager) as 
> 2972a72a7223e63bb5a4fedd159c0b78. Current number of registered hosts is 1. 
> Current number of alive task slots is 1.
> 2018-04-05 22:38:29,355 INFO  org.apache.flink.runtime.client.JobClient   
>   - Checking and uploading JAR files
> 2018-04-05 22:38:29,639 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Submitting job 43ecfe9cb258b7f624aad9868d306edb (Failed job).
> 2018-04-05 22:38:29,643 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Using restart strategy 
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, 
> delayBetweenRestartAttempts=1) for 43ecfe9cb258b7f624aad9868d306edb.
> 2018-04-05 22:38:29,656 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers 
> via failover strategy: full graph restart
> 
> 
> 
> On Thu, Apr 5, 2018 at 10:35 PM Alexander Smirnov 
> <alexander.smirn...@gmail.com <mailto:alexander.smirn...@gmail.com>> wrote:
> Hi Piotr,
> 
> I'm using Flink 1.4.2
> 
> it's a standard flink distribution downloaded and unpacked.
> 
> added the following lines to conf/flink-conf.yaml:
> restart-strategy: none
> state.backend: rocksdb
> state.backend.fs.checkpointdir: 
> file:///tmp/nfsrecovery/flink-checkpoints-metadata
> state.backend.rocksdb.checkpointdir: 
> file:///tmp/nfsrecovery/flink-checkpoints-rocksdb
> 
> created new java project as described at 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html>
> 
> here's the code:
> 
> public class FailedJob
> {
> static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
> 
> public static void main( String[] args ) throws Exception
> {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 
> env.enableCheckpointing(5000,
> CheckpointingMode.EXACTLY_ONCE);
> 
> DataStream stream = env.fromCollection(Arrays.asList("test"));
> 
> stream.map(new MapFunction<String, String>(){
> @Override
> public String map(String obj) {
> throw new NullPointerException("NPE");
> } 
> });
> 
> env.execute("Failed job");
> }
> }
> 
> attaching screenshots, please let me know if more info is needed
> 
> Alex
> 
> 
>  
> 
> On Thu, Apr 5, 2018 at 5:35 PM Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Can you provide more details, like post your configuration/log files/screen 
> shots from web UI and Flink version being used?
> 
> Piotrek
> 
> > On 5 Apr 2018, at 06:07, Alexander Smirnov <alexander.smirn...@gmail.com 
> > <mailto:alexander.smirn...@gmail.com>> wrote:
> >
> > Hello,
> >
> > I've defined restart strategy in flink-conf.yaml as none. WebUI / Job 
> > Manager section confirms that.
> > But looks like this setting is disregarded.
> >
> > When I go into job's configuration in the WebUI, in the Execution 
> > Configuration section I can see:
> > Max. number of execution retries  Restart with fixed delay 
> > (1 ms). #2147483647 <tel:(214)%20748-3647> restart attempts.
> >
> > Do you think it is a bug?
> >
> > Alex
> 



Re: Checkpoints very slow with high backpressure

2018-04-05 Thread Piotr Nowojski
Hi,

If I’m not mistaken this is a known issue, that we were working to resolve for 
Flink 1.5 release. The problem is that with back pressure, data are being 
buffered between nodes and on checkpoint, all of those data must be processed 
before checkpoint can be completed. This is especially problematic if 
processing a single record takes/can take significant amount of time. 

With Flink 1.5 we introduced mechanism to better control the amount of buffered 
data and it should address this issue (Flink 1.5 should be released within 
couple of weeks).

In the mean time, you could try out Flink 1.5 release candidate that has been 
just published or you could try to reduce the number of configured network 
buffers, however keep in mind that at some point this can decrease your maximal 
throughput:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#configuring-the-network-buffers
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#configuring-the-network-buffers>

On the other hand, why does it prevents you from using a checkpointing at all? 

Piotr Nowojski 

> On 5 Apr 2018, at 06:10, Edward <egb...@hotmail.com> wrote:
> 
> I read through this thread and didn't see any resolution to the slow
> checkpoint issue (just that someone resolved their backpressure issue).
> 
> We are experiencing the same problem: 
> - When there is no backpressure, checkpoints take less than 100ms
> - When there is high backpressure, checkpoints take anywhere from 5 minutes
> to 25 minutes.
> 
> This is preventing us from using the checkpointing feature at all, since
> periodic backpressure is unavoidable.
> 
> We are experiencing this when running on Flink 1.4.0.
> We are retaining only a single checkpoint, and the size of retained
> checkpoint is less than 250KB, so there's not a lot of state.
>   state.backend: jobmanager
>   state.backend.async: true
>   state.backend.fs.checkpointdir: hdfs://checkpoints
>   state.checkpoints.num-retained: 1
>   max concurrent checkpoints: 1
>   checkpointing mode: AT_LEAST_ONCE
> 
> One other data point: if I rewrite the job to allow chaining all steps (i.e.
> same parallelism on all steps, so they fit in 1 task slot), the checkpoints
> are still slow under backpressure, but are an order of magnitude faster --
> they take about 60 seconds rather than 15 minutes.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Restart strategy defined in flink-conf.yaml is ignored

2018-04-05 Thread Piotr Nowojski
Hi,

Can you provide more details, like post your configuration/log files/screen 
shots from web UI and Flink version being used?

Piotrek

> On 5 Apr 2018, at 06:07, Alexander Smirnov  
> wrote:
> 
> Hello,
> 
> I've defined restart strategy in flink-conf.yaml as none. WebUI / Job Manager 
> section confirms that.
> But looks like this setting is disregarded.
> 
> When I go into job's configuration in the WebUI, in the Execution 
> Configuration section I can see:
> Max. number of execution retries  Restart with fixed delay (1 
> ms). #2147483647 restart attempts.
> 
> Do you think it is a bug?
> 
> Alex



Re: Checkpoints very slow with high backpressure

2018-04-05 Thread Piotr Nowojski
Thanks for the explanation.

I hope that either 1.5 will solve your issue (please let us know if it 
doesn’t!) or if you can’t wait, that decreasing memory buffers can mitigate the 
problem.

Piotrek

> On 5 Apr 2018, at 08:13, Edward  wrote:
> 
> Thanks for the update Piotr.
> 
> The reason it prevents us from using checkpoints is this:
> We are relying on the checkpoints to trigger commit of Kafka offsets for our
> source (kafka consumers).
> When there is no backpressure this works fine. When there is backpressure,
> checkpoints fail because they take too long, and our Kafka offsets are never
> committed to Kafka brokers (as we just learned the hard way).
> 
> Normally there is no backpressure in our jobs, but when there is some
> outage, then the jobs do experience 
> backpressure when catching up. And when you're already trying to recover
> from an incident, that is not the ideal time for kafka offsets commits to
> stop working.
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Restart strategy defined in flink-conf.yaml is ignored

2018-04-06 Thread Piotr Nowojski
Thanks!

> On 6 Apr 2018, at 00:30, Alexander Smirnov <alexander.smirn...@gmail.com> 
> wrote:
> 
> Thanks Piotr,
> 
> I've created a JIRA issue to track it: 
> https://issues.apache.org/jira/browse/FLINK-9143 
> <https://issues.apache.org/jira/browse/FLINK-9143>
> 
> Alex
> 
> 
> On Thu, Apr 5, 2018 at 11:28 PM Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Thanks for the details! I can confirm this behaviour. flink-conf.yaml 
> restart-strategy value is being completely ignored (regardless of it’s value) 
> when user enables checkpointing:
> 
> env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
> 
> I suspect this is a bug, but I have to confirm it.
> 
> Thanks, Piotrek
> 
>> On 5 Apr 2018, at 12:40, Alexander Smirnov <alexander.smirn...@gmail.com 
>> <mailto:alexander.smirn...@gmail.com>> wrote:
>> 
>> jobmanager.log:
>> 
>> 2018-04-05 22:37:28,348 INFO  
>> org.apache.flink.configuration.GlobalConfiguration- Loading 
>> configuration property: restart-strategy, none
>> 2018-04-05 22:37:28,353 INFO  org.apache.flink.core.fs.FileSystem
>>- Hadoop is not in the classpath/dependencies. The extended 
>> set of supported File Systems via Hadoop is not available.
>> 2018-04-05 22:37:28,506 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>>- Starting JobManager without high-availability
>> 2018-04-05 22:37:28,510 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>>- Starting JobManager on localhost:6123 with execution mode 
>> CLUSTER
>> 2018-04-05 22:37:28,517 INFO  
>> org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot 
>> create Hadoop Security Module because Hadoop cannot be found in the 
>> Classpath.
>> 2018-04-05 22:37:28,546 INFO  
>> org.apache.flink.runtime.security.SecurityUtils   - Cannot 
>> install HadoopSecurityContext because Hadoop cannot be found in the 
>> Classpath.
>> 2018-04-05 22:37:28,591 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>>- Trying to start actor system at localhost:6123
>> 2018-04-05 22:37:28,981 INFO  akka.event.slf4j.Slf4jLogger   
>>- Slf4jLogger started
>> 2018-04-05 22:37:29,027 INFO  akka.remote.Remoting   
>>- Starting remoting
>> 2018-04-05 22:37:29,129 INFO  akka.remote.Remoting   
>>- Remoting started; listening on addresses 
>> :[akka.tcp://flink@localhost:6123 <>]
>> 2018-04-05 22:37:29,135 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>>- Actor system started at akka.tcp://flink@localhost:6123 <>
>> 2018-04-05 22:37:29,148 INFO  
>> org.apache.flink.runtime.metrics.MetricRegistryImpl   - No metrics 
>> reporter configured, no metrics will be exposed/reported.
>> 2018-04-05 22:37:29,152 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>>- Starting JobManager web frontend
>> 2018-04-05 22:37:29,161 INFO  
>> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
>> location of JobManager log file: 
>> /Users/asmirnov/flink-1.4.2/log/flink-jobmanager-0.log
>> 2018-04-05 22:37:29,161 INFO  
>> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
>> location of JobManager stdout file: 
>> /Users/asmirnov/flink-1.4.2/log/flink-jobmanager-0.out
>> 2018-04-05 22:37:29,162 INFO  
>> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Using 
>> directory 
>> /var/folders/5s/yj6g5wd90h158whcb_483hhhq7t4sw/T/flink-web-901a3fb7-d366-4f90-b75c-1e1f8038ed37
>>  for the web interface files
>> 2018-04-05 22:37:29,162 INFO  
>> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Created 
>> directory 
>> /var/folders/5s/yj6g5wd90h158whcb_483hhhq7t4sw/T/flink-web-21e5d8a8-7967-40f0-97d7-a803d9bd5913
>>  for web frontend JAR file uploads.
>> 2018-04-05 22:37:29,447 INFO  
>> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend 
>> listening at localhost:8081
>> 2018-04-05 22:37:29,447 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>>- Starting JobManager actor
>> 2018-04-05 22:37:29,452 INFO  org.apache.flink.runtime.blob.BlobServer   
>>- Created BLOB server storage directory 
>> /var/folders/5s/yj6g5wd90h158whcb_483hhhq7t4sw/T/blobStore-6777e862-0c2c-4679-a42f-b1921baa52

Re: Kafka ProducerFencedException after checkpointing

2018-03-20 Thread Piotr Nowojski
Hi,

What’s your Kafka’s transaction timeout setting? Please both check Kafka 
producer configuration (transaction.timeout.ms property) and Kafka broker 
configuration. The most likely cause of such error message is when Kafka's 
timeout is smaller then Flink’s checkpoint interval and transactions are not 
committed quickly enough before timeout occurring.

Piotrek

> On 17 Mar 2018, at 07:24, Dongwon Kim  wrote:
> 
> 
> Hi,
> 
> I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 
> 7th, ... checkpoints:
> --
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
> attempted an operation with an old epoch. Either there is a newer producer 
> with the same transactionalId, or the producer's transaction has been expired 
> by the broker.
> --
> 
> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing 
> using Kafka sink.
> We use FsStateBackend to store snapshot data on HDFS.
> 
> As shown in configuration.png, my checkpoint configuration is:
> - Checkpointing Mode : Exactly Once
> - Interval : 15m 0s
> - Timeout : 10m 0s
> - Minimum Pause Between Checkpoints : 5m 0s
> - Maximum Concurrent Checkpoints : 1
> - Persist Checkpoints Externally : Disabled
> 
> After the first checkpoint completed [see history after 1st ckpt.png], the 
> job is restarted due to the ProducerFencedException [see exception after 1st 
> ckpt.png].
> The first checkpoint takes less than 2 minutes while my checkpoint interval 
> is 15m and minimum pause between checkpoints is 5m.
> After the job is restarted, the second checkpoint is triggered after a while 
> [see history after 2nd ckpt.png] and this time I've got no exception.
> The third checkpoint results in the same exception as after the first 
> checkpoint.
> 
> Can anybody let me know what's going wrong behind the scene?
> 
> Best,
> 
> Dongwon
>  ckpt.png> ckpt.png>



Re: Checkpoint is not triggering as per configuration

2018-03-19 Thread Piotr Nowojski
Hi,

Please analyse what was going on the TaskManager and JobManager before this 
“task is not being executed at the moment”. What is the reason why it is not 
being executed? Was there some exception? Depending on your setup, you might 
need to check your stdout/stderr files (if your code is printing some errors).

Other issue might be if your operators/functions are initialising very slowly 
or being stuck somewhere.

Thanks, Piotrek

> On 19 Mar 2018, at 10:14, ms110400027 Syed Muhammad Abrar Akber 
> <ms110400...@vu.edu.pk> wrote:
> 
> Dear Piotrek;
> The log for task manager shows the following message
> 2018-03-19 17:07:58,000 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
> triggering task Source: Custom File Source (1/1) is not being executed at the 
> moment. Aborting checkpoint.
> I don't know how to fix this issue.  I will highly appreciate your support, 
> if you help me in fixing the issue.
> 
> 
> Further, please guide me where I can find the resources which are helpful for 
> beginners like me to fix such issues.
> Thank you for your support.
> 
> Regards;
> Syed Muhamamd Abrar Akber
> MS110400027
> 
> On Mon, Feb 5, 2018 at 5:33 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Did you check task manager and job manager logs for any problems?
> 
> Piotrek
> 
> > On 5 Feb 2018, at 03:19, syed <ms110400...@vu.edu.pk 
> > <mailto:ms110400...@vu.edu.pk>> wrote:
> >
> > Hi
> > I am new to the flink world, and trying to understand. Currently, I am using
> > Flink 1.3.2 on a small cluster of 4 nodes,
> > I have configured checkpoint directory at HDFS, and run streaming word count
> > example with my own custom input file of 63M entries,
> > I enabled checkpoint every one second {/env.enableCheckpointing(1000)/}
> >
> > The problem I am facing is checkpoint is only triggered once after 1 second,
> > but no checkpoint afterwards, I run application for more than 5 minutes, but
> > checkpoint history shows only 1 checkpoint triggered and was successful. I
> > don't know why checkpoint not triggering after every second?
> > Please suggest me what is wrong?
> > Thanks in anticipation.
> >
> >
> >
> >
> > --
> > Sent from: 
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
> 
> 



Re: Kafka ProducerFencedException after checkpointing

2018-03-21 Thread Piotr Nowojski
Hi,

But that’s exactly the case: producer’s transaction timeout starts when the 
external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka 
transaction for the whole period between checkpoints.

As I wrote in the previous message:

> in case of failure, your timeout must also be able to cover the additional 
> downtime required for the successful job restart. Thus you should increase 
> your timeout accordingly.

I think that 15 minutes timeout is a way too small value. If your job fails 
because of some intermittent failure (for example worker crash/restart), you 
will only have a couple of minutes for a successful Flink job restart. 
Otherwise you will lose some data (because of the transaction timeouts).

Piotrek

> On 21 Mar 2018, at 10:30, Dongwon Kim <eastcirc...@gmail.com> wrote:
> 
> Hi Piotr,
> 
> Now my streaming pipeline is working without retries. 
> I decreased Flink's checkpoint interval from 15min to 10min as you suggested 
> [see screenshot_10min_ckpt.png].
> 
> I though that producer's transaction timeout starts when the external 
> transaction starts.
> The truth is that Producer's transaction timeout starts after the last 
> external checkpoint is committed.
> Now that I have 15min for Producer's transaction timeout and 10min for 
> Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, 
> everything is working fine.
> Am I right?
> 
> Anyway thank you very much for the detailed explanation!
> 
> Best,
> 
> Dongwon
> 
> 
> 
> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Please increase transaction.timeout.ms <http://transaction.timeout.ms/> to a 
> greater value or decrease Flink’s checkpoint interval, I’m pretty sure the 
> issue here is that those two values are overlapping. I think that’s even 
> visible on the screenshots. First checkpoint completed started at 14:28:48 
> and ended at 14:30:43, while the second one started at 14:45:53 and ended at 
> 14:49:16. That gives you minimal transaction duration of 15 minutes and 10 
> seconds, with maximal transaction duration of 21 minutes.
> 
> In HAPPY SCENARIO (without any failure and restarting), you should assume 
> that your timeout interval should cover with some safety margin the period 
> between start of a checkpoint and end of the NEXT checkpoint, since this is 
> the upper bound how long the transaction might be used. In your case at least 
> ~25 minutes.
> 
> On top of that, as described in the docs, 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance>
>  , in case of failure, your timeout must also be able to cover the additional 
> downtime required for the successful job restart. Thus you should increase 
> your timeout accordingly. 
> 
> Piotrek
> 
> 
>> On 20 Mar 2018, at 11:58, Dongwon Kim <eastcirc...@gmail.com 
>> <mailto:eastcirc...@gmail.com>> wrote:
>> 
>> Hi Piotr,
>> 
>> We have set producer's [transaction.timeout.ms 
>> <http://transaction.timeout.ms/>] to 15 minutes and have used the default 
>> setting for broker (15 mins).
>> As Flink's checkpoint interval is 15 minutes, it is not a situation where 
>> Kafka's timeout is smaller than Flink's checkpoint interval.
>> As our first checkpoint just takes 2 minutes, it seems like transaction is 
>> not committed properly.
>> 
>> Best,
>> 
>> - Dongwon
>> 
>> 
>> 
>> 
>> 
>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> Hi,
>> 
>> What’s your Kafka’s transaction timeout setting? Please both check Kafka 
>> producer configuration (transaction.timeout.ms 
>> <http://transaction.timeout.ms/> property) and Kafka broker configuration. 
>> The most likely cause of such error message is when Kafka's timeout is 
>> smaller then Flink’s checkpoint interval and transactions are not committed 
>> quickly enough before timeout occurring.
>> 
>> Piotrek
>> 
>>> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirc...@gmail.com 
>>> <mailto:eastcirc...@gmail.com>> wrote:
>>> 
>>> 
>>> Hi,
>>> 
>>> I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 
>>> 7th, ... checkpoints:
>>> --
>>> j

Re: Confluent Schema Registry DeserializationSchema

2018-03-21 Thread Piotr Nowojski
Hi,

It looks like to me that kafka.utils.VerifiableProperties comes  from 
org.apache.kafka:kafka package - please check and solve (if possible) 
dependency conflicts in your pom.xml regarding this package. Probably there is 
some version collision.

Piotrek

> On 21 Mar 2018, at 16:40, dim5b  wrote:
> 
> I trying to connect to schema registry and deserialize the project. 
> 
> I am building my project and on mvn build i get the  error
> 
> class file for kafka.utils.VerifiableProperties not found...
> 
> 
> import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
> import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
> import io.confluent.kafka.serializers.KafkaAvroDecoder;
> import org.apache.flink.api.common.serialization.DeserializationSchema;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.typeutils.TypeExtractor;
> 
> 
> public class ConfluentAvroDeserializationSchema implements
> DeserializationSchema {
> 
>private final String schemaRegistryUrl;
>private final int identityMapCapacity;
>private KafkaAvroDecoder kafkaAvroDecoder;
> 
>public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) {
>this(schemaRegistyUrl, 1000);
>}
> 
>public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int
> identityMapCapacity) {
>this.schemaRegistryUrl = schemaRegistryUrl;
>this.identityMapCapacity = identityMapCapacity;
>}
> 
>@Override
>public CelloAvro deserialize(byte[] bytes) throws IOException {
>if (kafkaAvroDecoder == null) {
>SchemaRegistryClient schemaRegistry = new
> CachedSchemaRegistryClient(this.schemaRegistryUrl,
> this.identityMapCapacity);
>this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
>}
>return (CelloAvro) this.kafkaAvroDecoder.fromBytes(bytes);
>}
> 
>@Override
>public boolean isEndOfStream(CelloAvro celloAvro) {
>return false;
>}
> 
>@Override
>public TypeInformation getProducedType() {
>return TypeExtractor.getForClass(CelloAvro.class);
>}
> }
> 
> My dependencies are:
> 
> 
>   org.apache.flink
>   flink-avro
>   ${flink.version}
>   
> 
>   
>   io.confluent
>   kafka-avro-serializer
>   4.0.0
>   
> 
> 
> Could someone please help I see there is an open issue for an end to end
> test with  Confluent's Schema Registry
> 
> https://issues.apache.org/jira/browse/FLINK-8970
> 
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Error running on Hadoop 2.7

2018-03-21 Thread Piotr Nowojski
Hi,

Have you replaced all of your old Flink binaries with freshly downloaded 
 Hadoop 2.7 versions? Are you sure 
that something hasn't mix in the process?

Does some simple word count example works on the cluster after the upgrade?

Piotrek

> On 21 Mar 2018, at 16:11, ashish pok  wrote:
> 
> Hi All,
> 
> We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to 
> 2.7. It was supposed to be an easy lift to get a YARN session but doesnt seem 
> like :) We definitely are using 2.7 binaries but it looks like there is a 
> call here to a private methos which screams runtime incompatibility. 
> 
> Anyone has seen this and have pointers?
> 
> Thanks, Ashish
> Exception in thread "main" java.lang.IllegalAccessError: tried to access 
> method 
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
>  from class 
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider <>
> at 
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
> at 
> org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
> at 
> org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
> at 
> org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
> at 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
> at 
> org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)
> 



<    1   2   3   4   5   6   7   >