can we do Flink CEP on event stream or batch or both?

2019-04-29 Thread kant kodali
Hi All,

I have the following questions.

1) Can we do Flink CEP on an event stream, on batch data, or both?
2) If we can do streaming, I wonder how long we can keep the stream
stateful? Has anyone successfully done stateful streaming for days or
months (with or without CEP)? Or is stateful streaming mainly meant to keep
state for only a few hours?

I have a use case where events are ingested from multiple sources. In
theory the sources are supposed to have the same events, but in practice
they will not, so when the events are ingested from multiple sources the
goal is to detect where the "breaks" are (meaning the missing events, i.e.
events that exist in one source but not in the other). I realize this is a
typical case for CEP.

Also, in this particular use case, events that were supposed to arrive 2
years ago can arrive today, and if so, those events also need to be
processed in real time or near real time. Sure, there wouldn't be a lot of
events that were missed 2 years ago, but there will be a few. What would be
the best approach?

One solution I can think of is to do stateful CEP with a window of one day
(or whatever short time period most events will fall into), collect the
events that fall beyond that time period (the late ones) into some Kafka
topic, and have a separate stream analyze the time period of the late ones,
construct the corresponding NFA, and run them through it again. Please let me
know how this sounds or if there is a better way to do it.
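To make that idea concrete, here is a very rough sketch of what I have in mind (the Event
type, its accessors, and the pattern itself are placeholders, not a tested implementation):

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

// split off events that arrive behind the watermark; they would go to a separate Kafka
// topic and be re-run through the NFA later
SingleOutputStreamOperator<Event> inTime = events.process(new ProcessFunction<Event, Event>() {
    @Override
    public void processElement(Event e, Context ctx, Collector<Event> out) {
        if (e.getTimestamp() < ctx.timerService().currentWatermark()) {
            ctx.output(lateTag, e);
        } else {
            out.collect(e);
        }
    }
});
DataStream<Event> late = inTime.getSideOutput(lateTag);

// placeholder pattern: an event from source A followed by its copy from source B within
// one day; timed-out partial matches would be the "breaks"
Pattern<Event, ?> pattern = Pattern.<Event>begin("fromA")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) { return "A".equals(e.getSource()); }
        })
        .followedBy("fromB")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) { return "B".equals(e.getSource()); }
        })
        .within(Time.days(1));

PatternStream<Event> matches = CEP.pattern(inTime.keyBy(Event::getId), pattern);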

Thanks!


Re: How to verify what maxParallelism is set to?

2019-04-29 Thread Sean Bollin
Thanks! Do you know if it's possible somehow to verify the global
maxParallelism other than calling .getMaxParallelism? Either through
an API call or the UI?

On Mon, Apr 29, 2019 at 8:12 PM Guowei Ma  wrote:
>
> Hi,
> StreamExecutionEnvironment is used to set a global default maxParallelism. 
> If an operator's maxParallelism is -1, the operator will be given the 
> maxParallelism that is set on the StreamExecutionEnvironment.
>
> >>>Any API or way I can verify?
> I can't find any easy way to do that. But you could get the StreamGraph 
> via StreamExecutionEnvironment.getStreamGraph and try to print 
> StreamNode::maxParallelism.
> Best,
> Guowei
>
>
> Sean Bollin  于2019年4月30日周二 上午8:19写道:
>>
>> Hi all,
>>
>> How do you verify what max parallelism is set to on the job level? I do not 
>> see it in the 1.6 UI, for example.
>>
>> I’m setting maxParallelism to 4096 on the StreamExecutionEnvironment before 
>> execution but printing out the maxParallelism in an operator still displays 
>> -1. Since this is such an important setting I’d like some sanity check to 
>> verify it is the value I expect.
>>
>> Any API or way I can verify?


Re: How to verify what maxParallelism is set to?

2019-04-29 Thread Guowei Ma
Hi,
StreamExecutionEnvironment is used to set a global default maxParallelism.
If an operator's maxParallelism is -1, the operator will be given the
maxParallelism that is set on the StreamExecutionEnvironment.

>>>Any API or way I can verify?
I can't find any easy way to do that. But you could get the StreamGraph
via StreamExecutionEnvironment.getStreamGraph and try to print
StreamNode::maxParallelism.
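For example, a rough sketch of dumping it before execution (getStreamNodes()/
getOperatorName()/getMaxParallelism() are my reading of the 1.x StreamGraph API, so
please double-check against your Flink version):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamNode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(4096);

// ... build the job topology here ...

for (StreamNode node : env.getStreamGraph().getStreamNodes()) {
    System.out.println(node.getOperatorName() + " -> maxParallelism=" + node.getMaxParallelism());
}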
Best,
Guowei


Sean Bollin  于2019年4月30日周二 上午8:19写道:

> Hi all,
>
> How do you verify what max parallelism is set to on the job level? I do
> not see it in the 1.6 UI, for example.
>
> I’m setting maxParallelism to 4096 on the StreamExecutionEnvironment
> before execution but printing out the maxParallelism in an operator still
> displays -1. Since this is such an important setting I’d like some sanity
> check to verify it is the value I expect.
>
> Any API or way I can verify?


Re: POJO with private fields and toApeendStream of StreamTableEnvironment

2019-04-29 Thread Sung Gon Yi
Sorry. I sent an empty reply.

I tried again with getter/setter. And it works. Thanks.

—
import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

@Getter @Setter
public class P implements Serializable {
private String name;
private Integer value;
}
—
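For completeness, a rough sketch of the Row-based alternative Timo describes below
(assuming the field order of the table schema, name then value):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

DataStream<P> stream = tEnv.toAppendStream(aa, Row.class)
        .map(new MapFunction<Row, P>() {
            @Override
            public P map(Row row) {
                P p = new P();
                p.setName((String) row.getField(0));   // column 0 of the table schema
                p.setValue((Integer) row.getField(1)); // column 1 of the table schema
                return p;
            }
        });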



> On 29 Apr 2019, at 11:12 PM, Timo Walther  wrote:
> 
> Hi Sung,
> 
> private fields are only supported if you specify getters and setters 
> accordingly. Otherwise you need to use `Row.class` and perform the mapping in 
> a subsequent map() function manually via reflection.
> 
> Regards,
> Timo
> 
> 
> Am 29.04.19 um 15:44 schrieb Sung Gon Yi:
>> In 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>>  
>> ,
>> POJO data type is available to convert to DataStream.
>> 
>> I would like to use a POJO data type class with private fields. I wonder whether it is 
>> officially possible or not. 
>> Anyway, currently it does not work.
>> 
>> Codes:
>> —
>> CsvTableSource as = CsvTableSource.builder()
>> .path("aa.csv")
>> .field("name", STRING)
>> .field("value", INT)
>> .build();
>> Table aa = tEnv.fromTableSource(as);
>> tEnv.toAppendStream(aa, P.class);
>> —
>> public class P implements Serializable {
>> private String name;
>> private Integer value;
>> }
>> —
>> 
>> Above codes, I got below error message:
>> ==
>> Exception in thread "main" org.apache.flink.table.api.TableException: Arity 
>> [2] of result [ArrayBuffer(String, Integer)] does not match the number[1] of 
>> requested type [GenericType].
>>  at 
>> org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:1165)
>>  at 
>> org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:423)
>>  at 
>> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:936)
>>  at 
>> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:866)
>>  at 
>> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:202)
>>  at 
>> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:156)
>>  at ...
>> ==
>> 
>> When fields of class P are changed to “public”, it works well.
>> —
>> public class P implements Serializable {
>> public String name;
>> public Integer value;
>> }
>> —
>> 
>> Thanks,
>> skonmeme
>> 
>> 
>> 
> 




Timestamp and key preservation over operators

2019-04-29 Thread Averell
Hello,

I extracted timestamps using BoundedOutOfOrdernessTimestampExtractor from my
sources, have a WindowFunction, and found that my timestamps have been lost.
To do another window operation, I need to extract timestamps again. I tried
to find documentation for that but haven't found any.
Could you please tell me which types of operators preserve records'
timestamps? 

The same question for keyed streams. I have been using the same key
throughout my flow, but with many transformations (using different operators,
including coProcessFunction, and converting my data between different
classes), and I have been trying to use
DataStreamUtils.reinterpretAsKeyedStream. Is it safe to assume that as long
as I don't transform the key, I can use that
reinterpretAsKeyedStream function?
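For reference, the usage I have in mind is roughly the following sketch (the Record type
and its getId() key are illustrative; the key selector must produce exactly the same keys
as the original keyBy):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

static KeyedStream<Record, String> rekey(DataStream<Record> transformed) {
    // only safe if every record still carries exactly the key it was originally keyed by
    return DataStreamUtils.reinterpretAsKeyedStream(transformed, Record::getId);
}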

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-04-29 Thread M Singh
Hi An0:
Here is my understanding - each operator has a watermark which is the lowest 
of all its input streams' watermarks. When the watermark of an input stream is updated, the 
lowest one becomes the new watermark for that operator and is forwarded to the 
output streams of that operator.  So, if one of the streams' watermarks is 
not updated, it might keep the operator's watermark from moving forward, thereby 
affecting the watermark emitted to the following operators.

Here is the description for how watermarks work - 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams
Hope that helps.
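For reference, a minimal sketch of assigning timestamps/watermarks before the keyBy,
which is the placement discussed below (the Trip type and its getters are illustrative):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

static DataStream<Trip> withEventTime(DataStream<Trip> trips) {
    return trips.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(Trip trip) {
                    return trip.getEventTime(); // epoch millis; hypothetical getter
                }
            });
}
// later: withEventTime(trips).keyBy(Trip::getId).timeWindow(Time.minutes(5))...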







On Monday, April 29, 2019, 2:06:12 PM EDT, an0  wrote:  
 
 Thanks very much. It definitely explains the problem I'm seeing. However, 
something I need to confirm:
You say "Watermarks are broadcasted/forwarded anyway." Do you mean, in 
assingTimestampsAndWatermarks.keyBy.window, it doesn't matter what data flows 
through a specific key's stream, all key streams have the same watermarks? So 
time-wise, `window` behaves as if `keyBy` is not there at all?

On 2019/04/26 06:34:10, Dawid Wysakowicz  wrote: 
> Hi,
> 
> Watermarks are meta events that travel independently of data events.
> 
> 1) If you assignTimestampsAndWatermarks before keyBy, all parallel
> instances of trips have some data (this is my assumption) so Watermarks
> can be generated. Afterwards even if some of the keyed partitions have
> no data, Watermarks are broadcasted/forwarded anyway. In other words if
> at some point Watermarks were generated for all partitions of a single
> stage, they will be forwarded beyond this point.
> 
> 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign
> watermarks for an empty partition which produces no Watermarks at all
> for this partition, therefore there is no progress beyond this point.
> 
> I hope this clarifies it a bit.
> 
> Best,
> 
> Dawid
> 
> On 25/04/2019 16:49, an0 wrote:
> > If my understanding is correct, then why `assignTimestampsAndWatermarks` 
> > before `keyBy` works? The `timeWindowAll` stream's input streams are task 1 
> > and task 2, with task 2 idling, no matter whether 
> > `assignTimestampsAndWatermarks` is before or after `keyBy`, because whether 
> > task 2 receives elements only depends on the key distribution, has nothing 
> > to do with timestamp assignment, right?
> >
> >                                                      /key 1 trips\
> > (A) trips--> assignTimestampsAndWatermarks--> keyBy                timeWindowAll
> >                                                      \key 2 trips/
> >                                                         (idle)
> >
> >                    /key 1 trips--> assignTimestampsAndWatermarks\
> > (B) trips--> keyBy                                                timeWindowAll
> >                    \key 2 trips--> assignTimestampsAndWatermarks/
> >                       (idle)
> >
> > How things are different between A and B from `timeWindowAll`'s perspective?
> >
> > BTW, thanks for the webinar link, I'll check it later.
> >
> > On 2019/04/25 08:30:20, Dawid Wysakowicz  wrote: 
> >> Hi,
> >>
> >> Yes I think your explanation is correct. I can also recommend Seth's
> >> webinar where he talks about debugging Watermarks[1]
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> [1]
> >> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> >>
> >> On 22/04/2019 22:55, an0 wrote:
> >>> Thanks, I feel I'm getting closer to the truth. 
> >>>
> >>> So parallelism is the cause? Say my parallelism is 2. Does that mean I 
> >>> get 2 tasks running after `keyBy` if even all elements have the same key 
> >>> so go to 1 down stream(say task 1)? And it is the other task(task 2) with 
> >>> no incoming data that caused the `timeWindowAll` stream unable to 
> >>> progress? Because both task 1 and task 2 are its input streams and one is 
> >>> idling so its event time cannot make progress?
> >>>
> >>> On 2019/04/22 01:57:39, Guowei Ma  wrote: 
>  HI,
> 
>  BoundedOutOfOrdernessTimestampExtractors can send a WM at least after it
>  receives an element.
> 
>  For after Keyby:
>  Flink uses the HashCode of key and the parallelism of down stream to 
>  decide
>  which subtask would receive the element. This means if your key is always
>  same, all the sources will only send the elements to the same down stream
>  task, for example only no. 3 

Re: Flink session window not progressing

2019-04-29 Thread Konstantin Knauf
Hi Henrik,

yes, the output count of a sink (and the input count of sources) is always
zero, because only Flink internal traffic is reflected in these metrics.
There is a Jira issue to change this [1].

Cheers,

Konstantin

[1] https://issues.apache.org/jira/browse/FLINK-7286



On Mon, Apr 29, 2019 at 7:29 PM Henrik Feldt  wrote:

> Thinking more about this; it might just be me who is reacting to the sink
> having a zero rate of output. In fact, I have about two gigs of messages
> left in the queue until it's up to date, so I may just be running a slow
> calculation (because I've run a batch job to backfill to after stream).
> Perhaps something is broken about sink output counts?
>
> On 29 Apr 2019, at 19:26, Henrik Feldt wrote:
>
> Hi guys,
>
> I'm doing a PoC with Flink and I was wondering if you could help me.
>
> I've asked a question here
> https://stackoverflow.com/questions/55907954/flink-session-window-sink-timestamp-not-progressing
> with some images. However, in summary my question is this; why doesn't my
> session window progress?
>
> It works great when I run it against historical data, but when I run it
> against a streaming data source (pub/sub) it sometimes gets stuck. In this
> case, it got stuck at exactly 12:00 UTC.
>
> My window is a session window, but one where I bump the last 'edge' of the
> window by different amounts depending on what event type it is. Because
> some events never have other events after them.
>
> You can see the problem the easiest in these graphs, specifically the one
> that stops at 14:00 CEST (12:00 UTC) - with green bars.
>
> This graph shows the low-watermark progressing throughout the node in the
> middle (which is also a sink); and this holds for all the nodes in the
> graph. However, the session windowing doesn't progress, despite the
> low-watermark progressing.
>
> Regards,
> Henrik
>
>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525


Planned Absences: -




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Flink heap memory

2019-04-29 Thread Rad Rad
Hi, 

I would like to know the amount of heap memory currently used (in bytes) by
a specific job which runs on a Flink cluster. 

Regards. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-04-29 Thread an0
Thanks very much. It definitely explains the problem I'm seeing. However, 
there is something I need to confirm:
You say "Watermarks are broadcasted/forwarded anyway." Do you mean that in 
assignTimestampsAndWatermarks.keyBy.window, it doesn't matter what data flows 
through a specific key's stream, all key streams have the same watermarks? So 
time-wise, `window` behaves as if `keyBy` is not there at all?

On 2019/04/26 06:34:10, Dawid Wysakowicz  wrote: 
> Hi,
> 
> Watermarks are meta events that travel independently of data events.
> 
> 1) If you assignTimestampsAndWatermarks before keyBy, all parallel
> instances of trips have some data (this is my assumption) so Watermarks
> can be generated. Afterwards even if some of the keyed partitions have
> no data, Watermarks are broadcasted/forwarded anyway. In other words if
> at some point Watermarks were generated for all partitions of a single
> stage, they will be forwarded beyond this point.
> 
> 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign
> watermarks for an empty partition which produces no Watermarks at all
> for this partition, therefore there is no progress beyond this point.
> 
> I hope this clarifies it a bit.
> 
> Best,
> 
> Dawid
> 
> On 25/04/2019 16:49, an0 wrote:
> > If my understanding is correct, then why `assignTimestampsAndWatermarks` 
> > before `keyBy` works? The `timeWindowAll` stream's input streams are task 1 
> > and task 2, with task 2 idling, no matter whether 
> > `assignTimestampsAndWatermarks` is before or after `keyBy`, because whether 
> > task 2 receives elements only depends on the key distribution, has nothing 
> > to do with timestamp assignment, right?
> >
> >                                                      /key 1 trips\
> > (A) trips--> assignTimestampsAndWatermarks--> keyBy                timeWindowAll
> >                                                      \key 2 trips/
> >                                                         (idle)
> >
> >                    /key 1 trips--> assignTimestampsAndWatermarks\
> > (B) trips--> keyBy                                                timeWindowAll
> >                    \key 2 trips--> assignTimestampsAndWatermarks/
> >                       (idle)
> >
> > How things are different between A and B from `timeWindowAll`'s perspective?
> >
> > BTW, thanks for the webinar link, I'll check it later.
> >
> > On 2019/04/25 08:30:20, Dawid Wysakowicz  wrote: 
> >> Hi,
> >>
> >> Yes I think your explanation is correct. I can also recommend Seth's
> >> webinar where he talks about debugging Watermarks[1]
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> [1]
> >> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> >>
> >> On 22/04/2019 22:55, an0 wrote:
> >>> Thanks, I feel I'm getting closer to the truth. 
> >>>
> >>> So parallelism is the cause? Say my parallelism is 2. Does that mean I 
> >>> get 2 tasks running after `keyBy` if even all elements have the same key 
> >>> so go to 1 down stream(say task 1)? And it is the other task(task 2) with 
> >>> no incoming data that caused the `timeWindowAll` stream unable to 
> >>> progress? Because both task 1 and task 2 are its input streams and one is 
> >>> idling so its event time cannot make progress?
> >>>
> >>> On 2019/04/22 01:57:39, Guowei Ma  wrote: 
>  HI,
> 
>  BoundedOutOfOrdernessTimestampExtractors can send a WM at least after it
>  receives an element.
> 
>  For after Keyby:
>  Flink uses the HashCode of key and the parallelism of down stream to 
>  decide
>  which subtask would receive the element. This means if your key is always
>  same, all the sources will only send the elements to the same down stream
>  task, for example only no. 3 BoundedOutOfOrdernessTimestampExtractor.
> 
>  For before Keyby:
>  In your case, the Source and BoundedOutOfOrdernessTimestampExtractors 
>  would
>  be chained together, which means every
>  BoundedOutOfOrdernessTimestampExtractors will receive elements.
> 
>  Best,
>  Guowei
> 
> 
>  an0  于2019年4月19日周五 下午10:41写道:
> 
> > Hi,
> >
> > First of all, thank you for the `shuffle()` tip. It works. However, I
> > still don't understand why it doesn't work without calling `shuffle()`.
> >
> > Why would not all BoundedOutOfOrdernessTimestampExtractors receive 
> > trips?
> > All the trips has keys and timestamps. As I said in my 

Re: Flink session window not progressing

2019-04-29 Thread Henrik Feldt
Thinking more about this; it might just be me who is reacting to the 
sink having a zero rate of output. In fact, I have about two gigs of 
messages left in the queue until it's up to date, so I may just be 
running a slow calculation (because I've run a batch job to backfill to 
after stream). Perhaps something is broken about sink output counts?


On 29 Apr 2019, at 19:26, Henrik Feldt wrote:


Hi guys,

I'm doing a PoC with Flink and I was wondering if you could help me.

I've asked a question here 
https://stackoverflow.com/questions/55907954/flink-session-window-sink-timestamp-not-progressing 
with some images. However, in summary my question is this; why doesn't 
my session window progress?


It works great when I run it against historical data, but when I run 
it against a streaming data source (pub/sub) it sometimes gets stuck. 
In this case, it got stuck at exactly 12:00 UTC.


My window is a session window, but one where I bump the last 'edge' of 
the window by different amounts depending on what event type it is. 
Because some events never have other events after them.


You can see the problem the easiest in these graphs, specifically the 
one that stops at 14:00 CEST (12:00 UTC) - with green bars.


[attached image: Screenshot 2019-04-29 at 19.07.09.png]


This graph shows the low-watermark progressing throughout the node in 
the middle (which is also a sink); and this holds for all the nodes in 
the graph. However, the session windowing doesn't progress, despite 
the low-watermark progressing.


[attached image: Screenshot 2019-04-29 at 19.00.42.png]


Regards,
Henrik


Flink session window not progressing

2019-04-29 Thread Henrik Feldt

Hi guys,

I'm doing a PoC with Flink and I was wondering if you could help me.

I've asked a question here 
https://stackoverflow.com/questions/55907954/flink-session-window-sink-timestamp-not-progressing 
with some images. However, in summary my question is this; why doesn't 
my session window progress?


It works great when I run it against historical data, but when I run it 
against a streaming data source (pub/sub) it sometimes gets stuck. In 
this case, it got stuck at exactly 12:00 UTC.


My window is a session window, but one where I bump the last 'edge' of 
the window by different amounts depending on what event type it is. 
Because some events never have other events after them.
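Roughly, what I am describing is a dynamic-gap session window; a minimal sketch (the gap
values and the Event type are illustrative, not my actual job):

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;

events
    .keyBy(Event::getUserId)
    .window(EventTimeSessionWindows.withDynamicGap(
            new SessionWindowTimeGapExtractor<Event>() {
                @Override
                public long extract(Event element) {
                    // bump the window 'edge' further for event types that never
                    // get a follow-up event
                    return element.isTerminal() ? 60 * 60 * 1000L : 10 * 60 * 1000L;
                }
            }))
    .reduce((a, b) -> b);   // placeholder aggregation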


You can see the problem the easiest in these graphs, specifically the 
one that stops at 14:00 CEST (12:00 UTC) - with green bars.


[attached image: Screenshot 2019-04-29 at 19.07.09.png]


This graph shows the low-watermark progressing throughout the node in 
the middle (which is also a sink); and this holds for all the nodes in 
the graph. However, the session windowing doesn't progress, despite the 
low-watermark progressing.


[attached image: Screenshot 2019-04-29 at 19.00.42.png]


Regards,
Henrik


Flink Load multiple file

2019-04-29 Thread Soheil Pourbafrani
Hi,

I want to load multiple files and apply the processing logic on them. After
some searching, using the following code I can load all the files in the
directory named "input" into Flink:

TextInputFormat tif = new TextInputFormat(new Path("input"));
DataSet<String> raw = env.readFile(tif, "input//");


If I understood correctly, the second / following the directory path
will force Flink to load all files in the directory. But I don't understand
why we should pass the path when creating the TextInputFormat object. What is
its functionality?

I also want to know: is there a way to pass the paths of multiple files
explicitly, without any regular expression? For example:

DataSet<String> raw = env.readFile(tif, "input/data1, input/data2");


And please let me know if a better way of loading multiple files exists.
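In case it helps, one sketch I could imagine (assuming FileInputFormat.setFilePaths is
available in the Flink version in use; the paths are just examples):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

TextInputFormat tif = new TextInputFormat(new Path("input")); // the constructor needs some path
tif.setFilePaths("input/data1", "input/data2");               // override it with explicit files

DataSet<String> raw = env.createInput(tif);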


Thanks


Re: Working around lack of SQL triggers

2019-04-29 Thread deklanw
Hi,

Thanks for the reply.

I had already almost completely lost hope in using Flink SQL. You have
confirmed that.

But, like I said, I don't know how to reduce the large amount of boilerplate
I foresee this requiring with the DataStream API. Can you help me with that?
You mention "parameterizable aggregation functions", can you show an
example? I don't know how to do this without reinventing AVG and COUNT over
and over again.
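To make the question concrete, this is roughly the kind of parameterizable aggregation I
have in mind, as a hand-written sketch with made-up names (an average that takes a field
extractor instead of being rewritten per field):

import java.io.Serializable;
import java.util.function.Function;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// extractor must be serializable so the function can be shipped to the cluster
interface FieldExtractor<T> extends Function<T, Double>, Serializable {}

public class AvgOf<T> implements AggregateFunction<T, Tuple2<Double, Long>, Double> {
    private final FieldExtractor<T> field;

    public AvgOf(FieldExtractor<T> field) { this.field = field; }

    @Override
    public Tuple2<Double, Long> createAccumulator() { return Tuple2.of(0.0, 0L); }

    @Override
    public Tuple2<Double, Long> add(T value, Tuple2<Double, Long> acc) {
        return Tuple2.of(acc.f0 + field.apply(value), acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Double, Long> acc) {
        return acc.f1 == 0 ? 0.0 : acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}

// usage: stream.keyBy(...).timeWindow(...).aggregate(new AvgOf<>(e -> e.getLatency()))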

Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Write simple text into hdfs

2019-04-29 Thread Ken Krugler
DataSet.writeAsText("hdfs://...") should work.
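A minimal sketch (the path, write mode, and parallelism are just examples):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.core.fs.FileSystem.WriteMode;

// assuming `dataSet` is a DataSet<String> and the HDFS filesystem is on the classpath
dataSet.writeAsText("hdfs://namenode:8020/tmp/output", WriteMode.OVERWRITE)
       .setParallelism(1);   // one output file; drop this to write one file per task
env.execute("write to hdfs");  // DataSet sinks are lazy, so execute() is still needed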

— Ken

> On Apr 29, 2019, at 8:00 AM, Hai  wrote:
> 
> Hi, 
> 
> Could anyone give a simple way to write a DataSet into hdfs using a 
> simple way?
> 
> I look up the official document, and didn’t find that, am I missing some 
> thing ?
> 
> Many thanks.

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Write simple text into hdfs

2019-04-29 Thread Hai
Hi,


Could anyone give me a simple way to write a DataSet<String> into HDFS?


I looked through the official documentation and didn’t find it. Am I missing something?


Many thanks.

Re: [DISCUSS] Temporarily remove support for job rescaling via CLI action "modify"

2019-04-29 Thread Gary Yao
Since there were no objections so far, I will proceed with removing the
code [1].

[1] https://issues.apache.org/jira/browse/FLINK-12312
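
For anyone relying on rescaling, the savepoint-based alternative mentioned in the
proposal below looks roughly like this (job id and paths are placeholders):

    # take a savepoint (and optionally cancel the job)
    ./bin/flink savepoint <job-id> [savepoint-directory]
    ./bin/flink cancel <job-id>

    # resume from the savepoint with a different parallelism
    ./bin/flink run -s <savepoint-path> -p <new-parallelism> <job-jar>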

On Wed, Apr 24, 2019 at 1:38 PM Gary Yao  wrote:

> The idea is to also remove the rescaling code in the JobMaster. This will
> make
> it easier to remove the ExecutionGraph reference from the JobMaster which
> is
> needed for the scheduling rework [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-12231
>
> On Wed, Apr 24, 2019 at 12:14 PM Shuai Xu  wrote:
>
>> Will we only remove command support in client side or the code in job
>> master will also be removed?
>>
>> Till Rohrmann  于2019年4月24日周三 下午4:12写道:
>>
>> > +1 for temporarily removing support for the modify command.
>> >
>> > Eventually, we have to add it again in order to support auto scaling.
>> The
>> > next time we add it, we should address the known limitations.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Wed, Apr 24, 2019 at 9:06 AM Paul Lam  wrote:
>> >
>> > > Hi Gary,
>> > >
>> > > + 1 to remove it for now. Actually some users are not aware of that
>> it’s
>> > > still experimental, and ask quite a lot about the problem it causes.
>> > >
>> > > Best,
>> > > Paul Lam
>> > >
>> > > 在 2019年4月24日,14:49,Stephan Ewen  写道:
>> > >
>> > > Sounds reasonable to me. If it is a broken feature, then there is not
>> > much
>> > > value in it.
>> > >
>> > > On Tue, Apr 23, 2019 at 7:50 PM Gary Yao  wrote:
>> > >
>> > > Hi all,
>> > >
>> > > As the subject states, I am proposing to temporarily remove support
>> for
>> > > changing the parallelism of a job via the following syntax [1]:
>> > >
>> > >./bin/flink modify [job-id] -p [new-parallelism]
>> > >
>> > > This is an experimental feature that we introduced with the first
>> rollout
>> > > of
>> > > FLIP-6 (Flink 1.5). However, this feature comes with a few caveats:
>> > >
>> > >* Rescaling does not work with HA enabled [2]
>> > >* New parallelism is not persisted, i.e., after a JobManager
>> restart,
>> > > the job
>> > >  will be recovered with the initial parallelism
>> > >
>> > > Due to the above-mentioned issues, I believe that currently nobody
>> uses
>> > > "modify -p" to rescale their jobs in production. Moreover, the
>> rescaling
>> > > feature stands in the way of our current efforts to rework Flink's
>> > > scheduling
>> > > [3]. I therefore propose to remove the rescaling code for the time
>> being.
>> > > Note
>> > > that it will still be possible to change the parallelism by taking a
>> > > savepoint
>> > > and restoring the job with a different parallelism [4].
>> > >
>> > > Any comments and suggestions will be highly appreciated.
>> > >
>> > > Best,
>> > > Gary
>> > >
>> > > [1]
>> > >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
>> > > [2] https://issues.apache.org/jira/browse/FLINK-8902
>> > > [3] https://issues.apache.org/jira/browse/FLINK-10429
>> > > [4]
>> > >
>> > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html#what-happens-when-i-change-the-parallelism-of-my-program-when-restoring
>> > >
>> > >
>> > >
>> >
>>
>


Re: Emitting current state to a sink

2019-04-29 Thread M Singh
Hi Avi:
Can you please elaborate (or include an example/code snippet) on how you were 
able to collect the keyed state from the processBroadcastElement 
method using applyToKeyedState?  

I am trying to understand which collector you used to emit the state since the 
broadcasted elements/state might be different from the non-broadcast 
elements/state.
Thanks for your help.

Mans
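
For anyone following along, a rough sketch (not the actual code from this thread; the
Request/Command/Response types are made up) of the approach discussed below, where the
Collector of processBroadcastElement is passed into the KeyedStateFunction:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class FetchOnCommand extends KeyedBroadcastProcessFunction<String, Request, Command, Response> {

    private final ValueStateDescriptor<Response> latestDesc =
            new ValueStateDescriptor<>("latest", Response.class);

    @Override
    public void processElement(Request value, ReadOnlyContext ctx, Collector<Response> out) throws Exception {
        // normal keyed processing: update getRuntimeContext().getState(latestDesc) here
    }

    @Override
    public void processBroadcastElement(Command cmd, Context ctx, Collector<Response> out) throws Exception {
        // iterate over all keys of the value state and emit each value through the
        // Collector of this (broadcast) invocation
        ctx.applyToKeyedState(latestDesc, new KeyedStateFunction<String, ValueState<Response>>() {
            @Override
            public void process(String key, ValueState<Response> state) throws Exception {
                Response current = state.value();
                if (current != null) {
                    out.collect(current);
                }
            }
        });
    }
}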
On Monday, April 29, 2019, 7:29:23 AM EDT, Fabian Hueske 
 wrote:  
 
 Nice! 
Thanks for the confirmation :-)
Am Mo., 29. Apr. 2019 um 13:21 Uhr schrieb Avi Levi :

Thanks! Works like a charm :)

On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske  wrote:

This Message originated outside your organization.
Hi Avi,
I'm not sure that you cannot emit data from the keyed state when you receive a 
broadcasted message.
The Context parameter of the processBroadcastElement() method in the 
KeyedBroadcastProcessFunction has the applyToKeyedState() method. The method 
takes a KeyedStateFunction that is applied to each key of a state, but does not 
provide a Collector to emit data. Maybe you can pass the collector to the 
KeyedStateFunction and emit records while it iterates over the key space.

Best, Fabian

Am Fr., 26. Apr. 2019 um 17:35 Uhr schrieb Avi Levi :

Hi Timo,
I definitely did. But when broadcasting a command and trying to address the 
persisted state (I mean the state of the data stream and not the broadcasted 
one) you get the exception that I wrote (java.lang.NullPointerException: No key 
set. This method should not be called outside of a keyed context). E.g. doing 
something like

override def processBroadcastElement(value: BroadcastRequest,
    ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest,
      Response]#Context, out: Collector[Response]): Unit = {
  value match {
    case Command(StateCmd.Fetch, _) =>
      if (state.value() != null) {
        out.collect(state.value())
      }
  }
}

will yield that exception.
BR
Avi
On Fri, Apr 26, 2019 at 11:55 AM Timo Walther  wrote:

This Message originated outside your organization.

Hi Avi,

did you have a look at the .connect() and .broadcast() API 
functionalities? They allow you to broadcast a control stream to all 
operators. Maybe this example [1] or other examples in this repository 
can help you.

Regards,
Timo

[1] 
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java

Am 26.04.19 um 07:57 schrieb Avi Levi:
> Hi,
> We have a keyed pipeline with persisted state.
> Is there a way to broadcast a command and collect all values that 
> persisted in  the state ?
>
> The end result can be for example sending a fetch command to all 
> operators and emitting the results to some sink
>
> why do we need it ? from time to time we might want to check if we are 
> missing keys what are the additional keys or simply emit the current 
> state to a table and to query it.
>
> I tried simply broadcasting a command and addressing the persisted 
> state but that resulted with:
> java.lang.NullPointerException: No key set. This method should not be 
> called outside of a keyed context.
>
> is there a good way to achieve that ?
>
> Cheers
> Avi





  

Re: Read mongo datasource in Flink

2019-04-29 Thread Kenny Gorman
Just a thought: a robust and high-performance way to potentially achieve your 
goals is:

Debezium->Kafka->Flink

https://debezium.io/docs/connectors/mongodb/ 


Good robust handling of various topologies, reasonably good scaling properties, 
good restartability and such.
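
A bare-bones sketch of the Flink end of that pipeline (the topic name, servers, and the
universal flink-connector-kafka dependency are assumptions; older Flink versions would use
the version-specific FlinkKafkaConsumer011 instead):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
props.setProperty("group.id", "flink-mongo-cdc");

// Debezium writes one topic per MongoDB collection; the value is a JSON change envelope
DataStream<String> changes = env.addSource(
        new FlinkKafkaConsumer<>("dbserver1.mydb.mycollection", new SimpleStringSchema(), props));

// parse the JSON envelope (e.g. the "after" document) in a subsequent map()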

Thanks
Kenny Gorman
Co-Founder and CEO
www.eventador.io 



> On Apr 29, 2019, at 7:47 AM, Wouter Zorgdrager  
> wrote:
> 
> Yes, that is correct. This is a really basic implementation that doesn't take 
> parallelism into account. I think you need something like this [1] to get 
> that working.
> 
> [1]: 
> https://docs.mongodb.com/manual/reference/command/parallelCollectionScan/#dbcmd.parallelCollectionScan
> 
> Op ma 29 apr. 2019 om 14:37 schreef Flavio Pompermaier:
> But what about parallelism with this implementation? From what I see there's 
> only a single thread querying Mongo and fetching all the data..am I wrong?
> 
> On Mon, Apr 29, 2019 at 2:05 PM Wouter Zorgdrager  > wrote:
> For a framework I'm working on, we actually implemented a (basic) Mongo 
> source [1]. It's written in Scala and uses Json4s [2] to parse the data into 
> a case class. It uses a Mongo observer to iterate over a collection and emit 
> it into a Flink context. 
> 
> Cheers,
> Wouter
> 
> [1]: 
> https://github.com/codefeedr/codefeedr/blob/develop/codefeedr-plugins/codefeedr-mongodb/src/main/scala/org/codefeedr/plugins/mongodb/BaseMongoSource.scala
> [2]: http://json4s.org/
> 
> Op ma 29 apr. 2019 om 13:57 schreef Flavio Pompermaier:
> I'm not aware of an official source/sink..if you want you could try to 
> exploit the Mongo HadoopInputFormat as in [1].
> The provided link use a pretty old version of Flink but it should not be a 
> big problem to update the maven dependencies and the code to a newer version.
> 
> Best,
> Flavio
> 
> [1] https://github.com/okkam-it/flink-mongodb-test 
> 
> On Mon, Apr 29, 2019 at 6:15 AM Hai  > wrote:
> Hi,
> 
> Can anyone give me a clue about how to read mongodb’s data as a 
> batch/streaming datasource in Flink? I don’t find the mongodb connector in 
> recent release version .
> 
> Many thanks
> 
> 



Re: POJO with private fields and toApeendStream of StreamTableEnvironment

2019-04-29 Thread Timo Walther

Hi Sung,

private fields are only supported if you specify getters and setters 
accordingly. Otherwise you need to use `Row.class` and perform the 
mapping in a subsequent map() function manually via reflection.


Regards,
Timo


Am 29.04.19 um 15:44 schrieb Sung Gon Yi:
In 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset, 


POJO data type is available to convert to DataStream.

I would like to use a POJO data type class with private fields. I wonder 
whether it is officially possible or not.

Anyway, currently it does not work.

Codes:
—
CsvTableSource as = CsvTableSource.builder()
    .path("aa.csv")
    .field("name", STRING)
    .field("value", INT)
    .build();
Table aa = tEnv.fromTableSource(as);
tEnv.toAppendStream(aa, P.class);
—
public class P implements Serializable {
    private String name;
    private Integer value;
}
—

Above codes, I got below error message:
==
Exception in thread "main" org.apache.flink.table.api.TableException: 
Arity [2] of result [ArrayBuffer(String, Integer)] does not match the 
number[1] of requested type [GenericType].
at 
org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:1165)
at 
org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:423)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:936)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:866)
at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:202)
at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:156)

at ...
==

When fields of class P are changed to “public”, it works well.
—
public class P implements Serializable {
    public String name;
    public Integer value;
}
—

Thanks,
skonmeme







Re: Version "Unknown" - Flink 1.7.0

2019-04-29 Thread Vishal Santoshi
#Generated by Git-Commit-Id-Plugin

#Wed Apr 03 22:57:42 PDT 2019

git.commit.id.abbrev=4caec0d

git.commit.user.email=aljoscha.kret...@gmail.com

git.commit.message.full=Commit for release 1.8.0\n

git.commit.id=4caec0d4bab497d7f9a8d9fec4680089117593df

git.commit.message.short=Commit for release 1.8.0

git.commit.user.name=Aljoscha Krettek

git.commit.time=03.04.2019 @ 13\:25\:54 PDT

On Mon, Apr 29, 2019 at 8:07 AM Vishal Santoshi 
wrote:

> Ok, I will check.
>
> On Fri, Apr 12, 2019, 4:47 AM Chesnay Schepler  wrote:
>
>> have you compiled Flink yourself?
>>
>> Could you check whether the flink-dist jar contains a
>> ".version.properties" file in the root directory?
>>
>> On 12/04/2019 03:42, Vishal Santoshi wrote:
>>
>> Hello ZILI,
>>   I run flink from the distribution as from
>> https://flink.apache.org/downloads.html#apache-flink-180.
>> In my case that my flink pipe is  run a job cluster on k8s.
>>
>> Regards.
>>
>>
>> On Sat, Feb 2, 2019 at 12:24 PM ZILI CHEN  wrote:
>>
>>> The version is generated in EnvironmentInformation#getVersion. As the
>>> comment stands,
>>> the version can be null(and rendered as "") if the JobManager
>>> does not run from a Maven build.
>>>
>>> Specifically Flink getVersion by "version =
>>> EnvironmentInformation.class.getPackage().getImplementationVersion();"
>>>
>>> Is it your situation?
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Vishal Santoshi  于2019年2月2日周六 下午10:27写道:
>>>
 +1 ( though testing in JOB mode  on k8s )

 On Fri, Feb 1, 2019 at 6:45 PM anaray  wrote:

> Though not a major issue. I see that Flink UI and REST api gives flink
> version as "UNKNOWN"
> I am using flink 1.7.0, with and running the cluster in JOB mode.
>
> REST endpoint /overview output
>
> {"taskmanagers":1,"slots-total":4,"slots-available":3,"jobs-running":1,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0,"*flink-version":"*","flink-commit":"49da9f9"}
>
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

>>


POJO with private fields and toApeendStream of StreamTableEnvironment

2019-04-29 Thread Sung Gon Yi
In 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
 
,
POJO data type is available to convert to DataStream.

I would like to use a POJO data type class with private fields. I wonder whether it is 
officially possible or not. 
Anyway, currently it does not work.

Codes:
—
CsvTableSource as = CsvTableSource.builder()
.path("aa.csv")
.field("name", STRING)
.field("value", INT)
.build();
Table aa = tEnv.fromTableSource(as);
tEnv.toAppendStream(aa, P.class);
—
public class P implements Serializable {
private String name;
private Integer value;
}
—

Above codes, I got below error message:
==
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [2] 
of result [ArrayBuffer(String, Integer)] does not match the number[1] of 
requested type [GenericType].
at 
org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:1165)
at 
org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:423)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:936)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:866)
at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:202)
at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:156)
at ...
==

When fields of class P are changed to “public”, it works well.
—
public class P implements Serializable {
public String name;
public Integer value;
}
—

Thanks,
skonmeme





Re: Data Locality in Flink

2019-04-29 Thread Flavio Pompermaier
Thanks Fabian, that's clearer. Many times you don't know whether to
rebalance a dataset or not, because it depends on the specific use case and
dataset distribution.
An automatic way of choosing whether a DataSet could benefit from a
rebalance or not would be VERY nice (at least for batch), but I fear this
would be very hard to implement. Am I wrong?

On Mon, Apr 29, 2019 at 3:10 PM Fabian Hueske  wrote:

> Hi Flavio,
>
> These types of race conditions are not failure cases, so no exception is
> thrown.
> It only means that a single source task reads all (or most of the) splits
> and no splits are left for the other tasks.
> This can be a problem if a record represents a large amount of IO or an
> intensive computation as they might not be properly distributed. In that
> case you'd need to manually rebalance the partitions of a DataSet.
>
> Fabian
>
> Am Mo., 29. Apr. 2019 um 14:42 Uhr schrieb Flavio Pompermaier <
> pomperma...@okkam.it>:
>
>> Hi Fabian, I wasn't aware that  "race-conditions may happen if your
>> splits are very small as the first data source task might rapidly request
>> and process all splits before the other source tasks do their first
>> request". What happens exactly when a race-condition arise? Is this
>> exception internally handled by Flink or not?
>>
>> On Mon, Apr 29, 2019 at 11:51 AM Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> The method that I described in the SO answer is still implemented in
>>> Flink.
>>> Flink tries to assign splits to tasks that run on local TMs.
>>> However, files are not split per line (this would be horribly
>>> inefficient) but in larger chunks depending on the number of subtasks (and
>>> in case of HDFS the file block size).
>>>
>>> Best, Fabian
>>>
>>> Am So., 28. Apr. 2019 um 18:48 Uhr schrieb Soheil Pourbafrani <
>>> soheil.i...@gmail.com>:
>>>
 Hi

 I want to exactly how Flink read data in the both case of file in local
 filesystem and file on distributed file system?

 In reading data from local file system I guess every line of the file
 will be read by a slot (according to the job parallelism) for applying the
 map logic.

 In reading from HDFS I read this
  answer by Fabian Hueske
  and i want to
 know is that still the Flink strategy fro reading from distributed system
 file?

 thanks

>>>
>>
>>


RE: kafka partitions, data locality

2019-04-29 Thread Smirnov Sergey Vladimirovich (39833)
Hi Stefan,

Thanks for clarifying!
But it still remains an open question for me because we use the keyBy method and I 
did not find any public interface for key reassignment (something like 
partitionCustom for DataStream).
As I heard, there is some internal mechanism with key groups and mapping keys to 
groups. Is it supposed to become public?


Regards,
Sergey

From: Stefan Richter [mailto:s.rich...@ververica.com]
Sent: Friday, April 26, 2019 11:15 AM
To: Smirnov Sergey Vladimirovich (39833) 
Cc: Dawid Wysakowicz ; Ken Krugler 
; user@flink.apache.org; d...@flink.apache.org
Subject: Re: kafka partitions, data locality

Hi Sergey,

The point why this is flagged as beta is actually less about stability and more 
about the fact that this is supposed to be more of a "power user" feature, 
because bad things can happen if your data is not 100% correctly partitioned in 
the same way as Flink would partition it. This is why typically you should only 
use it if the data was partitioned by Flink and you are very sure what you are 
doing, because there is not really something we can do at the API level to 
protect you from mistakes in using this feature. Eventually some runtime 
exceptions might show you that something is going wrong, but that is not 
exactly a good user experience.

On a different note, there actually is currently one open issue [1] to be aware 
of in connection with this feature and operator chaining, but at the same time 
this is something that should not hard to fix in for the next minor release.

Best,
Stefan

[1] 
https://issues.apache.org/jira/browse/FLINK-12296?focusedCommentId=16824945=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16824945


On 26. Apr 2019, at 09:48, Smirnov Sergey Vladimirovich (39833) 
mailto:s.smirn...@tinkoff.ru>> wrote:

Hi,

Dawid, great, thanks!
Any plans to make it stable? 1.9?


Regards,
Sergey

From: Dawid Wysakowicz [mailto:dwysakow...@apache.org]
Sent: Thursday, April 25, 2019 10:54 AM
To: Smirnov Sergey Vladimirovich (39833) 
mailto:s.smirn...@tinkoff.ru>>; Ken Krugler 
mailto:kkrugler_li...@transpac.com>>
Cc: user@flink.apache.org; 
d...@flink.apache.org
Subject: Re: kafka partitions, data locality

Hi Smirnov,
Actually there is a way to tell Flink that data is already partitioned. You can 
try the reinterpretAsKeyedStream[1] method. I must warn you though this is an 
experimental feature.
Best,
Dawid
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/experimental.html#experimental-features
On 19/04/2019 11:48, Smirnov Sergey Vladimirovich (39833) wrote:
Hi Ken,

It’s a bad story for us: even for a small window we have dozens of thousands of 
events per job, with 10x in peaks or even more. And the number of jobs was known 
to be high. So instead of N operations (our producer/consumer mechanism) with 
shuffle/resorting (the current Flink implementation) it will be N*ln(N) - a tenfold 
loss of execution speed!
For all: what should my next step be? Contribute to Apache Flink? The issue backlog?


With best regards,
Sergey
From: Ken Krugler [mailto:kkrugler_li...@transpac.com]
Sent: Wednesday, April 17, 2019 9:23 PM
To: Smirnov Sergey Vladimirovich (39833) 

Subject: Re: kafka partitions, data locality

Hi Sergey,

As you surmised, once you do a keyBy/max on the Kafka topic, to group by 
clientId and find the max, then the topology will have a partition/shuffle to 
it.

This is because Flink doesn’t know that client ids don’t span Kafka partitions.

I don’t know of any way to tell Flink that the data doesn’t need to be 
shuffled. There was a 
discussion
 about adding a keyByWithoutPartitioning a while back, but I don’t think that 
support was ever added.

A simple ProcessFunction with MapState (clientId -> max) should allow you do to 
the same thing without too much custom code. In order to support windowing, 
you’d use triggers to flush state/emit results.

— Ken


On Apr 17, 2019, at 2:33 AM, Smirnov Sergey Vladimirovich (39833) 
mailto:s.smirn...@tinkoff.ru>> wrote:

Hello,

We planning to use apache flink as a core component of our new streaming system 
for internal processes (finance, banking business) based on apache kafka.
So we starting some research with apache flink and one of the question, arises 
during that work, is how flink handle with data locality.
I`ll try to explain: suppose we have a kafka topic with some kind of events. 
And this events groups by topic partitions so that the handler (or a job 
worker), consuming message from a partition, have all necessary information for 
further processing.
As an example, say we have client’s payment transaction in a kafka topic. We 
grouping by clientId (transaction with the same clientId goes to one same kafka 
topic partition) and the task is to find max transaction per 

Re: Data Locality in Flink

2019-04-29 Thread Fabian Hueske
Hi Flavio,

These types of race conditions are not failure cases, so no exception is
thrown.
It only means that a single source task reads all (or most of the) splits
and no splits are left for the other tasks.
This can be a problem if a record represents a large amount of IO or an
intensive computation as they might not be properly distributed. In that
case you'd need to manually rebalance the partitions of a DataSet.
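
A tiny sketch of that manual rebalance (reading and re-distributing before the expensive step):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> lines = env.readTextFile("hdfs:///data/input");

// rebalance() redistributes records round-robin across all parallel instances, so a
// heavy map after it is evenly spread even if one source task read most of the splits
DataSet<String> processed = lines
        .rebalance()
        .map(line -> line.toUpperCase());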

Fabian

Am Mo., 29. Apr. 2019 um 14:42 Uhr schrieb Flavio Pompermaier <
pomperma...@okkam.it>:

> Hi Fabian, I wasn't aware that  "race-conditions may happen if your splits
> are very small as the first data source task might rapidly request and
> process all splits before the other source tasks do their first request".
> What happens exactly when a race-condition arise? Is this exception
> internally handled by Flink or not?
>
> On Mon, Apr 29, 2019 at 11:51 AM Fabian Hueske  wrote:
>
>> Hi,
>>
>> The method that I described in the SO answer is still implemented in
>> Flink.
>> Flink tries to assign splits to tasks that run on local TMs.
>> However, files are not split per line (this would be horribly
>> inefficient) but in larger chunks depending on the number of subtasks (and
>> in case of HDFS the file block size).
>>
>> Best, Fabian
>>
>> Am So., 28. Apr. 2019 um 18:48 Uhr schrieb Soheil Pourbafrani <
>> soheil.i...@gmail.com>:
>>
>>> Hi
>>>
>>> I want to exactly how Flink read data in the both case of file in local
>>> filesystem and file on distributed file system?
>>>
>>> In reading data from local file system I guess every line of the file
>>> will be read by a slot (according to the job parallelism) for applying the
>>> map logic.
>>>
>>> In reading from HDFS I read this
>>>  answer by Fabian Hueske
>>>  and i want to
>>> know is that still the Flink strategy fro reading from distributed system
>>> file?
>>>
>>> thanks
>>>
>>
>
>


Re: Read mongo datasource in Flink

2019-04-29 Thread Wouter Zorgdrager
Yes, that is correct. This is a really basic implementation that doesn't
take parallelism into account. I think you need something like this [1] to
get that working.

[1]:
https://docs.mongodb.com/manual/reference/command/parallelCollectionScan/#dbcmd.parallelCollectionScan

Op ma 29 apr. 2019 om 14:37 schreef Flavio Pompermaier :

> But what about parallelism with this implementation? From what I see
> there's only a single thread querying Mongo and fetching all the data..am I
> wrong?
>
> On Mon, Apr 29, 2019 at 2:05 PM Wouter Zorgdrager <
> w.d.zorgdra...@tudelft.nl> wrote:
>
>> For a framework I'm working on, we actually implemented a (basic) Mongo
>> source [1]. It's written in Scala and uses Json4s [2] to parse the data
>> into a case class. It uses a Mongo observer to iterate over a collection
>> and emit it into a Flink context.
>>
>> Cheers,
>> Wouter
>>
>> [1]:
>> https://github.com/codefeedr/codefeedr/blob/develop/codefeedr-plugins/codefeedr-mongodb/src/main/scala/org/codefeedr/plugins/mongodb/BaseMongoSource.scala
>>
>> [2]: http://json4s.org/
>>
>> Op ma 29 apr. 2019 om 13:57 schreef Flavio Pompermaier <
>> pomperma...@okkam.it>:
>>
>>> I'm not aware of an official source/sink..if you want you could try to
>>> exploit the Mongo HadoopInputFormat as in [1].
>>> The provided link use a pretty old version of Flink but it should not be
>>> a big problem to update the maven dependencies and the code to a newer
>>> version.
>>>
>>> Best,
>>> Flavio
>>>
>>> [1] https://github.com/okkam-it/flink-mongodb-test
>>>
>>> On Mon, Apr 29, 2019 at 6:15 AM Hai  wrote:
>>>
 Hi,


 Can anyone give me a clue about how to read mongodb’s data as a
 batch/streaming datasource in Flink? I don’t find the mongodb connector in
 recent release version .


 Many thanks

>>>
>>>
>


Re: Apache Flink - Question about dynamically changing window end time at run time

2019-04-29 Thread M Singh
 Sounds great Fabian.  

I was just trying to see if I can use higher level datastream apis.  

I appreciate your advice and help.  

Mans
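
For the archive, a skeleton of the KeyedProcessFunction approach Fabian describes below,
where the "window end" is just an event-time timer that is deleted and re-registered as
later events arrive (the Event/Result types and the end-time logic are placeholders):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MovableEndWindow extends KeyedProcessFunction<String, Event, Result> {

    private transient ValueState<Long> windowEnd;
    private transient ListState<Event> buffer;

    @Override
    public void open(Configuration parameters) {
        windowEnd = getRuntimeContext().getState(new ValueStateDescriptor<>("end", Long.class));
        buffer = getRuntimeContext().getListState(new ListStateDescriptor<>("buffer", Event.class));
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
        buffer.add(e);
        long newEnd = e.getTimestamp() + e.getExtension(); // placeholder "end time" logic
        Long oldEnd = windowEnd.value();
        if (oldEnd == null || newEnd > oldEnd) {
            if (oldEnd != null) {
                ctx.timerService().deleteEventTimeTimer(oldEnd); // move the window end
            }
            ctx.timerService().registerEventTimeTimer(newEnd);
            windowEnd.update(newEnd);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
        out.collect(Result.from(buffer.get()));  // "fire" the window
        buffer.clear();                          // and clean everything up
        windowEnd.clear();
    }
}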

On Monday, April 29, 2019, 5:41:36 AM EDT, Fabian Hueske 
 wrote:  
 
Hi Mans,
I don't know if that would work or not. I would need to dig into the source code 
for that. 

TBH, I would recommend checking whether you can implement the logic using a 
(Keyed-)ProcessFunction. IMO, process functions are a lot easier to reason about 
than Flink's windowing framework. 
You can manage state and timers all by yourself and make sure everything is 
properly cleaned up.

Best,
Fabian


Am So., 28. Apr. 2019 um 16:31 Uhr schrieb M Singh :

 Thanks Sameer/Rong:
As Fabian and you have mentioned, the window still sticks around forever for 
global window, so I am trying avoid that scenario.
Fabian & Flink team - do you have any insights into what would happen if I 
create a window and the later change it's end time during the stream processing 
?  Would it mess up any internal state/processing that uses the end time when 
the window was first created ?  If there is any other consideration to keep in 
mind, please let me know.
Thanks again.

On Wednesday, April 24, 2019, 1:29:18 PM EDT, Rong Rong 
 wrote:  
 
 Hi Mans,
Sameer is correct. if you would like to control window triggering based on 
other elements that does not belong to this window (in a keyed stream context) 
then this is probably the best way to approach. 
I think you've also posted in another thread that describes what will be left 
after fire-and-purge [1]. As Fabian stated: the only thing that might've left 
after is the window (which is the 2 long values indicate the start/end) and the 
trigger object. But you are right it might eventually filled up memory.
Another approach is to implement your own operator that handles all these 
internally by your user code. This would require you to replicate many of the 
window operator logic though.
Thanks,Rong
[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-How-to-destroy-global-window-and-release-it-s-resources-td27191.html#a27212
On Wed, Apr 24, 2019 at 5:02 AM Sameer W  wrote:

Global Windows is fine for this use case. I have used the same strategy. You 
just define custom evictors and triggers and you are all good. Windows are 
managed by keys, so as such as long as events are evicted from the window, that 
counts towards reclaiming memory for the key+window combination. Plus there is 
just window per key with Global Windows. 
On Wed, Apr 24, 2019 at 7:47 AM M Singh  wrote:

 Hi Rong:
Thanks for your answer.
>From what I understand the dynamic gap session windows are also created when 
>the event is encountered.  I need to be able to change the window end time at 
>a later time based on what other events are in that window.  One way to do 
>this is to use GlobalWindows but then these are never deleted.

Regarding CEP option - I believe that CEP patterns cannot be changed 
dynamically once they've been complied which limits it usage.
 Please feel free to correct me. 

Thanks for your help and pointers.

On Tuesday, April 23, 2019, 8:12:56 PM EDT, Rong Rong  
wrote:  
 
 Hi Mans,
I am not sure what you meant by "dynamically change the end-time of a window. 
If you are referring to dynamically determines the firing time of the window, 
then it fits into the description of session window [1]: If you want to handle 
window end time dynamically, one way of which I can think of is the dynamic 
gap, session window [1] approach. with which you can specify the end-time of a 
window based on input elements. Provided that you are maintaining a session 
window. Another way to look at it is through the Flink-CEP library [2]. 
Thanks,Rong

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#session-windows
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/cep.html#groups-of-patterns
On Tue, Apr 23, 2019 at 8:19 AM M Singh  wrote:

Hi:
I am working on a project and need to change the end time of the window 
dynamically.  I want to find out if the end time of the window is used 
internally (for sorting windows/etc) except for handling watermarks that would 
cause problems if the end time was changed during run time after the window has 
been created even if no new event has arrived for that window.

I don't want to use GlobalWindow since from my understanding it never gets 
destroyed.

If there is any alternate way of dealing with this, please let me know.

Thanks
Mans

Re: Data Locality in Flink

2019-04-29 Thread Flavio Pompermaier
Hi Fabian, I wasn't aware that "race-conditions may happen if your splits
are very small as the first data source task might rapidly request and
process all splits before the other source tasks do their first request".
What happens exactly when a race condition arises? Is this exception
handled internally by Flink or not?

On Mon, Apr 29, 2019 at 11:51 AM Fabian Hueske  wrote:

> Hi,
>
> The method that I described in the SO answer is still implemented in Flink.
> Flink tries to assign splits to tasks that run on local TMs.
> However, files are not split per line (this would be horribly inefficient)
> but in larger chunks depending on the number of subtasks (and in case of
> HDFS the file block size).
>
> Best, Fabian
>
> Am So., 28. Apr. 2019 um 18:48 Uhr schrieb Soheil Pourbafrani <
> soheil.i...@gmail.com>:
>
>> Hi
>>
>> I want to know exactly how Flink reads data in both cases: a file in the
>> local filesystem and a file on a distributed file system.
>>
>> When reading data from the local file system, I guess every line of the file
>> will be read by a slot (according to the job parallelism) for applying the
>> map logic.
>>
>> For reading from HDFS I read this
>>  answer by Fabian Hueske
>>  and I want to
>> know whether that is still the Flink strategy for reading from a distributed
>> file system?
>>
>> thanks
>>
>


Re: Read mongo datasource in Flink

2019-04-29 Thread Flavio Pompermaier
But what about parallelism with this implementation? From what I see
there's only a single thread querying Mongo and fetching all the data. Am I
wrong?

On Mon, Apr 29, 2019 at 2:05 PM Wouter Zorgdrager 
wrote:

> For a framework I'm working on, we actually implemented a (basic) Mongo
> source [1]. It's written in Scala and uses Json4s [2] to parse the data
> into a case class. It uses a Mongo observer to iterate over a collection
> and emit it into a Flink context.
>
> Cheers,
> Wouter
>
> [1]:
> https://github.com/codefeedr/codefeedr/blob/develop/codefeedr-plugins/codefeedr-mongodb/src/main/scala/org/codefeedr/plugins/mongodb/BaseMongoSource.scala
>
> [2]: http://json4s.org/
>
> Op ma 29 apr. 2019 om 13:57 schreef Flavio Pompermaier <
> pomperma...@okkam.it>:
>
>> I'm not aware of an official source/sink..if you want you could try to
>> exploit the Mongo HadoopInputFormat as in [1].
>> The provided link uses a pretty old version of Flink but it should not be
>> a big problem to update the maven dependencies and the code to a newer
>> version.
>>
>> Best,
>> Flavio
>>
>> [1] https://github.com/okkam-it/flink-mongodb-test
>>
>> On Mon, Apr 29, 2019 at 6:15 AM Hai  wrote:
>>
>>> Hi,
>>>
>>>
>>> Can anyone give me a clue about how to read mongodb’s data as a
>>> batch/streaming datasource in Flink? I don’t find the mongodb connector in
>>> recent release version .
>>>
>>>
>>> Many thanks
>>>
>>
>>


Re: Read mongo datasource in Flink

2019-04-29 Thread Hai
Thanks for your sharing ~ That’s great !




Original Message
Sender:Wouter zorgdragerw.d.zorgdra...@tudelft.nl
Recipient:hai...@magicsoho.com
Cc:useru...@flink.apache.org
Date:Monday, Apr 29, 2019 20:05
Subject:Re: Read mongo datasource in Flink


For a framework I'm working on, we actually implemented a (basic) Mongo source 
[1]. It's written in Scala and uses Json4s [2] to parse the data into a case 
class. It uses a Mongo observer to iterate over a collection and emit it into a 
Flink context.


Cheers,
Wouter



[1]:https://github.com/codefeedr/codefeedr/blob/develop/codefeedr-plugins/codefeedr-mongodb/src/main/scala/org/codefeedr/plugins/mongodb/BaseMongoSource.scala
[2]:http://json4s.org/


Op ma 29 apr. 2019 om 13:57 schreef Flavio Pompermaier pomperma...@okkam.it:

I'm not aware of an official source/sink..if you want you could try to exploit 
the Mongo HadoopInputFormat as in [1].
The provided link uses a pretty old version of Flink but it should not be a big 
problem to update the maven dependencies and the code to a newer version.


Best,
Flavio



[1]https://github.com/okkam-it/flink-mongodb-test


On Mon, Apr 29, 2019 at 6:15 AM Hai h...@magicsoho.com wrote:

Hi,


Can anyone give me a clue about how to read mongodb’s data as a batch/streaming 
datasource in Flink? I don’t find the mongodb connector in recent release 
version .


Many thanks

Re: Read mongo datasource in Flink

2019-04-29 Thread Wouter Zorgdrager
For a framework I'm working on, we actually implemented a (basic) Mongo
source [1]. It's written in Scala and uses Json4s [2] to parse the data
into a case class. It uses a Mongo observer to iterate over a collection
and emit it into a Flink context.

Cheers,
Wouter

[1]:
https://github.com/codefeedr/codefeedr/blob/develop/codefeedr-plugins/codefeedr-mongodb/src/main/scala/org/codefeedr/plugins/mongodb/BaseMongoSource.scala

[2]: http://json4s.org/

Op ma 29 apr. 2019 om 13:57 schreef Flavio Pompermaier :

> I'm not aware of an official source/sink..if you want you could try to
> exploit the Mongo HadoopInputFormat as in [1].
> The provided link uses a pretty old version of Flink but it should not be a
> big problem to update the maven dependencies and the code to a newer
> version.
>
> Best,
> Flavio
>
> [1] https://github.com/okkam-it/flink-mongodb-test
>
> On Mon, Apr 29, 2019 at 6:15 AM Hai  wrote:
>
>> Hi,
>>
>>
>> Can anyone give me a clue about how to read mongodb’s data as a
>> batch/streaming datasource in Flink? I don’t find the mongodb connector in
>> recent release version .
>>
>>
>> Many thanks
>>
>
>


Re: Read mongo datasource in Flink

2019-04-29 Thread Hai
Hi, Flavio:


That’s good, Thank you. I will try it later ~


Regards


Original Message
Sender:Flavio pompermaierpomperma...@okkam.it
Recipient:hai...@magicsoho.com
Cc:useru...@flink.apache.org
Date:Monday, Apr 29, 2019 19:56
Subject:Re: Read mongo datasource in Flink


I'm not aware of an official source/sink..if you want you could try to exploit 
the Mongo HadoopInputFormat as in [1].
The provided link uses a pretty old version of Flink but it should not be a big 
problem to update the maven dependencies and the code to a newer version.


Best,
Flavio



[1]https://github.com/okkam-it/flink-mongodb-test


On Mon, Apr 29, 2019 at 6:15 AM Hai h...@magicsoho.com wrote:

Hi,


Can anyone give me a clue about how to read mongodb’s data as a batch/streaming 
datasource in Flink? I don’t find the mongodb connector in recent release 
version .


Many thanks

Re: Version "Unknown" - Flink 1.7.0

2019-04-29 Thread Vishal Santoshi
Ok, I will check.

On Fri, Apr 12, 2019, 4:47 AM Chesnay Schepler  wrote:

> have you compiled Flink yourself?
>
> Could you check whether the flink-dist jar contains a
> ".version.properties" file in the root directory?
>
> On 12/04/2019 03:42, Vishal Santoshi wrote:
>
> Hello ZILI,
>   I run flink from the distribution as from
> https://flink.apache.org/downloads.html#apache-flink-180.
> In my case my Flink pipeline is run as a job cluster on k8s.
>
> Regards.
>
>
> On Sat, Feb 2, 2019 at 12:24 PM ZILI CHEN  wrote:
>
>> The version is generated in EnvironmentInformation#getVersion. As the
>> comment stands,
>> the version can be null (and rendered as "") if the JobManager
>> does not run from a Maven build.
>>
>> Specifically Flink getVersion by "version =
>> EnvironmentInformation.class.getPackage().getImplementationVersion();"
>>
>> Is it your situation?
>>
>> Best,
>> tison.
>>
>>
>> Vishal Santoshi  于2019年2月2日周六 下午10:27写道:
>>
>>> +1 ( though testing in JOB mode  on k8s )
>>>
>>> On Fri, Feb 1, 2019 at 6:45 PM anaray  wrote:
>>>
 Though not a major issue, I see that the Flink UI and REST API give the Flink
 version as "UNKNOWN".
 I am using Flink 1.7.0 and running the cluster in JOB mode.

 REST endpoint /overview output

 {"taskmanagers":1,"slots-total":4,"slots-available":3,"jobs-running":1,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0,"flink-version":"","flink-commit":"49da9f9"}







 --
 Sent from:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

>>>
>


Re: Read mongo datasource in Flink

2019-04-29 Thread Flavio Pompermaier
I'm not aware of an official source/sink..if you want you could try to
exploit the Mongo HadoopInputFormat as in [1].
The provided link uses a pretty old version of Flink but it should not be a
big problem to update the maven dependencies and the code to a newer
version.

Best,
Flavio

[1] https://github.com/okkam-it/flink-mongodb-test
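
For reference, a rough sketch of that approach (assuming the flink-hadoop-compatibility
module and the mongo-hadoop connector are on the classpath; the Mongo URI is a
placeholder). It wraps the connector's MongoInputFormat in Flink's Hadoop compatibility
layer; since the connector computes input splits, the resulting source should also be
able to run in parallel:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapreduce.Job;
import org.bson.BSONObject;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoBatchRead {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Job job = Job.getInstance();
        // placeholder URI: point this at your own database.collection
        MongoConfigUtil.setInputURI(job.getConfiguration(),
                "mongodb://localhost:27017/mydb.mycollection");

        // each record arrives as a (key, BSON document) pair
        HadoopInputFormat<Object, BSONObject> mongoFormat = new HadoopInputFormat<>(
                new MongoInputFormat(), Object.class, BSONObject.class, job);

        DataSet<Tuple2<Object, BSONObject>> docs = env.createInput(mongoFormat);
        docs.first(10).print();
    }
}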

On Mon, Apr 29, 2019 at 6:15 AM Hai  wrote:

> Hi,
>
>
> Can anyone give me a clue about how to read mongodb’s data as a
> batch/streaming datasource in Flink? I don’t find the mongodb connector in
> recent release version .
>
>
> Many thanks
>


Re: Emitting current state to a sink

2019-04-29 Thread Fabian Hueske
Nice!
Thanks for the confirmation :-)

Am Mo., 29. Apr. 2019 um 13:21 Uhr schrieb Avi Levi :

> Thanks! Works like a charm :)
>
> On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske  wrote:
>
>> *This Message originated outside your organization.*
>> --
>> Hi Avi,
>>
>> I'm not sure that you cannot emit data from the keyed state when you
>> receive a broadcasted message.
>> The Context parameter of the processBroadcastElement() method in the
>> KeyedBroadcastProcessFunction has the applyToKeyedState() method.
>> The method takes a KeyedStateFunction that is applied to each key of a
>> state, but does not provide a Collector to emit data.
>> Maybe you can pass the collector to the KeyedStateFunction and emit
>> records while it iterates over the key space.
>>
>> Best, Fabian
>>
>> Am Fr., 26. Apr. 2019 um 17:35 Uhr schrieb Avi Levi <
>> avi.l...@bluevoyant.com>:
>>
>>> Hi Timo,
>>> I definitely did, but when broadcasting a command and trying to address the
>>> persisted state (I mean the state of the data stream and not the
>>> broadcasted one) you get the exception that I wrote
>>> (java.lang.NullPointerException: No key set. This method should not be
>>> called outside of a keyed context). e.g doing something like
>>>
>>> override def processBroadcastElement(value: BroadcastRequest, ctx: 
>>> KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, 
>>> Response]#Context, out: Collector[Response]): Unit = {
>>>   value match {
>>>     case Command(StateCmd.Fetch, _) =>
>>>       if (state.value() != null) {
>>>         out.collect(state.value())
>>>       }
>>>
>>> will yield that exception
>>>
>>> BR
>>> Avi
>>>
>>> On Fri, Apr 26, 2019 at 11:55 AM Timo Walther 
>>> wrote:
>>>
 This Message originated outside your organization.

 Hi Avi,

 did you have a look at the .connect() and .broadcast() API
 functionalities? They allow you to broadcast a control stream to all
 operators. Maybe this example [1] or other examples in this repository
 can help you.

 Regards,
 Timo

 [1]

 https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java

 Am 26.04.19 um 07:57 schrieb Avi Levi:
 > Hi,
 > We have a keyed pipeline with persisted state.
 > Is there a way to broadcast a command and collect all values that
 > persisted in  the state ?
 >
 > The end result can be for example sending a fetch command to all
 > operators and emitting the results to some sink
 >
 > why do we need it ? from time to time we might want to check if we
 are
 > missing keys what are the additional keys or simply emit the current
 > state to a table and to query it.
 >
 > I tried simply broadcasting a command and addressing the persisted
 > state but that resulted with:
 > java.lang.NullPointerException: No key set. This method should not be
 > called outside of a keyed context.
 >
 > is there a good way to achieve that ?
 >
 > Cheers
 > Avi




Re: Emitting current state to a sink

2019-04-29 Thread Avi Levi
Thanks! Works like a charm :)

On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske  wrote:

> *This Message originated outside your organization.*
> --
> Hi Avi,
>
> I'm not sure that you cannot emit data from the keyed state when you
> receive a broadcasted message.
> The Context parameter of the processBroadcastElement() method in the
> KeyedBroadcastProcessFunction has the applyToKeyedState() method.
> The method takes a KeyedStateFunction that is applied to each key of a
> state, but does not provide a Collector to emit data.
> Maybe you can pass the collector to the KeyedStateFunction and emit
> records while it iterates over the key space.
>
> Best, Fabian
>
> Am Fr., 26. Apr. 2019 um 17:35 Uhr schrieb Avi Levi <
> avi.l...@bluevoyant.com>:
>
>> Hi Timo,
>> I definitely did, but when broadcasting a command and trying to address the
>> persisted state (I mean the state of the data stream and not the
>> broadcasted one) you get the exception that I wrote
>> (java.lang.NullPointerException: No key set. This method should not be
>> called outside of a keyed context). e.g doing something like
>>
>> override def processBroadcastElement(value: BroadcastRequest, ctx: 
>> KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, 
>> Response]#Context, out: Collector[Response]): Unit = {
>>   value match {
>>     case Command(StateCmd.Fetch, _) =>
>>       if (state.value() != null) {
>>         out.collect(state.value())
>>       }
>>
>> will yield that exception
>>
>> BR
>> Avi
>>
>> On Fri, Apr 26, 2019 at 11:55 AM Timo Walther  wrote:
>>
>>> This Message originated outside your organization.
>>>
>>> Hi Avi,
>>>
>>> did you have a look at the .connect() and .broadcast() API
>>> functionalities? They allow you to broadcast a control stream to all
>>> operators. Maybe this example [1] or other examples in this repository
>>> can help you.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1]
>>>
>>> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java
>>> 
>>>
>>> Am 26.04.19 um 07:57 schrieb Avi Levi:
>>> > Hi,
>>> > We have a keyed pipeline with persisted state.
>>> > Is there a way to broadcast a command and collect all values that
>>> > persisted in  the state ?
>>> >
>>> > The end result can be for example sending a fetch command to all
>>> > operators and emitting the results to some sink
>>> >
>>> > why do we need it ? from time to time we might want to check if we are
>>> > missing keys what are the additional keys or simply emit the current
>>> > state to a table and to query it.
>>> >
>>> > I tried simply broadcasting a command and addressing the persisted
>>> > state but that resulted with:
>>> > java.lang.NullPointerException: No key set. This method should not be
>>> > called outside of a keyed context.
>>> >
>>> > is there a good way to achieve that ?
>>> >
>>> > Cheers
>>> > Avi
>>>
>>>


Re: Question about when session windows close

2019-04-29 Thread 邵志鹏
Hello, here is my personal understanding:


First of all, this problem is not specific to session windows; tumbling and sliding 
windows behave the same way.


When a time window's result is emitted is determined by the time characteristic. Currently:
1. Only processing time (i.e. no watermarks to handle out-of-order data) emits window 
results promptly.
2. Setting the event-time watermark timestamp to System.currentTimeMillis() also emits 
window results promptly, but then only new messages received after the Flink program 
starts are consumed; earlier messages cannot be processed, even with a new consumer 
group.id and earliest offsets (in other words, fault tolerance and replay stop working, 
though this is worth verifying again).


The precondition for event time to be both timely and correct is that the gaps between 
event timestamps are small, or at least smaller than the window size, and that the 
stream never stops. That pushes the moment at which a window result would be emitted 
late out indefinitely, i.e. to the point where the program finally crashes or goes 
offline.


Watermarks exist to handle out-of-order events, and watermark progress depends on the 
incoming data. That is obvious: assignTimestampsAndWatermarks derives them from event 
timestamps and usually subtracts a small bound to capture a bit more data, so when the 
data stops, the watermark stops advancing.


Now look at the source of the default trigger used by event-time windows:
only onElement and onEventTime have a chance to return TriggerResult.FIRE;
onElement checks the watermark.


onEventTime compares against the timer registered for the watermark timestamp.
Tracing onEventTime upwards leads to InternalTimerServiceImpl#advanceWatermark, 
and further up to AbstractStreamOperator#processWatermark, 
i.e. it all depends on new data coming in.


The conclusion: if the current event-time window's end time has not been reached by the 
watermark, and the allowed out-of-orderness is large, there may even be several windows 
whose end times are all greater than the latest watermark timestamp; those windows are 
effectively pushed back. Only when later data arrives and a new watermark advances do 
the pending window results get a chance to be emitted.


So my idea is to add a check to every time window: as long as the current window has 
not been closed or fired yet, fire it once its end time has been reached in wall-clock 
terms (making sure it fires only once), instead of waiting for the next watermark 
advance.


Besides that, the current event-time semantics fit naturally continuous real-time 
streams; business data, however, sometimes has rather large gaps, since some periods 
are dense and others are sparse.


The above is my personal understanding. I have run into the same problem and have even 
felt that event time is close to meaningless in Flink here. If anything is wrong, I 
would really like to know where; discussion is welcome, and if it really is wrong, I 
hope someone can provide a correct demo so that it can be used in production.


I am also assuming that the windows themselves are already determined by the events:
How time windows are created:


Template method for handling watermarks: AbstractStreamOperator#processWatermark


InternalTimerServiceImpl#advanceWatermark


The default event-time trigger:
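
A rough, untested sketch of the workaround described above (my own illustration; it
assumes event timestamps are epoch milliseconds comparable to the machine clock, and a
duplicate firing when the watermark later catches up is not deduplicated): an event-time
trigger that also registers a processing-time timer at the window end, so the window can
fire from the wall clock even when the watermark stalls. Because session windows merge,
the trigger also has to support merging.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeOrWallClockTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // wall-clock fallback: fire once real time passes the window end,
        // even if no further events arrive and the watermark never advances
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}

// usage, e.g.:
// .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
// .trigger(new EventTimeOrWallClockTrigger())
// .reduce(new RfidReduceFunction())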





On 2019-04-29 18:06:30, by1507...@buaa.edu.cn wrote:


Hello experts,

A problem has been bothering me recently: I set up a session window that closes after 
10 s of inactivity, but I found that it only emits the data processed by that window 
when the next window is created, whereas I would like the data to be emitted when the 
current window ends. How should I handle this? Many thanks.

// Kafka is configured here and the input data stream is created

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

  FlinkKafkaConsumer010 kafkaSource = new FlinkKafkaConsumer010<>("rfid-input-topic",
      new RfidRawDataSchema(), props);
  kafkaSource.assignTimestampsAndWatermarks(new CarTimestamp());

  DataStream dataStream = env.addSource(kafkaSource);

  // Session window: the session is considered over after 10 s of inactivity.
  // The reduce function merges the elements within the window.
  DataStream outputStream = dataStream.keyBy("uniqueId")
      .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
      .reduce(new RfidReduceFunction());

  // Output the data stream through Kafka
  outputStream.addSink(new FlinkKafkaProducer010<>("rfid-output-topic",
      new RfidRawDataSchema(), props));

  try {
      env.execute("Flink add data source");
  } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
  }

When do event-time programs use processing time?

2019-04-29 Thread
The Flink documentation, under Streaming > Event Time > Overview, describes processing 
time, event time and ingestion time, and the event time section notes: "Note that 
sometimes when event time programs are processing live data in real-time, they will use 
some processing time operations in order to guarantee that they are progressing in a 
timely fashion." In which situations does an event-time program use processing time in 
this way?

Question about when session windows close

2019-04-29 Thread by1507118
Hello experts,

A problem has been bothering me recently: I set up a session window that closes after
10 s of inactivity, but I found that it only emits the data processed by that window
when the next window is created, whereas I would like the data to be emitted when the
current window ends. How should I handle this? Many thanks.

// Kafka is configured here and the input data stream is created

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

  FlinkKafkaConsumer010 kafkaSource = new FlinkKafkaConsumer010<>("rfid-input-topic",
      new RfidRawDataSchema(), props);
  kafkaSource.assignTimestampsAndWatermarks(new CarTimestamp());

  DataStream dataStream = env.addSource(kafkaSource);

  // Session window: the session is considered over after 10 s of inactivity.
  // The reduce function merges the elements within the window.
  DataStream outputStream = dataStream.keyBy("uniqueId")
      .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
      .reduce(new RfidReduceFunction());

  // Output the data stream through Kafka
  outputStream.addSink(new FlinkKafkaProducer010<>("rfid-output-topic",
      new RfidRawDataSchema(), props));

  try {
      env.execute("Flink add data source");
  } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
  }



Re: Data Locality in Flink

2019-04-29 Thread Fabian Hueske
Hi,

The method that I described in the SO answer is still implemented in Flink.
Flink tries to assign splits to tasks that run on local TMs.
However, files are not split per line (this would be horribly inefficient)
but in larger chunks depending on the number of subtasks (and in case of
HDFS the file block size).

Best, Fabian

Am So., 28. Apr. 2019 um 18:48 Uhr schrieb Soheil Pourbafrani <
soheil.i...@gmail.com>:

> Hi
>
> I want to know exactly how Flink reads data in both cases: a file in the
> local filesystem and a file on a distributed file system.
>
> When reading data from the local file system, I guess every line of the file
> will be read by a slot (according to the job parallelism) for applying the
> map logic.
>
> For reading from HDFS I read this
>  answer by Fabian Hueske
>  and I want to
> know whether that is still the Flink strategy for reading from a distributed
> file system?
>
> thanks
>


Re: Apache Flink - Question about dynamically changing window end time at run time

2019-04-29 Thread Fabian Hueske
Hi Mans,

I don't know if that would work or not. Would need to dig into the source
code for that.

TBH, I would recommend checking whether you can implement the logic using a
(Keyed-)ProcessFunction.
IMO, process functions are a lot easier to reason about than Flink's
windowing framework.
You can manage state and timers all by yourself and make sure everything is
properly cleaned up.

Best,
Fabian
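
For illustration, a bare-bones sketch of that approach (the Event type, its
getTimestamp() accessor, and the rule "close 10 seconds after the latest event" are
placeholders): a KeyedProcessFunction that keeps per-key state, moves an event-time
timer around as a dynamic "window end", and clears everything when the timer fires.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicEndTimeFunction extends KeyedProcessFunction<String, Event, Long> {

    private transient ValueState<Long> count;   // running per-key aggregate (here just a count)
    private transient ValueState<Long> endTime; // current "window end" for this key

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        endTime = getRuntimeContext().getState(new ValueStateDescriptor<>("endTime", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Long> out) throws Exception {
        Long c = count.value();
        count.update(c == null ? 1L : c + 1);

        // derive a new "end of window" from this event and move the timer accordingly
        long newEnd = event.getTimestamp() + 10_000L;
        Long oldEnd = endTime.value();
        if (oldEnd == null || newEnd > oldEnd) {
            if (oldEnd != null) {
                ctx.timerService().deleteEventTimeTimer(oldEnd);
            }
            ctx.timerService().registerEventTimeTimer(newEnd);
            endTime.update(newEnd);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        Long c = count.value();
        if (c != null) {
            out.collect(c); // emit the per-key result
        }
        count.clear();      // drop all state so nothing sticks around forever
        endTime.clear();
    }
}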


Am So., 28. Apr. 2019 um 16:31 Uhr schrieb M Singh :

> Thanks Sameer/Rong:
>
> As Fabian and you have mentioned, the window still sticks around forever
> for a global window, so I am trying to avoid that scenario.
>
> Fabian & Flink team - do you have any insights into what would happen if I
> create a window and then later change its end time during stream
> processing? Would it mess up any internal state/processing that uses the
> end time from when the window was first created? If there is any other
> consideration to keep in mind, please let me know.
>
> Thanks again.
>
> On Wednesday, April 24, 2019, 1:29:18 PM EDT, Rong Rong <
> walter...@gmail.com> wrote:
>
>
> Hi Mans,
>
> Sameer is correct. If you would like to control window triggering based on
> other elements that do not belong to this window (in a keyed stream
> context), then this is probably the best way to approach it.
>
> I think you've also posted in another thread that describes what will be
> left after fire-and-purge [1]. As Fabian stated: the only things that
> might be left afterwards are the window (which is just the 2 long values
> indicating the start/end) and the trigger object. But you are right that it
> might eventually fill up memory.
>
> Another approach is to implement your own operator that handles all of this
> internally in your user code. This would require you to replicate much of
> the window operator logic though.
>
> Thanks,
> Rong
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-How-to-destroy-global-window-and-release-it-s-resources-td27191.html#a27212
>
> On Wed, Apr 24, 2019 at 5:02 AM Sameer W  wrote:
>
> Global Windows is fine for this use case. I have used the same strategy.
> You just define custom evictors and triggers and you are all good. Windows
> are managed per key, so as long as events are evicted from the
> window, that counts towards reclaiming memory for the key+window
> combination. Plus there is just one window per key with Global Windows.
>
> On Wed, Apr 24, 2019 at 7:47 AM M Singh  wrote:
>
> Hi Rong:
>
> Thanks for your answer.
>
> From what I understand the dynamic gap session windows are also created
> when the event is encountered.  I need to be able to change the window end
> time at a later time based on what other events are in that window.  One
> way to do this is to use GlobalWindows but then these are never deleted.
>
> Regarding the CEP option - I believe that CEP patterns cannot be changed
> dynamically once they've been compiled, which limits its usage.
>
> Please feel free to correct me.
>
> Thanks for your help and pointers.
>
> On Tuesday, April 23, 2019, 8:12:56 PM EDT, Rong Rong 
> wrote:
>
>
> Hi Mans,
>
> I am not sure what you meant by "dynamically change the end-time of a
> window". If you are referring to dynamically determining the firing time of
> the window, then it fits the description of a session window [1]:
> If you want to handle the window end time dynamically, one way I can
> think of is the dynamic-gap session window [1] approach, with which you
> can specify the end-time of a window based on input elements, provided that
> you are maintaining a session window.
> Another way to look at it is through the Flink-CEP library [2].
>
> Thanks,
> Rong
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#session-windows
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/cep.html#groups-of-patterns
>
> On Tue, Apr 23, 2019 at 8:19 AM M Singh  wrote:
>
> Hi:
>
> I am working on a project and need to change the end time of the window
> dynamically.  I want to find out if the end time of the window is used
> internally (for sorting windows/etc) except for handling watermarks that
> would cause problems if the end time was changed during run time after the
> window has been created even if no new event has arrived for that window.
>
> I don't want to use GlobalWindow since from my understanding it never gets
> destroyed.
>
> If there is any alternate way of dealing with this, please let me know.
>
> Thanks
>
> Mans
>
>


Re: FileInputFormat that processes files in chronological order

2019-04-29 Thread Fabian Hueske
Hi Sergei,

It depends whether you want to process the file with the DataSet (batch) or
DataStream (stream) API.
Averell's answer was addressing the DataStream API part.

The DataSet API does not have any built-in support to distinguish files (or
file splits) by folders and process them in order.
For the DataSet API, you would need to implement a custom InputFormat
(based on FileInputFormat) with a custom InputSplitAssigner implementation.
The InputSplitAssigner would need to assign splits to hosts based on their
path and in the correct order.

Best,
Fabian
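
For illustration, a rough, untested sketch of such an assigner: it hands out file splits
strictly in path order (e.g. all splits of an earlier folder before a later one) and
ignores locality. Note that with a source parallelism greater than 1 the splits are
still processed concurrently, so a strict processing order effectively needs
parallelism 1; depending on the Flink version, InputSplitAssigner may declare an
additional returnInputSplit method that would also have to be implemented, and the
assigner has to be returned from getInputSplitAssigner() of your custom InputFormat.

import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Queue;

/** Hands out file splits strictly in path order. */
public class PathOrderedSplitAssigner implements InputSplitAssigner {

    private final Queue<FileInputSplit> pending;

    public PathOrderedSplitAssigner(FileInputSplit[] splits) {
        FileInputSplit[] sorted = Arrays.copyOf(splits, splits.length);
        Arrays.sort(sorted, Comparator
                .comparing((FileInputSplit s) -> s.getPath().toString())
                .thenComparingLong(FileInputSplit::getStart));
        this.pending = new ArrayDeque<>(Arrays.asList(sorted));
    }

    @Override
    public synchronized InputSplit getNextInputSplit(String host, int taskId) {
        // ignore locality: the next split in path order goes to whichever task asks first
        return pending.poll();
    }
}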

Am So., 28. Apr. 2019 um 08:48 Uhr schrieb Averell :

> Hi,
>
> Regarding splitting by shards, I believe that you can simply create two
> sources, one for each shard. After that, union them together.
>
> Regarding processing files in chronological order, Flink currently reads
> files using the files' last-modified-time order (i.e. oldest files will be
> processed first). So if your file1.json is older than file2, file2 is older
> than file3, then you don't need to do anything.
> If your file-times are not in that order, then I think it's more complex.
> But
> I am curious about why there are such requirements first. Is this a
> streaming problem?
>
> I don't think FileInputFormat has anything to do here. Use that when your
> files are in a format not currently supported by Flink.
>
> Regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Emitting current state to a sink

2019-04-29 Thread Fabian Hueske
Hi Avi,

I'm not sure that you cannot emit data from the keyed state when you receive
a broadcasted message.
The Context parameter of the processBroadcastElement() method in the
KeyedBroadcastProcessFunction has the applyToKeyedState() method.
The method takes a KeyedStateFunction that is applied to each key of a
state, but does not provide a Collector to emit data.
Maybe you can pass the collector to the KeyedStateFunction and emit records
while it iterates over the key space.

Best, Fabian
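
For illustration, a compact sketch of that idea (the Request, BroadcastRequest and
Response types stand for the poster's own classes and are not defined here; isFetch()
is a made-up accessor; the descriptor must be the exact one used to create the keyed
state):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class FetchableStateFunction
        extends KeyedBroadcastProcessFunction<String, Request, BroadcastRequest, Response> {

    // must be the same descriptor that the keyed side uses
    private static final ValueStateDescriptor<Response> STATE_DESC =
            new ValueStateDescriptor<>("state", Response.class);

    @Override
    public void processElement(Request value, ReadOnlyContext ctx, Collector<Response> out)
            throws Exception {
        ValueState<Response> state = getRuntimeContext().getState(STATE_DESC);
        // ... normal keyed processing that updates `state` ...
    }

    @Override
    public void processBroadcastElement(BroadcastRequest cmd, Context ctx, Collector<Response> out)
            throws Exception {
        if (cmd.isFetch()) {
            // walk over every key's state and emit its current value through the collector
            ctx.applyToKeyedState(STATE_DESC,
                    new KeyedStateFunction<String, ValueState<Response>>() {
                        @Override
                        public void process(String key, ValueState<Response> state) throws Exception {
                            Response current = state.value();
                            if (current != null) {
                                out.collect(current);
                            }
                        }
                    });
        }
    }
}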

Am Fr., 26. Apr. 2019 um 17:35 Uhr schrieb Avi Levi :

> Hi Timo,
> I definitely did, but when broadcasting a command and trying to address the
> persisted state (I mean the state of the data stream and not the
> broadcasted one) you get the exception that I wrote
> (java.lang.NullPointerException: No key set. This method should not be
> called outside of a keyed context). e.g doing something like
>
> override def processBroadcastElement(value: BroadcastRequest, ctx: 
> KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, 
> Response]#Context, out: Collector[Response]): Unit = {
>   value match {
>     case Command(StateCmd.Fetch, _) =>
>       if (state.value() != null) {
>         out.collect(state.value())
>       }
>
> will yield that exception
>
> BR
> Avi
>
> On Fri, Apr 26, 2019 at 11:55 AM Timo Walther  wrote:
>
>> This Message originated outside your organization.
>>
>> Hi Avi,
>>
>> did you have a look at the .connect() and .broadcast() API
>> functionalities? They allow you to broadcast a control stream to all
>> operators. Maybe this example [1] or other examples in this repository
>> can help you.
>>
>> Regards,
>> Timo
>>
>> [1]
>>
>> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java
>>
>> Am 26.04.19 um 07:57 schrieb Avi Levi:
>> > Hi,
>> > We have a keyed pipeline with persisted state.
>> > Is there a way to broadcast a command and collect all values that
>> > persisted in  the state ?
>> >
>> > The end result can be for example sending a fetch command to all
>> > operators and emitting the results to some sink
>> >
>> > why do we need it ? from time to time we might want to check if we are
>> > missing keys what are the additional keys or simply emit the current
>> > state to a table and to query it.
>> >
>> > I tried simply broadcasting a command and addressing the persisted
>> > state but that resulted with:
>> > java.lang.NullPointerException: No key set. This method should not be
>> > called outside of a keyed context.
>> >
>> > is there a good way to achieve that ?
>> >
>> > Cheers
>> > Avi
>>
>>


Re: Working around lack of SQL triggers

2019-04-29 Thread Fabian Hueske
Hi,

I don't think that (the current state of) Flink SQL is a good fit for your
requirements.
Each query will be executed as an independent job. So there won't be any
sharing of intermediate results.
You can do some of this manually if you use the Table API, but even then it
won't allow for early results.

I'd recommend using the DataStream API and some parameterizable
aggregation functions.

Best, Fabian
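
To make that a bit more concrete, a sketch of one parameterizable AggregateFunction
(MyRecord and its accessors are placeholders) covering the repeated
"conditional count / total / rate" pattern; early results for the long windows come
from attaching a ContinuousEventTimeTrigger, as in the usage comment at the end:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

import java.io.Serializable;
import java.util.function.Predicate;

public class ConditionRate implements AggregateFunction<MyRecord, Tuple2<Long, Long>, Double> {

    public interface SerializablePredicate<T> extends Predicate<T>, Serializable {}

    private final SerializablePredicate<MyRecord> condition; // the part that varies per "query"

    public ConditionRate(SerializablePredicate<MyRecord> condition) {
        this.condition = condition;
    }

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return Tuple2.of(0L, 0L); // (matching count, total count)
    }

    @Override
    public Tuple2<Long, Long> add(MyRecord value, Tuple2<Long, Long> acc) {
        return Tuple2.of(acc.f0 + (condition.test(value) ? 1 : 0), acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}

// usage: early results every 5 minutes while the two-week window is still open
// stream.keyBy(r -> r.getA() + "|" + r.getB() + "|" + r.getC())
//       .window(TumblingEventTimeWindows.of(Time.days(14)))
//       .trigger(ContinuousEventTimeTrigger.of(Time.minutes(5)))
//       .aggregate(new ConditionRate(r -> r.myCondition()));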

Am Fr., 26. Apr. 2019 um 20:49 Uhr schrieb deklanw :

> I'm not sure how to express my logic simply where early triggers are a
> necessity.
>
> My application has large windows (2 weeks~) where early triggering is
> absolutely required. But, also, my application has mostly relatively simple
> logic which can be expressed in SQL. There's a ton of duplication, like the
> following
>
>
> ```
> SELECT A,B,C,
> COUNT(*) FILTER (WHERE my_condition) AS total_conditions,
> COUNT(*) AS total,
> ROUND(COUNT(*) FILTER (WHERE my_condition)/(COUNT(*)), 1)
> AS
> condition_rate,
> AVG(D),
> AVG(E),
> AVG(F)
> FROM foo
> GROUP BY A,B,C, SESSION(...)
> ```
>
> Just imagine these kinds of queries duplicated a ton, just varying which
> fields are being averaged and grouped by.
>
> This is fairly easy to do with SQL, with some copying and pasting. Just
> Ctrl+Fing to give an idea (so far),
> COUNT - 50
> AVG - 27
> GROUP BY - 12
>
> Since Flink doesn't support GROUPING SETS for streaming, I'll need to
> duplicate a lot of these queries actually. So this is an underestimation.
>
> Is writing an absolute ton of custom AggregateFunction boilerplate the only
> way to solve this problem? Is there no way to abstract this while
> maintaining early triggers? I feel like I'm missing something. Is Flink SQL
> streaming only for short windows where triggering only at the end of the
> window is acceptable?
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Exceptions when launching counts on a Flink DataSet concurrently

2019-04-29 Thread Fabian Hueske
Hi Juan,

count() and collect() trigger the execution of a job.
Since Flink does not cache intermediate results (yet), all operations from
the sink (count()/collect()) to the sources are executed.
So in a sense a DataSet is immutable (given that the input of the sources
do not change) but completely recomputed for every execution.

There are currently some efforts [1] under way to improve Flink's behavior
for interactive sessions.

Best, Fabian

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
[2]
https://lists.apache.org/thread.html/5f4961f1dfe23204631fd6f2b3227724ce9831f462737f51742a52c1@%3Cdev.flink.apache.org%3E
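
To make the practical consequence concrete, a small sketch (in Java; it mirrors the
Scala snippet quoted below): every sink action re-executes the plan, sequential calls
from one thread are fine, and collecting once lets further results be derived locally
without re-running the job:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

import java.util.List;

public class SequentialActions {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(3);
        DataSet<Integer> xs = env.fromElements(1, 2, 3, 4, 5);

        // each of these triggers a full (re-)execution of the plan, but sequential calls are safe
        long c1 = xs.count();
        long c2 = xs.count();

        // fetch the data once and derive whatever else is needed from the local copy
        List<Integer> materialized = xs.collect();
        System.out.println(c1 + " " + c2 + " " + materialized.size());
    }
}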

Am Fr., 26. Apr. 2019 um 17:03 Uhr schrieb Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com>:

> Hi Timo,
>
> Thanks for your answer. I was surprised to have problems calling those
> methods concurrently, because I though data sets were immutable. Now I
> understand calling count or collect mutates the data set, not its contents
> but some kind of execution plan included in the data set.
>
> I suggest adding a remark about this lack of thread safety to the
> documentation. Maybe it’s already there but I haven’t seen it. I also
> understand repeated calls to collect and count the safe data set are ok as
> long as they are done sequentially, and not concurrently.
>
> Thanks,
>
> Juan
>
> On Fri, Apr 26, 2019 at 02:00 Timo Walther  wrote:
>
>> Hi Juan,
>>
>> as far as I know we do not provide any concurrency guarantees for count()
>> or collect(). Those methods need to be used with caution anyways as the
>> result size must not exceed a certain threshold. I will loop in Fabian who
>> might know more about the internals of the execution?
>>
>> Regards,
>> Timo
>>
>>
>> Am 26.04.19 um 03:13 schrieb Juan Rodríguez Hortalá:
>>
>> Any thoughts on this?
>>
>> On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá <
>> juan.rodriguez.hort...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a very simple program using the local execution environment, that
>>> throws NPE and other exceptions related to concurrent access when launching
>>> a count for a DataSet from different threads. The program is
>>> https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e which
>>> is basically this:
>>>
>>> def doubleCollectConcurrent = {
>>>   val env = ExecutionEnvironment.createLocalEnvironment(3)
>>>   val xs = env.fromCollection(1 to 100).map{_+1}
>>>   implicit val ec = 
>>> ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
>>>
>>>   val pendingActions = Seq.fill(10)(
>>> Future { println(s"xs.count = ${xs.count}") }
>>>   )
>>>   val pendingActionsFinished = Future.fold(pendingActions)(Unit){ (u1, u2) 
>>> =>
>>> println("pending action finished")
>>> Unit  }
>>>   Await.result(pendingActionsFinished, 10 seconds)
>>>
>>>   ok}
>>>
>>>
>>> It looks like the issue is on OperatorTranslation.java at
>>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
>>> when a sink is added to the sinks list while that list is being traversed.
>>> I have the impression that this is by design, so I'd like to confirm that
>>> this is the expected behaviour, and whether this is happening only for the
>>> local execution environment, or if this affects all execution environments
>>> implementations. Other related questions I have are:
>>>
>>>- Is this documented somewhere? I'm quite new to Flink, so I might
>>>have missed this. Is there any known workaround for concurrently 
>>> launching
>>>counts and other sink computations on the same DataSet?
>>>- Is it safe performing a sequence of calls to DataSet sink methods
>>>like count or collect, on the same DataSet, as long as they are performed
>>>from the same thread? From my experience it looks like it is, but I'd 
>>> like
>>>to get a confirmation if possible.
>>>
>>> This might be related to
>>> https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
>>> but I'm not sure.
>>>
>>> Thanks a lot for your help.
>>>
>>> Greetings,
>>>
>>> Juan
>>>
>>
>>


Re:Re: How to let Flink 1.7.X run Flink session cluster on YARN in Java 7 default environment

2019-04-29 Thread 胡逸才
Thanks Tang:
Following your prompt, I deleted the useless parameters from the command line 
and added your parameters to flink-config.xml; it now runs successfully on 
YARN in the Java 7 default environment.







At 2019-04-28 11:54:18, "Yun Tang"  wrote:

Hi Zhangjun


Thanks for your reply!



However, the Flink user mailing list is in English, and the user-zh mailing list 
is specifically for Chinese. Replying in Chinese on the Flink user mailing list would be 
somewhat unfriendly to non-Chinese speakers.



I think your reply could be translated as official requirements.[1]


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/projectsetup/java_api_quickstart.html#requirements



From: 126 
Sent: Sunday, April 28, 2019 8:24
To: 胡逸才
Cc: imj...@gmail.com; dev; user
Subject: Re: How to let Flink 1.7.X run Flink session cluster on YARN in Java 7 
default environment
 
The Flink source code uses many Java 1.8 features, so JDK 1.7 will not work.


Sent from my iPhone

On Apr 26, 2019, at 17:48, 胡逸才  wrote:


At present, all our YARN clusters use a Java 7 environment.

While trying to use Flink for our stream processing business scenarios, we 
found that Flink on YARN always failed to run a session task. The YARN 
application log shows "Unsupported major.minor version 52.0".

Following the mailing list solution, I tried adding env.java.home: <JDK1.8 PATH> 
to flink-conf.yaml, and the startup command adds 
-yD yarn.taskmanager.env.JAVA_HOME=<JDK1.8 PATH>, 
-yD containerized.master.env.JAVA_HOME=<JDK1.8 PATH>, and 
-yD containerized.taskmanager.env.JAVA_HOME=<JDK1.8 PATH>. 
Still, the Flink session cluster on YARN cannot run the application in a Java 8 
environment.

So, can I use Flink 1.7.X to submit a Flink session cluster application on YARN under a 
Java 7 default environment?
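
For reference, the configuration keys mentioned above would look roughly like this in
flink-conf.yaml (the JDK path is a placeholder for wherever Java 8 is installed on the
YARN nodes; which of the entries are actually needed depends on the setup):

env.java.home: /opt/jdk1.8.0
containerized.master.env.JAVA_HOME: /opt/jdk1.8.0
containerized.taskmanager.env.JAVA_HOME: /opt/jdk1.8.0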




 

Re: RocksDB backend with deferred writes?

2019-04-29 Thread Congxian Qiu
Hi, David

When you flush data to the DB, you can reference the serialization logic [1] and 
store the serialized bytes in RocksDB.

[1] 
https://github.com/apache/flink/blob/c4b0e8f68c5c4bb2ba60b358df92ee5db1d857df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java#L136

Best, Congxian
On Apr 29, 2019, 12:57 +0800, aitozi , wrote:
> Hi,David
>
> I opened an issue about this before, and @Andrey Zagrebin and @Aljoscha Krettek 
> suggested extending AbstractStreamOperator to customize how the operator 
> works with state, or extending the state backend to add a cache layer on top of it.
>
> Fyi: 
> https://issues.apache.org/jira/browse/FLINK-10343?focusedCommentId=16614992=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16614992
>
> A new state backend was also introduced at Flink Forward China 2018 by Alibaba, 
> but it has not been open-sourced; you can take a look at the slides.
>
> https://files.alicdn.com/tpsservice/1df9ccb8a7b6b2782a558d3c32d40c19.pdf
>
> Thanks,
> Aitozi
>
> 发件人: "David J. C. Beach" 
> 日期: 2019年4月29日 星期一 上午11:52
> 收件人: aitozi 
> 抄送: 
> 主题: Re: RocksDB backend with deferred writes?
>
> Thanks Aitozi.
>
> Your answer makes good sense and I'm trying to implement this now.  My code 
> is written as a KeyedProcessFunction, but I can't see where this exposes the 
> KeyContext interface.  Is there anything you can point me to in the docs?
>
> Best,
> David
>
>
>
> On Sun, Apr 28, 2019 at 8:09 PM aitozi  wrote:
> > Hi,David
> >
> > The RocksDB keyed backend is used under a key context: every operation on state 
> > should call setCurrentKey first, to let the backend know the current key and 
> > compute the current key group. These two parts are used to interact with the 
> > underlying RocksDB.
> >
> > I think you can achieve this goal by calling setCurrentKey before flushing to 
> > RocksDB, or by building the prefixed key (key group + key) yourself and 
> > putting/getting values to/from RocksDB.
> >
> > Thanks,
> > Aitozi
> >
> >
> > 发件人: "David J. C. Beach" 
> > 日期: 2019年4月29日 星期一 上午6:43
> > 收件人: 
> > 主题: RocksDB backend with deferred writes?
> >
> > I have a stateful operator in a task which processes thousands of elements 
> > per second (per task) when using the Filesystem backend.  As documented and 
> > reported by other users, when I switch to the RocksDB backend, throughput 
> > is considerably lower.  I need something that combines the high performance 
> > of in-memory with the large state and incremental checkpointing of RocksDB.
> >
> > For my application, I can *almost* accomplish this by including a caching 
> > layer which maintains a map of pending (key, value) writes.  These are 
> > periodically flushed to the RocksDB (and always prior to a checkpoint).  
> > This greatly reduces the number of writes to RocksDB, and means that I can 
> > get a "best of both worlds" in terms of throughput and 
> > reliability/incremental checkpointing.  These optimizations make sense for 
> > my workload, since events which operate on the same key tend to be close 
> > together in the stream.  (Over the long haul, there are many millions of 
> > keys, and occasionally, an event references some key from the distant past, 
> > hence the need for RocksDB.)
> >
> > Unfortunately, this solution does not work perfectly because when I do 
> > eventually flush writes to the underlying RocksDB backend, the stateful 
> > processor may be operating on an element which belongs to a different key 
> > group.  Hence, the elements that I flush are associated with the wrong key 
> > group, and things don't work quite right.
> >
> > Is there any way to wrap the RocksDB backend with caching and deferred 
> > writes (in a way which is "key-group aware")?
> >
> > Thanks!
> >
> > David
> >


Re: QueryableState startup regression in 1.8.0 ( migration from 1.7.2 )

2019-04-29 Thread Ufuk Celebi
Actually, I couldn't even find a mention of this flag in the docs here:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/queryable_state.html

– Ufuk
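
For anyone hitting this after an upgrade, the change boils down to one flink-conf.yaml
entry (in addition to keeping flink-queryable-state-runtime in lib/, as shown in the
classpath below):

queryable-state.enable: true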


On Mon, Apr 29, 2019 at 8:45 AM Ufuk Celebi  wrote:

> I didn't find this as part of the
> https://flink.apache.org/news/2019/04/09/release-1.8.0.html notes.
>
> I think an update to the Important Changes section would be valuable for
> users upgrading to 1.8 from earlier releases. Also, logging that the
> library is on the classpath but the feature flag is set to false would be
> helpful.
>
> – Ufuk
>
>
> On Thu, Apr 25, 2019 at 4:13 PM Vishal Santoshi 
> wrote:
>
>> Ditto that, queryable-state.enable to true works.
>>
>> Thanks everyone.
>>
>> On Thu, Apr 25, 2019 at 6:28 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Vishal,
>>>
>>> As Guowei mentioned you have to enable the Queryable state. The default
>>> setting was changed in 1.8.0. There is an open JIRA[1] for changing the
>>> documentation accordingly.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-12274
>>> On 25/04/2019 03:27, Guowei Ma wrote:
>>>
>>> You could try to set queryable-state.enable to true. And check again.
>>>
>>> Vishal Santoshi 于2019年4月25日 周四上午1:40写道:
>>>
 Any one ?

 On Wed, Apr 24, 2019 at 12:02 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Hello folks,
>
>  Following
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>  .
> for setting up the Queryable Server and proxy, I have my classpath ( the
> lib directory ) that has the  required jar, But I do not see the mentioned
> log and of course am not able to set up the QS server/Proxy . This has
> worked on 1.7.2 and I think I have everything as advised, see the logs
> below. I do not  see this log  "Started the Queryable State Proxy
> Server @ ...".  Any one with this issue...
>
>
>
> 2019-04-24 15:54:26,296 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - -Dtaskmanager.numberOfTaskSlots=1
>
> 2019-04-24 15:54:26,296 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - --configDir
>
> 2019-04-24 15:54:26,296 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - /usr/local/flink/conf
>
> 2019-04-24 15:54:26,296 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   -  Classpath:
> /usr/local/flink/lib/flink-metrics-prometheus-1.8.0.jar:
> */usr/local/flink/lib/flink-queryable-state-runtime_2.11-1.8.0.jar*
> :/usr/local/flink/lib/hadoop.jar:/usr/local/flink/lib/jobs.jar:/usr/local/flink/lib/log4j-1.2.17.jar:/usr/local/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/local/flink/lib/flink-dist_2.11-1.8.0.jar:::
>
> 2019-04-24 15:54:26,296 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   -
> 
>
> 2019-04-24 15:54:26,298 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Registered UNIX signal handlers for [TERM, HUP, INT]
>
> 2019-04-24 15:54:26,300 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Maximum number of open file descriptors is 65536.
>
> 2019-04-24 15:54:26,305 INFO
> org.apache.flink.configuration.GlobalConfiguration-
> Loading configuration property: state.backend.fs.checkpointdir,
> hdfs:///flink-checkpoints_k8s_test/prod
>
> 2
>
>
>
>
> --
>>> Best,
>>> Guowei
>>>
>>>


Re: QueryableState startup regression in 1.8.0 ( migration from 1.7.2 )

2019-04-29 Thread Ufuk Celebi
I didn't find this as part of the
https://flink.apache.org/news/2019/04/09/release-1.8.0.html notes.

I think an update to the Important Changes section would be valuable for
users upgrading to 1.8 from earlier releases. Also, logging that the
library is on the classpath but the feature flag is set to false would be
helpful.

– Ufuk


On Thu, Apr 25, 2019 at 4:13 PM Vishal Santoshi 
wrote:

> Ditto that, queryable-state.enable to true works.
>
> Thanks everyone.
>
> On Thu, Apr 25, 2019 at 6:28 AM Dawid Wysakowicz 
> wrote:
>
>> Hi Vishal,
>>
>> As Guowei mentioned you have to enable the Queryable state. The default
>> setting was changed in 1.8.0. There is an open JIRA[1] for changing the
>> documentation accordingly.
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12274
>> On 25/04/2019 03:27, Guowei Ma wrote:
>>
>> You could try to set queryable-state.enable to true. And check again.
>>
>> Vishal Santoshi 于2019年4月25日 周四上午1:40写道:
>>
>>> Any one ?
>>>
>>> On Wed, Apr 24, 2019 at 12:02 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 Hello folks,

  Following
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
  .
 for setting up the Queryable Server and proxy, I have my classpath ( the
 lib directory ) that has the  required jar, But I do not see the mentioned
 log and of course am not able to set up the QS server/Proxy . This has
 worked on 1.7.2 and I think I have everything as advised, see the logs
 below. I do not  see this log  "Started the Queryable State Proxy
 Server @ ...".  Any one with this issue...



 2019-04-24 15:54:26,296 INFO  
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
   - -Dtaskmanager.numberOfTaskSlots=1

 2019-04-24 15:54:26,296 INFO  
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
   - --configDir

 2019-04-24 15:54:26,296 INFO  
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
   - /usr/local/flink/conf

 2019-04-24 15:54:26,296 INFO  
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
   -  Classpath:
 /usr/local/flink/lib/flink-metrics-prometheus-1.8.0.jar:
 */usr/local/flink/lib/flink-queryable-state-runtime_2.11-1.8.0.jar*
 :/usr/local/flink/lib/hadoop.jar:/usr/local/flink/lib/jobs.jar:/usr/local/flink/lib/log4j-1.2.17.jar:/usr/local/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/local/flink/lib/flink-dist_2.11-1.8.0.jar:::

 2019-04-24 15:54:26,296 INFO  
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
   -
 

 2019-04-24 15:54:26,298 INFO  
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
   - Registered UNIX signal handlers for [TERM, HUP, INT]

 2019-04-24 15:54:26,300 INFO  
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
   - Maximum number of open file descriptors is 65536.

 2019-04-24 15:54:26,305 INFO
 org.apache.flink.configuration.GlobalConfiguration-
 Loading configuration property: state.backend.fs.checkpointdir,
 hdfs:///flink-checkpoints_k8s_test/prod

 2




 --
>> Best,
>> Guowei
>>
>>


Re: Containers are not released after job failed

2019-04-29 Thread liujiangang
Thank you, it is fixed in the new version.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/