Re: termination of stream#iterate on finite streams

2017-09-01 Thread Xingcan Cui
Hi Peter,

Let me try to explain this.

As shown in your example, the iterate method takes a function which
"splits" the initial stream
into two separate streams, i.e., initialStream => (stream1, stream2). stream2
works as the output
stream, whose results are emitted to the successor operators (PrintSink
in your example), while
stream1 works as a feedback stream, whose results are re-sent to the
iterate operator.

In your code, all the long values are decremented by 1 and sent back to
the iterate operator, endlessly.
Try replacing your first map function with (_ + 1) and you'll see the
infinite results. For more information,
you can refer to this
or
read the javadoc.
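
For reference, here is Peter's first example again with the two halves of the returned tuple named explicitly (assuming a StreamExecutionEnvironment env and the Scala DataStream API import):

import org.apache.flink.streaming.api.scala._

val result = env.generateSequence(1, 4).iterate(it => {
  val feedback = it.filter(_ > 0).map(_ - 1)    // stream1: fed back into the iterate operator
  val output   = it.filter(_ > 0).map(_ => 'x') // stream2: emitted to downstream operators (the sink)
  (feedback, output)
}, 1000, keepPartitioning = false)              // without this timeout the finite source never terminates the loop

result.print()

With (_ + 1) in the feedback branch instead, the filter never drops anything, which gives the endless output described above.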

Hope that helps.

Best,
Xingcan

On Fri, Sep 1, 2017 at 5:29 PM, Peter Ertl  wrote:

> Hi folks,
>
> I was doing some experiments with DataStream#iterate and what felt strange
> to me is the fact that #iterate() does not terminate on its own when
> consuming a _finite_ stream.
>
> I think this is awkward and unexpected. The only thing that "helped" was
> setting an arbitrary and meaningless timeout on iterate.
>
> Imho this should not be necessary (maybe send an internal "poison message"
> down the iteration stream to signal shutdown of the streaming task?)
>
> example:
>
> // ---
>
> // does terminate by introducing a meaningless timeout
> // ---
> val iterationResult1 = env.generateSequence(1, 4).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x')) // dump 
> meaningless 'x' chars just to do anything
> }, 1000, keepPartitioning = false)
>
> iterationResult1.print()
>
> // ---
> // does NEVER terminate
> // ---
> val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump 
> meaningless 'y' chars just to do anything
> })
> iterationResult2.print()
>
>
> Can someone elaborate on this - should I file a ticket?
>
> Regards
> Peter
>


Re: dynamically partitioned stream

2017-09-01 Thread Tony Wei
Hi Martin, Aljoscha

I think Aljoscha is right. My original thought was to keep the state only
after a lambda function arrives.

Using Aljoscha's scenario as an example: initially, all data will be discarded
because there are no lambdas yet. When lambda f1 [D, E] and f2 [A, C] arrive,
A and C start being routed to machine "0" and D and E start being routed to
machine "1". Then, when we get a new lambda f3 [C, D], we can duplicate C and
D and route these copies to machine "2".

However, after reading your example again, I found that what you want is a whole
picture of all variables' state in a global view, so that no matter when a new
lambda comes it can always get its variables' state immediately. In that case,
I have the same opinion as Aljoscha.

Best,
Tony Wei

2017-09-01 23:59 GMT+08:00 Aljoscha Krettek :

> Hi Martin,
>
> I think with those requirements this is very hard (or maybe impossible) to
> do efficiently in a distributed setting. It might be that I'm
> misunderstanding things but let's look at an example. Assume that
> initially, we don't have any lambdas, so data can be sent to any machine
> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
> C]. Say this gets routed to machine "0", now this means that messages with
> key A and C also need to be routed to machine "0". Now, we get a new lambda
> f1 [D, E], say this gets routed to machine "2", meaning that messages with
> key D and E are also routed to machine "2".
>
> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
> lambdas and inputs to different machines? They all have to go to the same
> machine, but which one? I'm currently thinking that there would need to be
> some component that does the routing, but this has to be global, so it's
> hard to do in a distributed setting.
>
> What do you think?
>
> Best,
> Aljoscha
>
> On 1. Sep 2017, at 07:17, Martin Eden  wrote:
>
> This might be a way forward but since side inputs are not there I will try
> and key the control stream by the keys in the first co flat map.
>
> I'll see how it goes.
>
> Thanks guys,
> M
>
> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei  wrote:
>
>> Hi Martin,
>>
>> Yes, that is exactly what I thought.
>> But the first step also needs to be fulfilled  by SideInput. I'm not sure
>> how to achieve this in the current release.
>>
>> Best,
>> Tony Wei
>>
>> Martin Eden wrote on Thursday, August 31, 2017 at 11:32 PM:
>>
>>> Hi Aljoscha, Tony,
>>>
>>> Aljoscha:
>>> Yes it's the first option you mentioned.
>>> Yes, the stream has multiple values in flight for A, B, C. f1 needs to
>>> be applied each time a new value for either A, B or C comes in. So we need
>>> to use state to cache the latest values. So using the example data stream
>>> in my first msg the emitted stream should be:
>>>
>>> 1. Data Stream:
>>> KEY  VALUE  TIME
>>> .
>>> .
>>> .
>>> C    V6     6
>>> B    V6     6
>>> A    V5     5
>>> A    V4     4
>>> C    V3     3
>>> A    V3     3
>>> B    V3     3
>>> B    V2     2
>>> A    V1     1
>>>
>>> 2. Control Stream:
>>> Lambda  ArgumentKeys  TIME
>>> .
>>> .
>>> .
>>> f2      [A, C]        4
>>> f1      [A, B, C]     1
>>>
>>> 3. Expected emitted stream:
>>> TIME  VALUE
>>> .
>>> .
>>> .
>>> 6     f1(V5, V6, V3)
>>>       f1(V5, V6, V6)
>>>       f2(V5, V6)
>>> 5     f1(V5, V3, V3)
>>>       f2(V5, V3)
>>> 4     f1(V4, V3, V3)
>>>       f2(V4, V3)
>>> 3     f1(V3, V3, V3)
>>> 2     -
>>> 1     -
>>>
>>> So essentially, as soon as the argument list fills up, we apply the
>>> function/lambda for each newly arriving message in the data stream for any of
>>> its argument keys.
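
A partial Scala sketch of the "cache the latest values in keyed state" part described above; the ControlMsg and Emitted types and all names are illustrative assumptions, and it deliberately leaves open the routing question discussed later in the thread:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

case class ControlMsg(lambdaId: String, argumentKeys: Seq[String]) // hypothetical control-stream record
case class Emitted(lambdaId: String, value: String)                // hypothetical output record

// Used on streams keyed by the argument key (A, B, C, ...): flatMap1 caches the newest value
// for the current key; evaluating a lambda whose arguments live under other keys is the part
// that still needs the routing discussed in this thread.
class LatestValueCache extends RichCoFlatMapFunction[(String, String), ControlMsg, Emitted] {

  private lazy val latest: ValueState[String] =
    getRuntimeContext.getState(new ValueStateDescriptor[String]("latest", classOf[String]))

  override def flatMap1(record: (String, String), out: Collector[Emitted]): Unit = {
    latest.update(record._2) // remember the newest value for this argument key
    // ...look up the lambdas registered for this key and try to evaluate them...
  }

  override def flatMap2(control: ControlMsg, out: Collector[Emitted]): Unit = {
    // ...remember that control.lambdaId uses this key as one of its arguments...
  }
}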
>>>
>>> Tony:
>>> Yes we need to group by and pass to the lambda.
>>> Ok, so what you are proposing might work. So your solution assumes that
>>> we have to connect with the control stream twice? Once for the tagging and
>>> another time re-connecting the control stream with the tagged stream for
>>> the actual application of the function/lambda?
>>>
>>> Thanks,
>>> Alex
>>>
>>>
>>>
>>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi Martin,

 In your original example, what does this syntax mean exactly:

 f1  [A, B, C]  1

 Does it mean that f1 needs one A, one B and one C from the main stream?
 If yes, which ones, because there are multiple As and Bs and so on. Or does
 it mean that f1 can apply to an A or a B or a C? If it's the first, then I
 think it's quite hard to find a partitioning such that f1, f2, and all
 A, B, and C go to the same machine.

 Best,
 Aljoscha

 On 31. Aug 2017, at 15:53, Tony Wei  wrote:

 Hi Martin,

 So the problem is that you want to group those arguments in Data Stream
 and pass 

Re: part files written to HDFS with .pending extension

2017-09-01 Thread Krishnanand Khambadkone
 BTW, I am using a BucketingSink and a DateTimeBucketer. Do I need to set any
other property to move the files out of the .pending state?

BucketingSink sink = new BucketingSink("hdfs://localhost:8020/flinktwitter/");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
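
One thing worth checking here, assuming checkpointing is not enabled in the job: as far as I can tell, BucketingSink only moves part files from .pending to their final name once the checkpoint covering them completes, so a job running without checkpointing leaves the files in the .pending state. A one-line sketch (the interval is an arbitrary choice):

env.enableCheckpointing(60 * 1000) // e.g. checkpoint every 60 seconds; pending files are finalized on checkpoint completion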
On Friday, September 1, 2017, 5:03:46 PM PDT, Krishnanand Khambadkone 
 wrote:  
 
 Hi, I have written a small program that uses a Twitter input stream and an
HDFS output sink. When the files are written to HDFS, each part file in the
directory has a .pending extension. I am able to cat the files and see the
tweet text. Is it normal for the part files to have a .pending extension?

-rw-r--r--   3 user supergroup      46399 2017-09-01 16:35 /flinktwitter/2017-09-01--1635/_part-0-95.pending

-rw-r--r--   3 user supergroup      54861 2017-09-01 16:35 /flinktwitter/2017-09-01--1635/_part-0-96.pending

-rw-r--r--   3 user supergroup      41878 2017-09-01 16:35 /flinktwitter/2017-09-01--1635/_part-0-97.pending

-rw-r--r--   3 user supergroup      42813 2017-09-01 16:35 /flinktwitter/2017-09-01--1635/_part-0-98.pending

-rw-r--r--   3 user supergroup      42887 2017-09-01 16:35 /flinktwitter/2017-09-01--1635/_part-0-99.pending



part files written to HDFS with .pending extension

2017-09-01 Thread Krishnanand Khambadkone
Hi, I have written a small program that uses a Twitter input stream and an HDFS
output sink. When the files are written to HDFS, each part file in the
directory has a .pending extension. I am able to cat the files and see the
tweet text. Is it normal for the part files to have a .pending extension?

-rw-r--r--   3 user supergroup      46399 2017-09-01 16:35 /flinktwitter/2017-09-01--1635/_part-0-95.pending

-rw-r--r--   3 user supergroup      54861 2017-09-01 16:35 /flinktwitter/2017-09-01--1635/_part-0-96.pending

-rw-r--r--   3 user supergroup      41878 2017-09-01 16:35 /flinktwitter/2017-09-01--1635/_part-0-97.pending

-rw-r--r--   3 user supergroup      42813 2017-09-01 16:35 /flinktwitter/2017-09-01--1635/_part-0-98.pending

-rw-r--r--   3 user supergroup      42887 2017-09-01 16:35 /flinktwitter/2017-09-01--1635/_part-0-99.pending



Re: Error submitting flink job

2017-09-01 Thread Krishnanand Khambadkone
I had to restart the Flink process. That fixed the issue.

Sent from my iPhone

> On Sep 1, 2017, at 3:39 PM, ant burton  wrote:
> 
> Is this of any help
> 
> https://stackoverflow.com/questions/33890759/how-to-specify-overwrite-to-writeastext-in-apache-flink-streaming-0-10-0
> 
> fs.overwrite-files: true in your flink-conf.yaml
> 
> 
>> On 1 Sep 2017, at 23:36, Krishnanand Khambadkone  
>> wrote:
>> 
>> I am trying to submit a flink job from the command line and seeing this 
>> error.  Any idea what could be happening?
>> 
>> java.io.IOException: File or directory already exists. Existing files and 
>> directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to 
>> overwrite existing files and directories.
>> 
> 


Re: Re: Error submitting flink job

2017-09-01 Thread Krishnanand Khambadkone
 I have set this in my flink-conf.yaml file.
On Friday, September 1, 2017, 3:39:05 PM PDT, ant burton 
 wrote:  
 
 Is this of any help
https://stackoverflow.com/questions/33890759/how-to-specify-overwrite-to-writeastext-in-apache-flink-streaming-0-10-0
fs.overwrite-files: true in your flink-conf.yaml


On 1 Sep 2017, at 23:36, Krishnanand Khambadkone  wrote:
I am trying to submit a flink job from the command line and seeing this error.  
Any idea what could be happening?
java.io.IOException: File or directory already exists. Existing files and 
directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to 
overwrite existing files and directories.




Re: Error submitting flink job

2017-09-01 Thread ant burton
Is this of any help

https://stackoverflow.com/questions/33890759/how-to-specify-overwrite-to-writeastext-in-apache-flink-streaming-0-10-0
 


fs.overwrite-files: true in your flink-conf.yaml


> On 1 Sep 2017, at 23:36, Krishnanand Khambadkone  
> wrote:
> 
> I am trying to submit a flink job from the command line and seeing this 
> error.  Any idea what could be happening?
> 
> java.io.IOException: File or directory already exists. Existing files and 
> directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to 
> overwrite existing files and directories.
> 



Error submitting flink job

2017-09-01 Thread Krishnanand Khambadkone
I am trying to submit a flink job from the command line and seeing this error.  
Any idea what could be happening?

java.io.IOException: File or directory already exists. Existing files and 
directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to 
overwrite existing files and directories.



Re: [EXTERNAL] Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled

2017-09-01 Thread Felix Cheung
Yep, I was able to get this to work with a custom bucketer.

A custom bucketer can use the clock given ("processing time") or it can use a 
timestamp from the data ("event time") for the bucketing path.
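
A sketch of the event-time variant, assuming the Flink 1.3 Bucketer interface from flink-connector-filesystem and a hypothetical Event type that carries its own timestamp:

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.fs.Path
import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer

case class Event(timestamp: Long, payload: String) // hypothetical element type

// Buckets by a timestamp carried in the element itself ("event time") instead of the given clock.
class EventTimeBucketer extends Bucketer[Event] {
  @transient private lazy val fmt = new SimpleDateFormat("yyyy-MM-dd--HH")

  override def getBucketPath(clock: Clock, basePath: Path, element: Event): Path =
    new Path(basePath, fmt.format(new Date(element.timestamp)))
}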


From: Raja.Aravapalli 
Sent: Friday, September 1, 2017 10:21:00 AM
To: Aljoscha Krettek; Piotr Nowojski
Cc: user@flink.apache.org
Subject: Re: [EXTERNAL] Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled


Thanks Aljoscha for the inputs.

I will look into extending the “BasePathBucketer” class.


Regards,
Raja.

From: Aljoscha Krettek 
Date: Friday, September 1, 2017 at 10:27 AM
To: Piotr Nowojski 
Cc: Raja Aravapalli , "user@flink.apache.org" 

Subject: [EXTERNAL] Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled

Hi Raja,

I think you can in fact do this by implementing a custom Bucketer. You can have 
a look at BasePathBucketer and extend that to include the timestamp in the path 
that is returned. You should probably clamp the timestamp so that you don't get 
a new path for every millisecond.

Best,
Aljoscha

On 1. Sep 2017, at 08:18, Piotr Nowojski wrote:

Hi,

BucketingSink doesn’t support the feature that you are requesting; you cannot specify a dynamically generated prefix/suffix.

Piotrek

On Aug 31, 2017, at 7:12 PM, Raja.Aravapalli wrote:


Hi,

I have a Flink application that is streaming data into HDFS, and I am using the
Bucketing Sink for that. I want to know if it is possible to rename the part
files that are being created in the base HDFS directory.

Right now I am using the code below to include the timestamp in the part-file
name, but the problem I am facing is that the timestamp does not change for
each new part file that is rolled!


BucketingSink HdfsSink = new BucketingSink(hdfsOutputPath);

HdfsSink.setBucketer(new BasePathBucketer());
HdfsSink.setBatchSize(1024 * 1024 * hdfsOutputBatchSizeInMB); // this means 'hdfsOutputBatchSizeInMB' MB
HdfsSink.setPartPrefix("PART-FILE-" + Long.toString(System.currentTimeMillis()));


Can someone please suggest what code changes I can make so that I get a new
timestamp for every new part file that is rolled?


Thanks a lot.

Regards,
Raja.




Re: [EXTERNAL] Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled

2017-09-01 Thread Raja.Aravapalli

Thanks Aljoscha for the inputs.

I will look into extending the “BasePathBucketer” class.


Regards,
Raja.

From: Aljoscha Krettek 
Date: Friday, September 1, 2017 at 10:27 AM
To: Piotr Nowojski 
Cc: Raja Aravapalli , "user@flink.apache.org" 

Subject: [EXTERNAL] Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled

Hi Raja,

I think you can in fact do this by implementing a custom Bucketer. You can have 
a look at BasePathBucketer and extend that to include the timestamp in the path 
that is returned. You should probably clamp the timestamp so that you don't get 
a new path for every millisecond.

Best,
Aljoscha

On 1. Sep 2017, at 08:18, Piotr Nowojski wrote:

Hi,

BucketingSink doesn’t support the feature that you are requesting; you cannot specify a dynamically generated prefix/suffix.

Piotrek

On Aug 31, 2017, at 7:12 PM, Raja.Aravapalli wrote:


Hi,

I have a Flink application that is streaming data into HDFS, and I am using the
Bucketing Sink for that. I want to know if it is possible to rename the part
files that are being created in the base HDFS directory.

Right now I am using the code below to include the timestamp in the part-file
name, but the problem I am facing is that the timestamp does not change for
each new part file that is rolled!


BucketingSink HdfsSink = new BucketingSink(hdfsOutputPath);

HdfsSink.setBucketer(new BasePathBucketer());
HdfsSink.setBatchSize(1024 * 1024 * hdfsOutputBatchSizeInMB); // this means 'hdfsOutputBatchSizeInMB' MB
HdfsSink.setPartPrefix("PART-FILE-" + Long.toString(System.currentTimeMillis()));


Can someone please suggest what code changes I can make so that I get a new
timestamp for every new part file that is rolled?


Thanks a lot.

Regards,
Raja.




Re: dynamically partitioned stream

2017-09-01 Thread Aljoscha Krettek
Hi Martin,

I think with those requirements this is very hard (or maybe impossible) to do 
efficiently in a distributed setting. It might be that I'm misunderstanding 
things but let's look at an example. Assume that initially, we don't have any 
lambdas, so data can be sent to any machine because it doesn't matter where 
they go. Now, we get a new lambda f2 [A, C]. Say this gets routed to machine 
"0", now this means that messages with key A and C also need to be router to 
machine "0". Now, we get a new lambda f1 [D, E], say this gets routed to 
machine "2", meaning that messages with key D and E are also routed to machine 
"2".

Then, we get a new lambda f3 [C, D]. Do we now re-route all previous lambdas 
and inputs to different machines? They all have to go to the same machine, but 
which one? I'm currently thinking that there would need to be some component 
that does the routing, but this has to be global, so it's hard to do in a 
distributed setting.

What do you think?

Best,
Aljoscha

> On 1. Sep 2017, at 07:17, Martin Eden  wrote:
> 
> This might be a way forward but since side inputs are not there I will try 
> and key the control stream by the keys in the first co flat map.
> 
> I'll see how it goes.
> 
> Thanks guys,
> M
> 
> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei wrote:
> Hi Martin,
> 
> Yes, that is exactly what I thought. 
> But the first step also needs to be fulfilled  by SideInput. I'm not sure how 
> to achieve this in the current release. 
> 
> Best,
> Tony Wei
> 
> Martin Eden wrote on Thursday, August 31, 2017 at 11:32 PM:
> Hi Aljoscha, Tony,
> 
> Aljoscha: 
> Yes it's the first option you mentioned.
> Yes, the stream has multiple values in flight for A, B, C. f1 needs to be 
> applied each time a new value for either A, B or C comes in. So we need to 
> use state to cache the latest values. So using the example data stream in my 
> first msg the emitted stream should be:
> 
> 1. Data Stream:
> KEY  VALUE  TIME
> .
> .
> .
> C    V6     6
> B    V6     6
> A    V5     5
> A    V4     4
> C    V3     3
> A    V3     3
> B    V3     3
> B    V2     2
> A    V1     1
> 
> 2. Control Stream:
> Lambda  ArgumentKeys  TIME
> .
> .
> .
> f2      [A, C]        4
> f1      [A, B, C]     1
> 
> 3. Expected emitted stream:
> TIME  VALUE
> .
> .
> .
> 6     f1(V5, V6, V3)
>       f1(V5, V6, V6)
>       f2(V5, V6)
> 5     f1(V5, V3, V3)
>       f2(V5, V3)
> 4     f1(V4, V3, V3)
>       f2(V4, V3)
> 3     f1(V3, V3, V3)
> 2     -
> 1     -
> 
> So essentially, as soon as the argument list fills up, we apply the
> function/lambda for each newly arriving message in the data stream for any of
> its argument keys.
> 
> Tony:
> Yes we need to group by and pass to the lambda.
> Ok, so what you are proposing might work. So your solution assumes that we 
> have to connect with the control stream twice? Once for the tagging and 
> another time re-connecting the control stream with the tagged stream for the 
> actual application of the function/lambda?
> 
> Thanks,
> Alex
> 
> 
> 
> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek wrote:
> Hi Martin,
> 
> In your original example, what does this syntax mean exactly:
> 
> f1  [A, B, C]  1
> 
> Does it mean that f1 needs one A, one B and one C from the main stream? If 
> yes, which ones, because there are multiple As and Bs and so on. Or does it 
> mean that f1 can apply to an A or a B or a C? If it's the first, then I think 
> it's quite hard to find a partitioning such that f1, f2, and all A, B, 
> and C go to the same machine.
> 
> Best,
> Aljoscha
> 
>> On 31. Aug 2017, at 15:53, Tony Wei wrote:
>> 
>> Hi Martin,
>> 
>> So the problem is that you want to group those arguments in Data Stream and 
>> pass them to the lambda function from Control Stream at the same time. Am I 
>> right?
>> 
>> If so, you could give each lambda function an id as well, and use these ids
>> to tag the arguments to which they belong.
>> After that, the keyBy function can be used to group the arguments belonging
>> to the same lambda function. Joining this stream with the Control Stream by
>> function id would then bring the arguments and the function to the same instance.
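
A rough Scala sketch of the wiring Tony describes above; the case classes are illustrative, and producing the Tagged records in the first place is exactly the tagging step that still needs access to the control stream:

import org.apache.flink.streaming.api.scala._

case class Lambda(id: String, argumentKeys: Seq[String])                 // hypothetical control record
case class Tagged(lambdaId: String, argumentKey: String, value: String)  // data record tagged with a lambda id

// Keying both streams by the lambda id brings a lambda and all of its tagged argument
// values to the same parallel instance, where a CoFlatMapFunction can cache the latest
// value per argument key and evaluate the lambda once every argument has been seen.
def wire(tagged: DataStream[Tagged], control: DataStream[Lambda]): ConnectedStreams[Tagged, Lambda] =
  tagged.keyBy(_.lambdaId).connect(control.keyBy(_.id))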
>> 
>> What do you think? Could this solution solve your problem?
>> 
>> Best,
>> Tony Wei
>> 
>> 2017-08-31 20:43 GMT+08:00 Martin Eden:
>> Thanks for your reply Tony,
>> 
>> Yes we are in the latter case, where the functions/lambdas come in the 
>> control stream. Think of them as strings containing the logic of the 
>> function. The values for each of the arguments 

Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled

2017-09-01 Thread Aljoscha Krettek
Hi Raja,

I think you can in fact do this by implementing a custom Bucketer. You can have 
a look at BasePathBucketer and extend that to include the timestamp in the path 
that is returned. You should probably clamp the timestamp so that you don't get 
a new path for every millisecond.
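
A minimal sketch of that suggestion, assuming the Flink 1.3 Bucketer interface from flink-connector-filesystem; the class name and the one-hour granularity are arbitrary:

import org.apache.hadoop.fs.Path
import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer

// Like BasePathBucketer, but appends the processing-time timestamp clamped to a fixed
// granularity, so a new sub-directory is created per hour rather than per millisecond.
class ClampedTimestampBucketer[T](granularityMs: Long = 60 * 60 * 1000L) extends Bucketer[T] {
  override def getBucketPath(clock: Clock, basePath: Path, element: T): Path = {
    val clamped = (clock.currentTimeMillis() / granularityMs) * granularityMs
    new Path(basePath, clamped.toString)
  }
}

HdfsSink.setBucketer(new ClampedTimestampBucketer()) would then take the place of the BasePathBucketer plus the timestamped part prefix in the code quoted below.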

Best,
Aljoscha

> On 1. Sep 2017, at 08:18, Piotr Nowojski  wrote:
> 
> Hi,
> 
> BucketingSink doesn’t support the feature that you are requesting; you cannot specify a dynamically generated prefix/suffix.
> 
> Piotrek
> 
>> On Aug 31, 2017, at 7:12 PM, Raja.Aravapalli wrote:
>> 
>>  
>> Hi,
>>  
>> I have a Flink application that is streaming data into HDFS, and I am using the
>> Bucketing Sink for that. I want to know if it is possible to rename the part
>> files that are being created in the base HDFS directory.
>>  
>> Right now I am using the code below to include the timestamp in the part-file
>> name, but the problem I am facing is that the timestamp does not change for
>> each new part file that is rolled!
>>  
>>  
>> BucketingSink HdfsSink = new BucketingSink(hdfsOutputPath);
>> 
>> HdfsSink.setBucketer(new BasePathBucketer());
>> HdfsSink.setBatchSize(1024 * 1024 * hdfsOutputBatchSizeInMB); // this means 'hdfsOutputBatchSizeInMB' MB
>> HdfsSink.setPartPrefix("PART-FILE-" + Long.toString(System.currentTimeMillis()));
>>  
>>  
>> Can someone please suggest what code changes I can make so that I get a new
>> timestamp for every new part file that is rolled?
>>  
>>  
>> Thanks a lot. 
>>  
>> Regards,
>> Raja.
> 



Re: Operator variables in memory scoped by key

2017-09-01 Thread gerardg
Thanks Aljoscha,

So I can think of three possible solutions:

* Use an instance dictionary to store the trie tree, scoped by the same key as
the KeyedStream. That should work if the lifetime of the operator object is
tied to the keys that it processes.
* Store the trie tree in a ValueState but somehow tell flink to not
checkpoint this state. Also, it would be better if this could be in a
MemoryStateBackend and the rest of the application state in a
RocksDBStateBackend but I think backends can't be mixed in the same job.
* Forget about the MapState and just store the tree in a ValueState and
somehow control how it is checkpointed so I can manually implement
incremental checkpoints. This seems kind of complex as I would need to keep
track of the insertions and deletions to the tree from the last checkpoint.

The first one is the most straightforward, but I'm not sure if it is really
feasible without knowing Flink internals. Also, if I rely on how Flink
internally manages objects and at some point it changes, it could introduce
a bug that is difficult to detect.
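
For what it's worth, a minimal Scala sketch of option 1 with a placeholder Trie class; the map lives in plain operator memory, so it is not checkpointed and has to be rebuilt after a restore, and it relies on keyBy sending all records for a key to the same parallel instance:

import scala.collection.mutable
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class Trie extends Serializable { def insert(word: String): Unit = () } // placeholder

class TrieFunction extends RichFlatMapFunction[(String, String), String] {
  @transient private var tries: mutable.Map[String, Trie] = _

  override def open(parameters: Configuration): Unit = {
    tries = mutable.Map.empty // rebuilt empty on every (re)start
  }

  override def flatMap(in: (String, String), out: Collector[String]): Unit = {
    val trie = tries.getOrElseUpdate(in._1, new Trie) // manually scoped by the record's key
    trie.insert(in._2)
    // ...emit whatever is derived from the trie...
  }
}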

Could you provide some insight? 

Thanks,

Gerard
  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


termination of stream#iterate on finite streams

2017-09-01 Thread Peter Ertl
Hi folks,

I was doing some experiments with DataStream#iterate and what felt strange to 
me is the fact that #iterate() does not terminate on its own when consuming a 
_finite_ stream.

I think this is awkward and unexpected. The only thing that "helped" was setting an 
arbitrary and meaningless timeout on iterate.

Imho this should not be necessary (maybe send an internal "poison message" down the iteration stream to signal shutdown of the streaming task?)

example:

// ---
// does terminate by introducing a meaningless timeout
// ---
val iterationResult1 = env.generateSequence(1, 4).iterate(it => {
  (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x')) // dump 
meaningless 'x' chars just to do anything
}, 1000, keepPartitioning = false)

iterationResult1.print()

// ---
// does NEVER terminate
// ---
val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
  (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump 
meaningless 'y' chars just to do anything
})
iterationResult2.print()

Can someone elaborate on this - should I file a ticket?

Regards
Peter

Re: Sink -> Source

2017-09-01 Thread Nico Kruber
Hi Philipp,
afaik, Flink doesn't offer this out-of-the-box. You could either hack 
something as suggested or use Kafka to glue different jobs together.

Both may affect exactly-once/at-least-once guarantees, however. Also refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/guarantees.html
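
A sketch of the Kafka approach, assuming the Flink 1.3 Kafka 0.10 connector; the broker address, topic name and group id are placeholders:

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Job A publishes the shared intermediate result to a topic...
def publishShared(shared: DataStream[String]): Unit =
  shared.addSink(new FlinkKafkaProducer010[String]("broker:9092", "shared-steps", new SimpleStringSchema()))

// ...and job B consumes the same topic as its source.
def consumeShared(env: StreamExecutionEnvironment): DataStream[String] = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "broker:9092")
  props.setProperty("group.id", "downstream-job")
  env.addSource(new FlinkKafkaConsumer010[String]("shared-steps", new SimpleStringSchema(), props))
}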


Nico

On Thursday, 31 August 2017 16:06:55 CEST Philip Doctor wrote:
> I have a few Flink jobs.  Several of them share the same code.  I was
> wondering if I could make those shared steps their own job and then specify
> that the sink for one process was the source for another process, stitching
> my jobs together.  Is this possible? I didn’t see it in the docs. It
> feels like I could possibly hack something together with writeToSocket() on
> my data stream and then create a source that reads from a socket, but I was
> hoping there was a more fully baked solution to this.
 
> Thanks for your time.





Re: Flink session on Yarn - ClassNotFoundException

2017-09-01 Thread Albert Giménez
Thanks for the replies :)

I managed to get it working following the instructions here, but I found a few
issues that I guess were specific to HDInsight, or at least to the HDP version
it uses. Trying to summarize:

Hadoop version 
After running “hadoop version”, the result was “2.7.3.2.6.1.3-4”.
However, when building I was getting errors that some dependencies from the 
Hortonworks repo were not found, for instance zookeeper “3.4.6.2.6.1.3-4”.
I browsed to the Hortonworks repo to find a suitable version, so I ended up using 2.7.3.2.6.1.31-3 instead.

Scala version
I also had issues with dependencies if I tried using Scala version 2.11.11, so 
I compiled against 2.11.7.

So, the maven command I used was this:
mvn install -DskipTests -Dscala.version=2.11.7 -Pvendor-repos 
-Dhadoop.version=2.7.3.2.6.1.31-3

Azure Jars
With all that, I still had class not found errors when trying to start 
my Flink session, for instance "java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.adl.HdiAdlFileSystem”.
To fix that, I had to find the azure-specific jars I needed to use. For that, I 
checked which jars Spark was using and copied / symlinked them into Flink’s 
“lib” directory:
/usr/hdp/current/spark2-client/jars/*azure*.jar
/usr/lib/hdinsight-datalake/adls2-oauth2-token-provider.jar

Guava Jar
Finally, for some reason my jobs were failing (the Cassandra driver was 
complaining about the Guava version being too old, although I had the right 
version in my assembled jar).
I just downloaded the version I needed (in my case, 23.0) and also put that into Flink’s lib directory.

I hope it helps other people trying to run Flink on Azure HDInsight :)

Kind regards,

Albert

> On Aug 31, 2017, at 8:18 PM, Banias H  wrote:
> 
> We had the same issue. Get the HDP version from 
> /usr/hdp/current/hadoop-client/hadoop-common-.jar, for example. Then 
> rebuild Flink from source:
> mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=
> 
> for example: mvn clean install -DskipTests -Pvendor-repos 
> -Dhadoop.version=2.7.3.2.6.1.0-129
> 
> Copy and set up build-target/ on the cluster. Export HADOOP_CONF_DIR and 
> YARN_CONF_DIR according to your env. You should have no problem starting the 
> session.
> 
> 
> On Wed, Aug 30, 2017 at 6:45 AM, Federico D'Ambrosio wrote:
> Hi,
> What is your "hadoop version" output? I'm asking because you said your hadoop 
> distribution is in /usr/hdp so it looks like you're using Hortonworks HDP, 
> just like myself. So, this would be a third party distribution and you'd need 
> to build Flink from source according to this: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/building.html#vendor-specific-versions
>  
> 
> 
> Federico D'Ambrosio
> 
> On 30 Aug 2017 at 13:33, "albert" wrote:
> Hi Chesnay,
> 
> Thanks for your reply. I did download the binaries matching my Hadoop
> version (2.7), that's why I was wondering if the issue had something to do
> with the exact hadoop version flink is compiled against, or if there might be 
> things that are missing in my environment.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 
> 



Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled

2017-09-01 Thread Piotr Nowojski
Hi,

BucketingSink doesn’t support the feature that you are requesting; you cannot specify a dynamically generated prefix/suffix.

Piotrek

> On Aug 31, 2017, at 7:12 PM, Raja.Aravapalli  
> wrote:
> 
>  
> Hi,
>  
> I have a flink application that is streaming data into HDFS and I am using 
> Bucketing Sink for that. And, I want to know if is it possible to rename the 
> part files that is being created in the base hdfs directory.
>  
> Right now I am using the below code for including the timestamp into 
> part-file name, but the problem I am facing is the timestamp is not changing 
> for the new part file that is being rolled!
>  
>  
> BucketingSink HdfsSink = new BucketingSink (hdfsOutputPath);
> 
> HdfsSink.setBucketer(new BasePathBucketer());
> HdfsSink.setBatchSize(1024 * 1024 * hdfsOutputBatchSizeInMB); // this means 
> 'hdfsOutputBatchSizeInMB' MB
> HdfsSink.setPartPrefix("PART-FILE-" + 
> Long.toString(System.currentTimeMillis()));
>  
>  
> Can someone please suggest me, what code changes I can try so that I get a 
> new timestamp for every part file that is being rolled new?
>  
>  
> Thanks a lot. 
>  
> Regards,
> Raja.