Re: [DISCUSS] Towards a leaner flink-dist

2019-01-27 Thread Becket Qin
Hi Chesnay,

Thanks for the proposal. +1 for making the distribution thinner.

Meanwhile, it would be useful to have all the peripheral libraries/jars
hosted somewhere so users can download them from a centralized place. We
can also encourage the community to contribute their libraries, such as
connectors and other pluggables, to the same place (maybe a separate
category), so the community can share the commonly used libraries as well.

Thanks,

Jiangjie (Becket) Qin

On Sat, Jan 26, 2019 at 2:49 PM Hequn Cheng  wrote:

> Hi Chesnay,
>
> Thanks a lot for the proposal! +1 for a leaner flink-dist and improving the
> "Download" page.
> I think a leaner flink-dist would be very helpful. If we bundle all jars
> into a single one, it can easily cause class conflict problems.
>
> Best,
> Hequn
>
>
> On Fri, Jan 25, 2019 at 2:48 PM jincheng sun 
> wrote:
>
> > Hi Chesnay,
> >
> > Thank you for the proposal. I like it very much.
> >
> > +1 for the leaner distribution.
> >
> > About improving the "Download" page, I think we can add the connector
> > download links in the "Optional components" section which @Timo Walther
> > mentioned above.
> >
> >
> > Regards,
> > Jincheng
> >
> > Chesnay Schepler wrote on Fri, Jan 18, 2019 at 5:59 PM:
> >
> >> Hello,
> >>
> >> The binary distribution that we currently release contains quite a lot of
> >> optional components, including various filesystems, metric reporters, and
> >> libraries. Most users will only use a fraction of these, so the rest
> >> pretty much only increases the size of flink-dist.
> >>
> >> With Flink growing more and more in scope I don't believe it to be
> >> feasible to ship everything we have with every distribution, and instead
> >> suggest more of a "pick-what-you-need" model, where flink-dist is rather
> >> lean and additional components are downloaded separately and added by
> >> the user.
> >>
> >> This would primarily affect the /opt directory, but could also be
> >> extended to cover flink-dist. For example, the yarn and mesos code could
> >> be spliced out into separate jars that could be added to lib manually.
> >>
> >> Let me know what you think.
> >>
> >> Regards,
> >>
> >> Chesnay
> >>
> >>
>
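For illustration (an editorial sketch, not part of the original emails; the jar name and version are only examples), the "pick-what-you-need" model is essentially what already happens for components shipped in opt/ today, except that the jar would be downloaded separately instead of being bundled:

# Enable an optional component by placing its jar into lib/.
cd flink-1.7.1
cp opt/flink-metrics-prometheus-1.7.1.jar lib/    # today: copy from the bundled opt/
# with a leaner flink-dist: download the jar from the "Download" page into lib/ instead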


Re: TimeZone shift problem in Flink SQL

2019-01-27 Thread 徐涛
Hi Rongrong,
The event really happened on Tuesday, January 22, 2019 9:03:02.001 PM, so I
think the first function returning 1548162182001 is correct. It is the Unix
epoch time at which the event happened.
But why is the timestamp passed into from_unixtime changed to 1548190982001?
If it were not changed, I could still format 1548162182001 and then pass a
time zone to get the actual date.
A timestamp is a time-zone independent value; it should not be changed, I think.

Best
Henry

> On Jan 26, 2019, at 1:21 PM, Rong Rong wrote:
> 
> Hi Henry,
> 
> Unix epoch time values are always under GMT timezone, for example:
> - 1548162182001 <=> GMT: Tuesday, January 22, 2019 1:03:02.001 PM, or CST: 
> Tuesday, January 22, 2019 9:03:02.001 PM.
> - 1548190982001 <=> GMT: Tuesday, January 22, 2019 9:03:02.001 PM, or CST:
> Wednesday, January 23, 2019 5:03:02.001 AM.
> 
> Several things are needed here:
> 1. Your "unix_timestamp" UDF should return the actual Unix epoch time [1].
> 2. As Bowen mentioned, you will have to pass the desired timezone as an
> argument to your "from_unixtime" UDF.
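A minimal sketch of that point in plain Java (not code from this thread): the epoch value itself is timezone-independent, and only the timezone applied while formatting changes what you see. A from_unixtime-style UDF therefore needs the target timezone as an argument rather than relying on the JVM default.

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class EpochFormatDemo {
    public static void main(String[] args) {
        long epochMillis = 1548162182001L;  // a timezone-independent instant

        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

        // Same instant, two renderings; only the formatting timezone differs.
        System.out.println(fmt.format(Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of("GMT"))));
        // -> 2019-01-22 13:03:02.001
        System.out.println(fmt.format(Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of("Asia/Shanghai"))));
        // -> 2019-01-22 21:03:02.001
    }
}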
> 
> --
> Rong
> 
> [1]: https://en.wikipedia.org/wiki/Unix_time 
> 
> On Thu, Jan 24, 2019 at 4:43 PM Bowen Li wrote:
> Hi,
> 
> Did you consider timezone in conversion in your UDF?
> 
> 
> On Tue, Jan 22, 2019 at 5:29 AM 徐涛 wrote:
> Hi Experts,
>   I have the following two UDFs,
>   I have the following two UDFs:
> unix_timestamp: transforms a String into a Timestamp, with the arguments
> (value: String, format: String), and returns a Timestamp
> from_unixtime: transforms a Timestamp into a String, with the arguments
> (ts: Long, format: String), and returns a String
> 
> 
>   select
>      number,
>      ts,
>      from_unixtime(unix_timestamp(LAST_UPDATE_TIME, 'EEE MMM dd HH:mm:Ss z '), '-MM-dd') as dt
>   from
>      test;
> 
>   When the LAST_UPDATE_TIME value is "Tue Jan 22 21:03:12 CST 2019", the
> unix_timestamp UDF returns a Timestamp with value 1548162182001.
>   But when from_unixtime is invoked, a timestamp with value 1548190982001 is
> passed in; there is an 8-hour shift between them.
>   May I know why there is an 8-hour shift between them, and how I can get
> the timestamp originally produced by the first UDF without changing the code?
>   
>   Thanks very much.
> 
> Best
> Henry



How to infer table schema from Avro file

2019-01-27 Thread Soheil Pourbafrani
Hi, I load an Avro file into a Flink DataSet:

AvroInputFormat<GenericRecord> test = new AvroInputFormat<>(
        new Path("PathToAvroFile"), GenericRecord.class);
DataSet<GenericRecord> DS = env.createInput(test);

DS.print();

and here are the results of printing DS:
{"N_NATIONKEY": 14, "N_NAME": "KENYA", "N_REGIONKEY": 0, "N_COMMENT": "
pending excuses haggle furiously deposits. pending, express pinto beans
wake fluffily past t"}
{"N_NATIONKEY": 15, "N_NAME": "MOROCCO", "N_REGIONKEY": 0, "N_COMMENT":
"rns. blithely bold courts among the closely regular packages use furiously
bold platelets?"}
{"N_NATIONKEY": 16, "N_NAME": "MOZAMBIQUE", "N_REGIONKEY": 0, "N_COMMENT":
"s. ironic, unusual asymptotes wake blithely r"}
{"N_NATIONKEY": 17, "N_NAME": "PERU", "N_REGIONKEY": 1, "N_COMMENT":
"platelets. blithely pending dependencies use fluffily across the even
pinto beans. carefully silent accoun"}
{"N_NATIONKEY": 18, "N_NAME": "CHINA", "N_REGIONKEY": 2, "N_COMMENT": "c
dependencies. furiously express notornis sleep slyly regular accounts.
ideas sleep. depos"}
{"N_NATIONKEY": 19, "N_NAME": "ROMANIA", "N_REGIONKEY": 3, "N_COMMENT":
"ular asymptotes are about the furious multipliers. express dependencies
nag above the ironically ironic account"}
{"N_NATIONKEY": 20, "N_NAME": "SAUDI ARABIA", "N_REGIONKEY": 4,
"N_COMMENT": "ts. silent requests haggle. closely express packages sleep
across the blithely"}

Now I want to create a table from the DS dataset with exactly the same
schema as the Avro file, i.e., the columns should be N_NATIONKEY, N_NAME,
N_REGIONKEY, and N_COMMENT.

I know using the line:

tableEnv.registerDataSet("tbTest", DS, "field1, field2, ...");

I can create a table and set the columns manually, but I want the columns to
be inferred automatically from the data. Is that possible?
I tried

tableEnv.registerDataSet("tbTest", DS);

but it creates a table with the schema:
root
 |-- f0: GenericType
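One possible workaround (an editorial sketch, not from this thread; tableEnv and DS are the objects from the snippets above, and the field types are assumptions based on the sample output): map the GenericRecord dataset to Row with an explicit RowTypeInfo so the registered table carries the Avro column names. Building the RowTypeInfo programmatically from the record's Avro schema (record.getSchema()) would get closer to fully automatic inference.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

// Assumed field types: keys as INT, names and comments as STRING.
RowTypeInfo rowType = new RowTypeInfo(
        new TypeInformation<?>[]{Types.INT, Types.STRING, Types.INT, Types.STRING},
        new String[]{"N_NATIONKEY", "N_NAME", "N_REGIONKEY", "N_COMMENT"});

DataSet<Row> rows = DS
        .map(record -> Row.of(
                ((Number) record.get("N_NATIONKEY")).intValue(),
                record.get("N_NAME").toString(),
                ((Number) record.get("N_REGIONKEY")).intValue(),
                record.get("N_COMMENT").toString()))
        .returns(rowType);

// No field list needed: the column names come from the RowTypeInfo.
tableEnv.registerDataSet("tbTest", rows);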


Re: How to load Avro file in a Dataset

2019-01-27 Thread Ken Krugler
Hi Soheil,

I’ve used Avro in the past, but I’m no expert - so I could be missing something 
obvious here…

But if you don’t know any of the fields in the schema, then what processing 
would you do with the data in your Flink workflow?

— Ken

> On Jan 27, 2019, at 5:50 AM, Soheil Pourbafrani  wrote:
> 
> According to the Flink documentation, it's possible to load an Avro file like
> the following:
> AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
> DataSet<User> usersDS = env.createInput(users);
> It's a bit confusing to me. I guess User is a predefined class. My question
> is: can Flink detect the Avro file schema automatically? How can I load an
> Avro file without any predefined class?

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Flink Yarn Cluster - Jobs Isolation

2019-01-27 Thread Eran Twili
Hi,

In my company we are interested in running Flink jobs on a YARN cluster (on
AWS EMR).
We understand that there are 2 ways ('modes') to execute Flink jobs on a YARN
cluster.
We must have the jobs run concurrently!
From what we understand so far, these are the options (a rough CLI sketch of
both follows the list):

  1.  Start a long running yarn session, to which we'll send jobs.
  2.  Run each job as a 'single job'.
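For reference, a rough CLI sketch of the two modes (Flink 1.7-era flags; the jar paths and resource numbers below are only examples):

# 1. Session mode: one long-running YARN application; every job submitted to it
#    shares the session's JobManager and TaskManager JVMs.
./bin/yarn-session.sh -n 4 -tm 4096 -s 2
./bin/flink run ./examples/streaming/WordCount.jar
./bin/flink run ./examples/streaming/SocketWindowWordCount.jar   # same session, same JVMs

# 2. Per-job ("single job") mode: each run starts its own YARN application, so
#    every job gets its own JobManager/TaskManager JVMs and can run concurrently
#    with other per-job clusters.
./bin/flink run -m yarn-cluster -yn 4 ./examples/streaming/WordCount.jar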
We searched the web to understand the differences and consequences of each
option (we read through the flink-yarn-setup documentation and FLIP-6, along
with many other references), but couldn't find clear, comprehensive info.

In the 'session' mode:

  1.  Does running multiple jobs in a single session mean there's no job
isolation?
  2.  Will all jobs run in the same JVM?
  3.  Can we define a different classpath for each job in this mode?
In the 'single job' mode:

  1.  Can we run multiple jobs concurrently?
  2.  Is there complete job isolation by default, or do we need to configure
it (different JVM/classpath)?

Overall, what are the different implications of each mode in terms of resource
management, security, and monitoring?
Another question: what is the difference between running multiple sessions with
a single job each vs. multiple 'single job' executions?

We'd be very thankful if someone could provide some answers or a reference to
comprehensive documentation on these subjects.

Regards,
Eran






How to load Avro file in a Dataset

2019-01-27 Thread Soheil Pourbafrani
According to the Flink documentation, it's possible to load an Avro file like
the following:

AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
DataSet<User> usersDS = env.createInput(users);

It's a bit confusing to me. I guess User is a predefined class. My question
is: can Flink detect the Avro file schema automatically? How can I load an
Avro file without any predefined class?
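For reference, this is the variant without a predefined class, using GenericRecord (the same approach shown in the sibling "How to infer table schema from Avro file" thread; the file path is an example):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;

// Read records against the schema embedded in the Avro file itself,
// without generating or hand-writing a User class.
AvroInputFormat<GenericRecord> format = new AvroInputFormat<>(
        new Path("hdfs:///path/to/file.avro"), GenericRecord.class);
DataSet<GenericRecord> records = env.createInput(format);
records.print();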


Re: Query on retract stream

2019-01-27 Thread Gagan Agrawal
Thanks, Hequn, for sharing those details. Looking forward to the Blink
integration.
I have one doubt about one of your earlier statements:

*> Also, currently, the window doesn't have the ability to handle
retraction messages*

When we use multiple windows (as you suggested), it is able to handle updates.
So what does this statement really mean? Does it mean that using multiple
windows is just a workaround, since a single window is not able to handle
retraction messages?

I also wanted to confirm whether a tumbling window in the Table/SQL API can
handle late data (i.e., data arriving after the window has closed). Do we have
something similar to the DataStream API's allowedLateness feature? You already
mentioned that a sliding window cannot handle late data, but does that apply
to a tumbling window as well?

One of the challenges in using unbounded aggregates in the Table API is state
retention. As I understand it, the only way to clear old state is via the
query config's idle state retention time. However, that is a global parameter,
not a per-aggregate parameter. So in my Flink job, if I want a mix of minute,
hourly, and daily aggregates, I will have to keep the idle state retention at
a minimum of one day, which means all minute-level aggregations will also
exist for the entire day and hence lead to an increase in state size.
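For reference, a minimal sketch of how that retention is configured in the Flink 1.7 Table API (the table and query here are illustrative); it is indeed one setting on the query config used to translate the query, not something attached to an individual aggregate:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

StreamQueryConfig qConfig = tableEnv.queryConfig();
// One retention window applies to all stateful operators produced by this query.
qConfig.withIdleStateRetentionTime(Time.hours(25), Time.hours(30));

Table pendingCounts = tableEnv.sqlQuery(
        "SELECT userId, count(orderId) FROM orders GROUP BY userId");
tableEnv.toRetractStream(pendingCounts, Row.class, qConfig).print();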

Gagan




On Sun, Jan 27, 2019 at 9:42 AM Hequn Cheng  wrote:

> Hi Gagan,
>
> Besides the event-time and proctime difference, there is another difference
> between the two ways. The window aggregates over bounded data, while the
> unbounded aggregate works on unbounded data, i.e., newly arriving data can
> update a very old result.
>
> As for performance, I think the two ways may show no big difference in the
> current Flink version. Maybe you can run some tests between them on your own
> scenarios if both of them can solve your problem. FYI: there is a nice
> discussion [1] raised by Timo recently. Once Blink is merged into Flink, the
> unbounded aggregate will be much faster than the window.
>
> Best,
> Hequn
>
> [1] https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-32
>
>
> On Sat, Jan 26, 2019 at 4:11 PM Gagan Agrawal 
> wrote:
>
>> Thanks, Hequn, for the suggested solutions; I think this should really work
>> and I will give it a try. As I understand it, the first solution of using
>> multiple windows will be good for scenarios where I want output to be
>> generated once the window is materialized (i.e., the watermark reaches the
>> end of the window), and the second will be good if I want it to fire on a
>> per-event basis (i.e., no watermarking). Apart from this, do you see any
>> difference from a performance perspective in choosing between the two, or
>> should both be equally performant?
>>
>> Gagan
>>
>> On Sat, Jan 26, 2019 at 11:50 AM Hequn Cheng 
>> wrote:
>>
>>> Hi Gagan,
>>>
>>> Time attribute fields will be materialized by the unbounded groupby.
>>> Also, currently, the window doesn't have the ability to handle retraction
>>> messages. I see two ways to solve the problem.
>>>
>>> - Use multi-window.  The first window performs lastValue, the second
>>> performs count.
>>> - Use two non-window aggregates. In this case, you don't have to change
>>> anything for the first aggregate. For the second one, you can group by an
>>> hour field and perform count(). The code looks like:
>>>
>>> SELECT userId,
>>>  count(orderId)
>>> FROM
>>> (SELECT orderId,
>>>  getHour(orderTime) as myHour,
>>>  lastValue(userId) AS userId,
>>>  lastValue(status) AS status
>>> FROM orders
>>> GROUP BY  orderId, orderTime)
>>> WHERE status='PENDING'
>>> GROUP BY myHour, userId
>>>
>>> Best,
>>> Hequn
>>>
>>>
>>>
>>>
>>> On Sat, Jan 26, 2019 at 12:29 PM Gagan Agrawal 
>>> wrote:
>>>
 Based on the suggestions in this mail thread, I tried out a few experiments
 on an upsert stream with Flink 1.7.1, and here is the issue I am facing with
 the windowed stream.

 *1. Global Pending order count*
 The following query works fine and is able to handle updates as per the
 original requirement.

 select userId, count(orderId) from
 (select orderId, lastValue(userId) as userId, lastValue(status) as
 status from orders group by orderId)
 where status='PENDING' group by userId

 *2. Last 1 Hour tumbling window count (Append stream)*
 Though the following query doesn't handle an upsert stream, I just tried it
 to make sure the time column works fine. It does, but as expected, it doesn't
 handle updates on orderId.

 select userId, count(orderId) from orders
 where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR),
 userId

 3. *Last 1 Hour tumbling window count (With upsert stream)*
 Now I tried a combination of the above two, where the input stream is
 converted to an upsert stream (via the lastValue aggregate function) and the
 pending count is then calculated over the last 1-hour window.

 select userId, count(orderId) from
 (select orderId, orderTime, 

Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

2019-01-27 Thread Erik van Oosten

Hi sohimankotia,

My advice, from also having had to sub-class BucketingSink:

* rebase your changes on the BucketingSink that comes with the Flink 
version you are using
* use the same super completely ugly hack I had to deploy as described 
here: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Exception-in-BucketingSink-when-cancelling-Flink-job-td15856.html#a16168

* consider using the successor of BucketingSink, StreamingFileSink (a minimal sketch follows below)
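A minimal sketch of the StreamingFileSink alternative (Flink 1.7+, row format; the output path and bucket pattern are just examples, and checkpointing must be enabled so that in-progress files get finalized):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs:///data/out"), new SimpleStringEncoder<String>("UTF-8"))
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))  // hourly buckets
        .build();

stream.addSink(sink);  // part files are finalized on checkpoint completion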

Good luck,
    Erik.


On 27-01-19 at 10:13, sohimankotia wrote:

Hi Team,

Any help/update on this?

This is still an issue; I am using the bucketing sink in production.



Thanks
Sohi




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/




Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

2019-01-27 Thread sohimankotia
Hi Team,

Any help/update on this?

This is still an issue; I am using the bucketing sink in production.



Thanks
Sohi




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/