Re: [DISCUSS] Convert main Table API classes into traits

2018-03-02 Thread Rong Rong
Hi Shuyi,

I am already assuming all package-private and protected modifiers will be
cleaned up and moved to the internal implementation. Please correct me if I'm
wrong, Timo.

Thanks,
Rong


On Fri, Mar 2, 2018, 5:02 PM Shuyi Chen  wrote:

> Hi Timo,
>
> I am throwing in some second thoughts here, as I don't quite see what a trait
> provides over an abstract class for the TableEnvironment case. A trait in Scala
> can also have implementations, and you can have 'private[flink]' or
> 'protected' types and methods in a trait as well.
>
> AFAIK, the differences between a Scala trait and an abstract class are:
> 1) you can have a constructor in an abstract class, but not in a trait
> 2) abstract classes are fully interoperable with Java. You can call them
> from Java code without any wrappers. Traits are fully interoperable only if
> they do not contain any implementation code, for Scala 2.11.
> 3) you can do multiple inheritance or mixin composition with traits.
>
> In the TableEnvironment case,
> 1) I don't see a need for mixins, and a class hierarchy seems to fit better
> here by design.
> 2) to better interoperate with Java from Scala 2.11, it's better to use an
> abstract class. (But AFAIK, Scala 2.12 and Java 8 would be compatible,
> though.)
> 3) you might pay a bit of performance overhead with a trait (compiled to an
> interface) compared to an abstract class, but that's not a big deal here.
>
> But in other cases, a trait might be the better choice if it is to be reused
> and mixed in to multiple, unrelated classes.
>
> So another option would be to refactor TableEnvironment to clean up or move
> the 'private[flink]' or 'protected' stuff to the actual implementor
> (e.g. 'InternalTableEnvironment'), as you would do in your trait approach for
> TableEnvironment. I think this option might help with backward compatibility
> as well. Thanks.
>
> Shuyi
>
> On Fri, Mar 2, 2018 at 10:25 AM, Rong Rong  wrote:
>
> > Hi Timo,
> >
> > Thanks for looking into this, Timo. It's becoming increasingly messy for
> > me when trying to locate the correct functions in the IDE :-/
> >
> > This is probably due to the fact that Scala and Java access modifiers /
> > qualifiers are subtly and fundamentally different. Using a trait might be
> > the best solution here. Another way I can think of is to move all the
> > TableEnvironment classes to the Java side, but that would probably introduce
> > a lot of issues we would need to resolve on the Scala side, though.
> > "protected" is less restrictive in Java, but there's really no equivalent of
> > Scala's scoped package-private modifier in Java.
> >
> > I was wondering whether there is any better way to provide
> > backward-compatible support, though. I played around with it, and it seems
> > like every "protected" field will create a private Java member and a public
> > getter; should we add them all and annotate them with "@Deprecated"?
> > --
> > Rong
> >
> > On Thu, Mar 1, 2018 at 10:58 AM, Timo Walther 
> wrote:
> >
> > > Hi everyone,
> > >
> > > I'm currently thinking about how to implement FLINK-8606. The reason
> > > behind it is that Java users are able to see all variables and methods
> > that
> > > are declared 'private[flink]' or even 'protected' in Scala. Classes
> such
> > as
> > > TableEnvironment look very messy from the outside in Java. Since we
> > cannot
> > > change the visibility of Scala protected members, I was thinking about
> a
> > > bigger change to solve this issue once and for all. My idea is to
> convert
> > > all TableEnvironment classes and maybe the Table class into traits. The
> > > actual implementation would end up in some internal classes such as
> > > "InternalTableEnvironment" that implement the public traits. The goal
> > would
> > > be to stay source code compatible.
> > >
> > > What do you think?
> > >
> > > Regards,
> > > Timo
> > >
> > >
> >
>
>
>
> --
> "So you have to trust that the dots will somehow connect in your future."
>
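
For illustration, a minimal sketch of the trait approach Timo proposes; the
member and package names below are assumptions, only InternalTableEnvironment
is taken from the proposal:

{code:scala}
package org.apache.flink.sketch

// public, user-facing API: pure traits exposing only what users should see
trait Table {
  def select(fields: String): Table
}

trait TableEnvironment {
  def sqlQuery(query: String): Table
}

// internal implementation: all former 'private[flink]' / 'protected'
// members move here, out of the public API that Java users browse
class InternalTableEnvironment extends TableEnvironment {
  private[flink] val registeredTables =
    scala.collection.mutable.Map.empty[String, Table]

  override def sqlQuery(query: String): Table =
    throw new UnsupportedOperationException("sketch only")
}
{code}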


Re: [DISCUSS] Convert main Table API classes into traits

2018-03-02 Thread Shuyi Chen
Hi Timo,

I am throwing in some second thoughts here, as I don't quite see what a trait
provides over an abstract class for the TableEnvironment case. A trait in Scala
can also have implementations, and you can have 'private[flink]' or
'protected' types and methods in a trait as well.

AFAIK, the differences between a Scala trait and an abstract class are:
1) you can have a constructor in an abstract class, but not in a trait
2) abstract classes are fully interoperable with Java. You can call them
from Java code without any wrappers. Traits are fully interoperable only if
they do not contain any implementation code, for Scala 2.11.
3) you can do multiple inheritance or mixin composition with traits.

In the TableEnvironment case,
1) I don't see a need for mixins, and a class hierarchy seems to fit better
here by design.
2) to better interoperate with Java from Scala 2.11, it's better to use an
abstract class. (But AFAIK, Scala 2.12 and Java 8 would be compatible, though.)
3) you might pay a bit of performance overhead with a trait (compiled to an
interface) compared to an abstract class, but that's not a big deal here.

But in other cases, a trait might be the better choice if it is to be reused
and mixed in to multiple, unrelated classes.

So another option would be to refactor TableEnvironment to clean up or move
the 'private[flink]' or 'protected' stuff to the actual implementor
(e.g. 'InternalTableEnvironment'), as you would do in your trait approach for
TableEnvironment. I think this option might help with backward compatibility
as well. Thanks.

Shuyi

On Fri, Mar 2, 2018 at 10:25 AM, Rong Rong  wrote:

> Hi Timo,
>
> Thanks for looking into this, Timo. It's becoming increasingly messy for me
> when trying to locate the correct functions in the IDE :-/
>
> This is probably due to the fact that Scala and Java access modifiers /
> qualifiers are subtly and fundamentally different. Using a trait might be the
> best solution here. Another way I can think of is to move all the
> TableEnvironment classes to the Java side, but that would probably introduce a
> lot of issues we would need to resolve on the Scala side, though. "protected"
> is less restrictive in Java, but there's really no equivalent of Scala's
> scoped package-private modifier in Java.
>
> I was wondering whether there is any better way to provide
> backward-compatible support, though. I played around with it, and it seems
> like every "protected" field will create a private Java member and a public
> getter; should we add them all and annotate them with "@Deprecated"?
> --
> Rong
>
> On Thu, Mar 1, 2018 at 10:58 AM, Timo Walther  wrote:
>
> > Hi everyone,
> >
> > I'm currently thinking about how to implement FLINK-8606. The reason
> > behind it is that Java users are able to see all variables and methods
> that
> > are declared 'private[flink]' or even 'protected' in Scala. Classes such
> as
> > TableEnvironment look very messy from the outside in Java. Since we
> cannot
> > change the visibility of Scala protected members, I was thinking about a
> > bigger change to solve this issue once and for all. My idea is to convert
> > all TableEnvironment classes and maybe the Table class into traits. The
> > actual implementation would end up in some internal classes such as
> > "InternalTableEnvironment" that implement the public traits. The goal
> would
> > be to stay source code compatible.
> >
> > What do you think?
> >
> > Regards,
> > Timo
> >
> >
>



-- 
"So you have to trust that the dots will somehow connect in your future."


[jira] [Created] (FLINK-8844) Export job jar file name or job version property via REST API

2018-03-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8844:
-

 Summary: Export job jar file name or job version property via REST 
API
 Key: FLINK-8844
 URL: https://issues.apache.org/jira/browse/FLINK-8844
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.4.3
Reporter: Elias Levy


To aid automated deployment of jobs, it would be useful if the REST API exposed 
either a running job's jar filename or a version property the job could set, 
similar to how it sets the job name.

As it is now there is no standard mechanism to determine what version of a job 
is running in a cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8843) Decouple bind REST address from advertised address

2018-03-02 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8843:


 Summary: Decouple bind REST address from advertised address
 Key: FLINK-8843
 URL: https://issues.apache.org/jira/browse/FLINK-8843
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.5.0, 1.6.0
Reporter: Till Rohrmann
 Fix For: 1.5.0, 1.6.0


The {{RestServerEndpoint}} is currently bound to the same address that is also 
advertised to the client, namely {{RestOptions#REST_ADDRESS}}. It would be 
better to start the {{RestServerEndpoint}} listening on all addresses by binding 
to {{0.0.0.0}}.
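
As a rough illustration of the decoupling (the names and values below are
assumptions, not Flink options):

{code:scala}
import java.net.InetSocketAddress

object RestAddressSketch {
  // where the server actually listens: all interfaces
  val bindSocket = new InetSocketAddress("0.0.0.0", 8081)
  // what is handed out to clients, e.g. via RestOptions#REST_ADDRESS
  val advertisedAddress = "jobmanager-host"
}
{code}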



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8842) Change default REST port to 8081

2018-03-02 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8842:


 Summary: Change default REST port to 8081
 Key: FLINK-8842
 URL: https://issues.apache.org/jira/browse/FLINK-8842
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.5.0, 1.6.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0, 1.6.0


In order to avoid confusion, we should set the default REST port to 8081.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] Convert main Table API classes into traits

2018-03-02 Thread Rong Rong
Hi Timo,

Thanks for looking into this, Timo. It's becoming increasingly messy for me
when trying to locate the correct functions in the IDE :-/

This is probably due to the fact that Scala and Java access modifiers /
qualifiers are subtly and fundamentally different. Using a trait might be the
best solution here. Another way I can think of is to move all the
TableEnvironment classes to the Java side, but that would probably introduce a
lot of issues we would need to resolve on the Scala side, though. "protected"
is less restrictive in Java, but there's really no equivalent of Scala's
scoped package-private modifier in Java.

I was wondering whether there is any better way to provide backward-compatible
support, though. I played around with it, and it seems like every "protected"
field will create a private Java member and a public getter; should we add them
all and annotate them with "@Deprecated"?
--
Rong

On Thu, Mar 1, 2018 at 10:58 AM, Timo Walther  wrote:

> Hi everyone,
>
> I'm currently thinking about how to implement FLINK-8606. The reason
> behind it is that Java users are able to see all variables and methods that
> are declared 'private[flink]' or even 'protected' in Scala. Classes such as
> TableEnvironment look very messy from the outside in Java. Since we cannot
> change the visibility of Scala protected members, I was thinking about a
> bigger change to solve this issue once and for all. My idea is to convert
> all TableEnvironment classes and maybe the Table class into traits. The
> actual implementation would end up in some internal classes such as
> "InternalTableEnvironment" that implement the public traits. The goal would
> be to stay source code compatible.
>
> What do you think?
>
> Regards,
> Timo
>
>


[jira] [Created] (FLINK-8841) Duplicate MapSerializer and HashMapSerializer.

2018-03-02 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8841:
-

 Summary: Duplicate MapSerializer and HashMapSerializer.
 Key: FLINK-8841
 URL: https://issues.apache.org/jira/browse/FLINK-8841
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing, Type Serialization System
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0


Currently there are two classes, MapSerializer and HashMapSerializer, whose 
code is the same, with the only difference being that one serializes elements 
of type Map and the other of type HashMap.

In addition, these two were merged in the same commit.

I would like to remove the HashMapSerializer. I already created a branch 
without the HashMapSerializer, and nothing seems to be failing on Travis. The 
reason why I hesitate to do it is that I am not sure whether this may create 
problems with backwards compatibility.

[~xiaogang.sxg], could you elaborate a bit on why they were both added and 
whether there is any danger in removing the HashMapSerializer?

Also, [~StephanEwen] and [~stefanrichte...@gmail.com], it is worth having a 
look, and if you are OK with it, I can remove the redundant serializer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Proposal - Change shard discovery in Flink Kinesis Connector to use ListShards

2018-03-02 Thread Thomas Weise
It will be good to be able to use the ListShards API. Are there any
concerns bumping up the AWS SDK dependency? I see it was last done in
https://issues.apache.org/jira/browse/FLINK-7422

Thanks

On Wed, Feb 28, 2018 at 10:38 PM, Kailash Dayanand 
wrote:

> Based on the discussion here, I want to propose using the new ListShards API
> instead of DescribeStream on AWS to overcome the rate limits currently imposed
> on DescribeStream. ListShards has a much higher rate limit (a limit of 100
> transactions per second per data stream). It was recently introduced in the
> aws-sdk-java release 1.11.272. I propose bumping up the aws-sdk-java version
> used in the flink-kinesis connector and replacing the DescribeStream calls
> with ListShards in the KinesisProxy class, allowing for a faster shard
> discovery rate.
>
> Thanks
> Kailash
>
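
For reference, a sketch of paginated shard discovery with the ListShards call
from aws-sdk-java 1.11.272+; the helper below is illustrative, not the proposed
KinesisProxy change:

{code:scala}
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.{ListShardsRequest, Shard}
import scala.collection.JavaConverters._

object ShardDiscovery {
  def listAllShards(streamName: String): Seq[Shard] = {
    val kinesis = AmazonKinesisClientBuilder.defaultClient()
    val shards = Seq.newBuilder[Shard]
    var result =
      kinesis.listShards(new ListShardsRequest().withStreamName(streamName))
    shards ++= result.getShards.asScala
    // follow the pagination token; a request carrying a token must omit the
    // stream name
    while (result.getNextToken != null) {
      result = kinesis.listShards(
        new ListShardsRequest().withNextToken(result.getNextToken))
      shards ++= result.getShards.asScala
    }
    shards.result()
  }
}
{code}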


Re: Verifying watermarks in integration test

2018-03-02 Thread Thomas Weise
Hi,

I had sorted out how to run the topology in embedded mode. What wasn't
clear to me is how I can verify the watermark, but as per following thread
that can be done by inserting a process function:

https://lists.apache.org/thread.html/9a47cb2284032527c1b63d35beb9bd2d4bdc36197849b84f5f69b768@%3Cdev.flink.apache.org%3E

Thanks,
Thomas
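
A minimal sketch of that approach, assuming the topology runs in an embedded
mini-cluster (the class and buffer names are made up): a process function
inserted before the sink records the watermark observed with each element.

{code:scala}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class WatermarkCapture[T] extends ProcessFunction[T, T] {
  override def processElement(value: T, ctx: ProcessFunction[T, T]#Context,
                              out: Collector[T]): Unit = {
    // record the watermark observed when this element passed through
    WatermarkCapture.observed.add(
      java.lang.Long.valueOf(ctx.timerService().currentWatermark()))
    out.collect(value)
  }
}

object WatermarkCapture {
  // a static buffer works because the embedded cluster runs in a single JVM
  val observed = new java.util.concurrent.CopyOnWriteArrayList[java.lang.Long]()
}
{code}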



On Wed, Feb 28, 2018 at 4:35 AM, Xingcan Cui  wrote:

> Hi Thomas,
>
> generally speaking, if you want to test a whole job, just run the pipeline
> in your test case with a collection-based source and a result collecting
> sink. If your single operator tests passes while the integration test
> fails, maybe you should first check the timestamp / watermark assigners or
> the partitioning mechanisms used.
>
> Best,
> Xingcan
>
> > On 28 Feb 2018, at 5:46 AM, Thomas Weise  wrote:
> >
> > Hi Xingcan,
> >
> > thanks, this is a good way of testing an individual operator. I had
> written
> > my own mock code to intercept source context and collect the results,
> this
> > is a much better approach for operator testing.
> >
> > I wonder how I can verify with an embedded Flink cluster though. Even
> > though my single operator test passes, the results are not emitted as
> > expected within a topology (not observed in the attached sink). What's
> the
> > test approach there?
> >
> > Thanks,
> > Thomas
> >
> >
> > On Wed, Feb 21, 2018 at 12:43 AM, Xingcan Cui 
> wrote:
> >
> >> Hi Thomas,
> >>
> >> some test cases in JoinHarnessTest <https://github.com/apache/
> >> flink/blob/release-1.4/flink-libraries/flink-table/src/
> >> test/scala/org/apache/flink/table/runtime/harness/
> >> JoinHarnessTest.scala>
> >> show how to verify the emitted watermarks.
> >>
> >> Hope this helps.
> >>
> >> Best,
> >> Xingcan
> >>
> >>> On 21 Feb 2018, at 2:09 PM, Thomas Weise  wrote:
> >>>
> >>> Hi,
> >>>
> >>> I have a streaming integration test with two operators. A source that
> >> emits
> >>> records and watermarks, and a sink that collects the records. The
> >> topology
> >>> runs in embedded mode and the results are collected in memory. Now, in
> >>> addition to the records, I also want to verify that watermarks have
> been
> >>> emitted. What's the recommended way of doing that?
> >>>
> >>> Thanks
> >>
> >>
>
>


[jira] [Created] (FLINK-8840) Pull out YarnClient initialization out of AbstractYarnClusterDescriptor

2018-03-02 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8840:


 Summary: Pull out YarnClient initialization out of 
AbstractYarnClusterDescriptor
 Key: FLINK-8840
 URL: https://issues.apache.org/jira/browse/FLINK-8840
 Project: Flink
  Issue Type: Improvement
  Components: YARN
Affects Versions: 1.5.0, 1.6.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0, 1.6.0


In order to better test the {{AbstractYarnClusterDescriptor}}, I suggest 
pulling the {{YarnClient}} initialization and the {{YarnConfiguration}} creation 
out of the {{AbstractYarnClusterDescriptor}}.
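
A sketch of what such a refactoring could look like; the class and method names
below are assumptions, not the actual change:

{code:scala}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

// the descriptor receives the client instead of creating it internally,
// so tests can pass a stub or mock
class ClusterDescriptor(
    val yarnConfiguration: YarnConfiguration,
    val yarnClient: YarnClient)

object ClusterDescriptor {
  // production path: create, init, and start a real client
  def create(): ClusterDescriptor = {
    val conf = new YarnConfiguration()
    val client = YarnClient.createYarnClient()
    client.init(conf)
    client.start()
    new ClusterDescriptor(conf, client)
  }
}
{code}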



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8839) Table source factory discovery is broken in SQL Client

2018-03-02 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8839:
---

 Summary: Table source factory discovery is broken in SQL Client
 Key: FLINK-8839
 URL: https://issues.apache.org/jira/browse/FLINK-8839
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


Table source factories cannot be discovered if they were added using a JAR 
file.
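
One plausible angle (an assumption about the cause, not a confirmed diagnosis):
service provider discovery must use the classloader that actually contains the
user JAR, along the lines of:

{code:scala}
import java.util.ServiceLoader
import scala.collection.JavaConverters._

// 'Factory' is a stand-in for the table source factory service interface
trait Factory

object FactoryDiscovery {
  def discover(userJarClassLoader: ClassLoader): Seq[Factory] =
    // ServiceLoader.load(cls) alone consults the default classloader and can
    // miss factories that only exist inside a dynamically added JAR
    ServiceLoader.load(classOf[Factory], userJarClassLoader)
      .iterator().asScala.toSeq
}
{code}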



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


RE: StreamSQL queriable state

2018-03-02 Thread Stefano Bortoli
Hi Timo, Renjie,

What I was thinking did not include the QueryableStateTableSink, but rather 
tapping directly into the state of a streaming operator. Perhaps it is the same 
thing, but it just sounds unintuitive to consider it a sink.

So, we would need a way to configure the environment for the query to share the 
"state name" before the query is executed, and then use this to create the hook 
for the queriable state in the operator. Perhaps we could extend the current 
codegen and operator implementations to take the StateDescriptor to be queried 
as a parameter.

Looking forward to the design document; I will be happy to give you feedback.

Best,
Stefano

-Original Message-
From: Renjie Liu [mailto:liurenjie2...@gmail.com] 
Sent: Friday, March 02, 2018 11:42 AM
To: dev@flink.apache.org
Subject: Re: StreamSQL queriable state

Great, thank you.
I'll start by writing a design doc.

On Fri, Mar 2, 2018 at 6:40 PM Timo Walther  wrote:

> I gave you contributor permissions in Jira. You should be able to 
> assign it to yourself now.
>
> On 3/2/18 at 11:33 AM, Renjie Liu wrote:
> > Hi, Timo:
> > It seems that I can't assign it to myself. Could you please help to
> assign
> > that to me?
> > My jira username is liurenjie1024 and my email is
> liurenjie2...@gmail.com
> >
> > On Fri, Mar 2, 2018 at 6:24 PM Timo Walther  wrote:
> >
> >> Hi Renjie,
> >>
> >> that would be great. There is already a Jira issue for it:
> >> https://issues.apache.org/jira/browse/FLINK-6968
> >>
> >> Feel free to assign it to yourself. You can reuse parts of my code 
> >> if you want. But maybe it would make sense to have a little design 
> >> document first about what we want to support.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 3/2/18 at 11:10 AM, Renjie Liu wrote:
> >>> Hi, Timo, I've been planning on the same thing and would like to
> >> contribute
> >>> that.
> >>>
> >>> On Fri, Mar 2, 2018 at 6:05 PM Timo Walther 
> wrote:
> >>>
>  Hi Stefano,
> 
>  yes, there are plans in this direction. Actually, I already worked on such
>  a QueryableStateTableSink [1] in the past but never finished it
> because
>  of priority shifts. Would be great if somebody wants to 
>  contribute
> this
>  functionality :)
> 
>  Regards,
>  Timo
> 
>  [1] https://github.com/twalthr/flink/tree/QueryableTableSink
> 
>  On 3/2/18 at 10:58 AM, Stefano Bortoli wrote:
> > Hi guys,
> >
> > I am checking out the queriable state API, and it seems that 
> > most of
> >> the
>  tooling is already available. However, the queriable state is
> available
>  just for the streaming API, not at the StreamSQL API level. In
> >> principle,
>  as the flink-table is aware of the query semantic and data output
> type,
> >> it
>  should be possible to configure the query compilation to nest
> queriable
>  state in the process/window functions. Is there any plan in this
> >> direction?
> > Best,
> > Stefano
> >
>  --
> >>> Liu, Renjie
> >>> Software Engineer, MVAD
> >>>
> >> --
> > Liu, Renjie
> > Software Engineer, MVAD
> >
>
> --
Liu, Renjie
Software Engineer, MVAD
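
For context, the DataStream API already offers this hook; a
QueryableStateTableSink, or the operator-level hook suggested above, would
essentially wire up something like the following, given an agreed-on state name
(the example stream is illustrative):

{code:scala}
import org.apache.flink.streaming.api.scala._

object QueryableStateSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val counts: DataStream[(String, Long)] = env.fromElements(("a", 1L), ("b", 2L))

    counts
      .keyBy(_._1)
      // exposes the latest value per key under the shared "state name"
      .asQueryableState("word-counts")

    env.execute("queryable state sketch")
  }
}
{code}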


[jira] [Created] (FLINK-8838) Add Support UNNEST a MultiSet type field

2018-03-02 Thread lincoln.lee (JIRA)
lincoln.lee created FLINK-8838:
--

 Summary: Add Support UNNEST a MultiSet type field
 Key: FLINK-8838
 URL: https://issues.apache.org/jira/browse/FLINK-8838
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: lincoln.lee
Assignee: lincoln.lee


{{MultiSetTypeInfo}} was introduced by FLINK-7491, and {{UNNEST}} supports the 
{{Array}} type only, so it would be nice to support {{UNNEST}} on a 
{{MultiSet}} type field.
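
To illustrate the desired behavior (the table and column names are made up), a
query like the following should work once a MultiSet column can be unnested:

{code:scala}
// given an existing TableEnvironment 'tableEnv' with a registered table
// 'users' whose 'tags' column is of MultiSet type
val result = tableEnv.sqlQuery(
  "SELECT name, tag FROM users, UNNEST(users.tags) AS t (tag)")
{code}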



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8837) Move DataStreamUtils to package 'experimental'.

2018-03-02 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8837:
---

 Summary: Move DataStreamUtils to package 'experimental'.
 Key: FLINK-8837
 URL: https://issues.apache.org/jira/browse/FLINK-8837
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Stephan Ewen
 Fix For: 1.5.0


The class {{DataStreamUtils}} came from 'flink-contrib' and has now 
accidentally moved into the fully supported API packages. It should be in the 
package 'experimental' to properly communicate that it is not guaranteed to be 
API stable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-03-02 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-8836:
--

 Summary: Duplicating a KryoSerializer does not duplicate 
registered default serializers
 Key: FLINK-8836
 URL: https://issues.apache.org/jira/browse/FLINK-8836
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Reporter: Tzu-Li (Gordon) Tai


The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
{code}
public KryoSerializer<T> duplicate() {
    return new KryoSerializer<>(this);
}

protected KryoSerializer(KryoSerializer<T> toCopy) {
    defaultSerializers = toCopy.defaultSerializers;
    defaultSerializerClasses = toCopy.defaultSerializerClasses;

    kryoRegistrations = toCopy.kryoRegistrations;

    ...
}
{code}

In short, when duplicating a `KryoSerializer`, the `defaultSerializers` 
serializer instances are directly provided to the new `KryoSerializer` 
instance. As a result, those default serializers are shared across two 
different `KryoSerializer` instances, so the copy is not a correct duplicate.
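
A generic illustration of the underlying problem and the deep-copying fix;
these are not Flink's actual classes:

{code:scala}
trait StatefulSerializer { def duplicate(): StatefulSerializer }

class Holder(val defaults: Map[Class[_], StatefulSerializer]) {
  // WRONG: the copy shares stateful instances with the original
  def shallowCopy(): Holder = new Holder(defaults)

  // RIGHT: each stateful instance is duplicated for the copy
  def deepCopy(): Holder =
    new Holder(defaults.map { case (k, s) => (k, s.duplicate()) })
}
{code}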



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8835) Fix TaskManager config keys

2018-03-02 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8835:
---

 Summary: Fix TaskManager config keys
 Key: FLINK-8835
 URL: https://issues.apache.org/jira/browse/FLINK-8835
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Reporter: Stephan Ewen
 Fix For: 1.5.0


Many new config keys in the TaskManager don't follow the proper naming scheme. 
We need to clean those up before the release. I would also suggest keeping the 
key names short, because that makes it easier for users.

When doing this cleanup pass over the config keys, I would suggest to also make 
some of the existing keys more hierarchical and harmonize them with the common 
scheme in Flink.

## New Keys

* {{taskmanager.network.credit-based-flow-control.enabled}} to 
{{taskmanager.network.credit-model}}.

* {{taskmanager.exactly-once.blocking.data.enabled}} to 
{{task.checkpoint.alignment.blocking}} (we already have 
{{task.checkpoint.alignment.max-size}})

## Existing Keys

* {{taskmanager.debug.memory.startLogThread}} => 
{{taskmanager.debug.memory.log}}

* {{taskmanager.debug.memory.logIntervalMs}} => 
{{taskmanager.debug.memory.log-interval}}

* {{taskmanager.initial-registration-pause}} => 
{{taskmanager.registration.initial-backoff}}

* {{taskmanager.max-registration-pause}} => 
{{taskmanager.registration.max-backoff}}

* {{taskmanager.refused-registration-pause}} => 
{{taskmanager.registration.refused-backoff}}

* {{taskmanager.maxRegistrationDuration}} => 
{{taskmanager.registration.timeout}}
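
For the renames above, keeping the old names as deprecated fallbacks could look
like the following sketch (the default value is an assumption):

{code:scala}
import org.apache.flink.configuration.{ConfigOption, ConfigOptions}

object TaskManagerOptionsSketch {
  val initialBackoff: ConfigOption[String] =
    ConfigOptions.key("taskmanager.registration.initial-backoff")
      .defaultValue("500 ms")
      .withDeprecatedKeys("taskmanager.initial-registration-pause")
}
{code}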




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: StreamSQL queriable state

2018-03-02 Thread Renjie Liu
Great, thank you.
I'll start by writing a design doc.

On Fri, Mar 2, 2018 at 6:40 PM Timo Walther  wrote:

> I gave you contributor permissions in Jira. You should be able to assign
> it to yourself now.
>
> On 3/2/18 at 11:33 AM, Renjie Liu wrote:
> > Hi, Timo:
> > It seems that I can't assign it to myself. Could you please help to
> assign
> > that to me?
> > My jira username is liurenjie1024 and my email is
> liurenjie2...@gmail.com
> >
> > On Fri, Mar 2, 2018 at 6:24 PM Timo Walther  wrote:
> >
> >> Hi Renjie,
> >>
> >> that would be great. There is already a Jira issue for it:
> >> https://issues.apache.org/jira/browse/FLINK-6968
> >>
> >> Feel free to assign it to yourself. You can reuse parts of my code if
> >> you want. But maybe it would make sense to have a little design document
> >> first about what we want to support.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 3/2/18 at 11:10 AM, Renjie Liu wrote:
> >>> Hi, Timo, I've been planning on the same thing and would like to
> >> contribute
> >>> that.
> >>>
> >>> On Fri, Mar 2, 2018 at 6:05 PM Timo Walther 
> wrote:
> >>>
>  Hi Stefano,
> 
>  yes, there are plans in this direction. Actually, I already worked on such
>  a QueryableStateTableSink [1] in the past but never finished it
> because
>  of priority shifts. Would be great if somebody wants to contribute
> this
>  functionality :)
> 
>  Regards,
>  Timo
> 
>  [1] https://github.com/twalthr/flink/tree/QueryableTableSink
> 
>  On 3/2/18 at 10:58 AM, Stefano Bortoli wrote:
> > Hi guys,
> >
> > I am checking out the queriable state API, and it seems that most of
> >> the
>  tooling is already available. However, the queriable state is
> available
>  just for the streaming API, not at the StreamSQL API level. In
> >> principle,
>  as the flink-table is aware of the query semantic and data output
> type,
> >> it
>  should be possible to configure the query compilation to nest
> queriable
>  state in the process/window functions. Is there any plan in this
> >> direction?
> > Best,
> > Stefano
> >
>  --
> >>> Liu, Renjie
> >>> Software Engineer, MVAD
> >>>
> >> --
> > Liu, Renjie
> > Software Engineer, MVAD
> >
>
> --
Liu, Renjie
Software Engineer, MVAD


Re: StreamSQL queriable state

2018-03-02 Thread Timo Walther
I gave you contributor permissions in Jira. You should be able to assign 
it to yourself now.


On 3/2/18 at 11:33 AM, Renjie Liu wrote:

Hi, Timo:
It seems that I can't assign it to myself. Could you please help to assign
that to me?
My jira username is liurenjie1024 and my email is liurenjie2...@gmail.com

On Fri, Mar 2, 2018 at 6:24 PM Timo Walther  wrote:


Hi Renjie,

that would be great. There is already a Jira issue for it:
https://issues.apache.org/jira/browse/FLINK-6968

Feel free to assign it to yourself. You can reuse parts of my code if
you want. But maybe it would make sense to have a little design document
first about what we want to support.

Regards,
Timo


On 3/2/18 at 11:10 AM, Renjie Liu wrote:

Hi, Timo, I've been planning on the same thing and would like to

contribute

that.

On Fri, Mar 2, 2018 at 6:05 PM Timo Walther  wrote:


Hi Stefano,

yes there are plan in this direction. Actually, I already worked on such
a QueryableStateTableSink [1] in the past but never finished it because
of priority shifts. Would be great if somebody wants to contribute this
functionality :)

Regards,
Timo

[1] https://github.com/twalthr/flink/tree/QueryableTableSink

On 3/2/18 at 10:58 AM, Stefano Bortoli wrote:

Hi guys,

I am checking out the queriable state API, and it seems that most of

the

tooling is already available. However, the queriable state is available
just for the streaming API, not at the StreamSQL API level. In

principle,

as the flink-table is aware of the query semantic and data output type,

it

should be possible to configure the query compilation to nest queriable
state in the process/window functions. Is there any plan in this

direction?

Best,
Stefano


--

Liu, Renjie
Software Engineer, MVAD


--

Liu, Renjie
Software Engineer, MVAD





Re: StreamSQL queriable state

2018-03-02 Thread Renjie Liu
Hi, Timo:
It seems that I can't assign it to myself. Could you please help to assign
that to me?
My jira username is liurenjie1024 and my email is liurenjie2...@gmail.com

On Fri, Mar 2, 2018 at 6:24 PM Timo Walther  wrote:

> Hi Renjie,
>
> that would be great. There is already a Jira issue for it:
> https://issues.apache.org/jira/browse/FLINK-6968
>
> Feel free to assign it to yourself. You can reuse parts of my code if
> you want. But maybe it would make sense to have a little design document
> first about what we want to support.
>
> Regards,
> Timo
>
>
> On 3/2/18 at 11:10 AM, Renjie Liu wrote:
> > Hi, Timo, I've been planning on the same thing and would like to
> contribute
> > that.
> >
> > On Fri, Mar 2, 2018 at 6:05 PM Timo Walther  wrote:
> >
> >> Hi Stefano,
> >>
> >> yes, there are plans in this direction. Actually, I already worked on such
> >> a QueryableStateTableSink [1] in the past but never finished it because
> >> of priority shifts. Would be great if somebody wants to contribute this
> >> functionality :)
> >>
> >> Regards,
> >> Timo
> >>
> >> [1] https://github.com/twalthr/flink/tree/QueryableTableSink
> >>
> >> On 3/2/18 at 10:58 AM, Stefano Bortoli wrote:
> >>> Hi guys,
> >>>
> >>> I am checking out the queriable state API, and it seems that most of
> the
> >> tooling is already available. However, the queriable state is available
> >> just for the streaming API, not at the StreamSQL API level. In
> principle,
> >> as the flink-table is aware of the query semantic and data output type,
> it
> >> should be possible to configure the query compilation to nest queriable
> >> state in the process/window functions. Is there any plan in this
> direction?
> >>> Best,
> >>> Stefano
> >>>
> >> --
> > Liu, Renjie
> > Software Engineer, MVAD
> >
>
> --
Liu, Renjie
Software Engineer, MVAD


Re: StreamSQL queriable state

2018-03-02 Thread Timo Walther

Hi Renjie,

that would be great. There is already a Jira issue for it: 
https://issues.apache.org/jira/browse/FLINK-6968


Feel free to assign it to yourself. You can reuse parts of my code if 
you want. But maybe it would make sense to have a little design document 
first about what we want to support.


Regards,
Timo


On 3/2/18 at 11:10 AM, Renjie Liu wrote:

Hi, Timo, I've been planning on the same thing and would like to contribute
that.

On Fri, Mar 2, 2018 at 6:05 PM Timo Walther  wrote:


Hi Stefano,

yes, there are plans in this direction. Actually, I already worked on such
a QueryableStateTableSink [1] in the past but never finished it because
of priority shifts. Would be great if somebody wants to contribute this
functionality :)

Regards,
Timo

[1] https://github.com/twalthr/flink/tree/QueryableTableSink

On 3/2/18 at 10:58 AM, Stefano Bortoli wrote:

Hi guys,

I am checking out the queriable state API, and it seems that most of the

tooling is already available. However, the queriable state is available
just for the streaming API, not at the StreamSQL API level. In principle,
as the flink-table is aware of the query semantic and data output type, it
should be possible to configure the query compilation to nest queriable
state in the process/window functions. Is there any plan in this direction?

Best,
Stefano


--

Liu, Renjie
Software Engineer, MVAD





Re: StreamSQL queriable state

2018-03-02 Thread Renjie Liu
Hi, Timo, I've been planning on the same thing and would like to contribute
that.

On Fri, Mar 2, 2018 at 6:05 PM Timo Walther  wrote:

> Hi Stefano,
>
> yes, there are plans in this direction. Actually, I already worked on such
> a QueryableStateTableSink [1] in the past but never finished it because
> of priority shifts. Would be great if somebody wants to contribute this
> functionality :)
>
> Regards,
> Timo
>
> [1] https://github.com/twalthr/flink/tree/QueryableTableSink
>
> On 3/2/18 at 10:58 AM, Stefano Bortoli wrote:
> > Hi guys,
> >
> > I am checking out the queriable state API, and it seems that most of the
> tooling is already available. However, the queriable state is available
> just for the streaming API, not at the StreamSQL API level. In principle,
> as the flink-table is aware of the query semantic and data output type, it
> should be possible to configure the query compilation to nest queriable
> state in the process/window functions. Is there any plan in this direction?
> >
> > Best,
> > Stefano
> >
>
> --
Liu, Renjie
Software Engineer, MVAD


Re: StreamSQL queriable state

2018-03-02 Thread Timo Walther

Hi Stefano,

yes, there are plans in this direction. Actually, I already worked on such 
a QueryableStateTableSink [1] in the past but never finished it because 
of priority shifts. Would be great if somebody wants to contribute this 
functionality :)


Regards,
Timo

[1] https://github.com/twalthr/flink/tree/QueryableTableSink

On 3/2/18 at 10:58 AM, Stefano Bortoli wrote:

Hi guys,

I am checking out the queriable state API, and it seems that most of the 
tooling is already available. However, the queriable state is available just 
for the streaming API, not at the StreamSQL API level. In principle, as the 
flink-table is aware of the query semantic and data output type, it should be 
possible to configure the query compilation to nest queriable state in the 
process/window functions. Is there any plan in this direction?

Best,
Stefano





StreamSQL queriable state

2018-03-02 Thread Stefano Bortoli
Hi guys,

I am checking out the queriable state API, and it seems that most of the 
tooling is already available. However, the queriable state is available just 
for the streaming API, not at the StreamSQL API level. In principle, as the 
flink-table is aware of the query semantic and data output type, it should be 
possible to configure the query compilation to nest queriable state in the 
process/window functions. Is there any plan in this direction?

Best,
Stefano


[jira] [Created] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state

2018-03-02 Thread Daniel Harper (JIRA)
Daniel Harper created FLINK-8834:


 Summary: Job fails to restart due to some tasks stuck in 
cancelling state
 Key: FLINK-8834
 URL: https://issues.apache.org/jira/browse/FLINK-8834
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.0
 Environment: EMR 5.12

Flink 1.4.0

Beam 2.3.0
Reporter: Daniel Harper


Our job threw an exception overnight, causing the job to commence attempting a 
restart.

However, it never managed to restart because 2 tasks were stuck in the 
"Cancelling" state, with the following exception:
{code:java}
2018-03-02 02:29:31,604 WARN  org.apache.flink.runtime.taskmanager.Task 
    - Task 'PTransformTranslation.UnknownRawPTransform -> 
ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> 
uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out
 -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to 
cancelling signal, but is stuck in method:
 java.lang.Thread.blockedOn(Thread.java:239)
java.lang.System$2.blockedOn(System.java:1252)
java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211)
java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170)
java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457)
java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
java.nio.channels.Channels.writeFully(Channels.java:101)
java.nio.channels.Channels.access$000(Channels.java:61)
java.nio.channels.Channels$1.write(Channels.java:174)
java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
java.nio.channels.Channels.writeFully(Channels.java:101)
java.nio.channels.Channels.access$000(Channels.java:61)
java.nio.channels.Channels$1.write(Channels.java:174)
sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135)
java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)
java.io.Writer.write(Writer.java:157)
org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102)
org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118)
org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76)
org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550)
org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112)
org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718)
org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:888)
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:865)
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:94)
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:87)
org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1040)
org.apache.beam.runners.core.ReduceFnRunner$$Lambda$163/1408647946.output(Unknown
 Source)
org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:433)

[jira] [Created] (FLINK-8833) Create SQL Client JSON format fat-jar

2018-03-02 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8833:
---

 Summary: Create SQL Client JSON format fat-jar
 Key: FLINK-8833
 URL: https://issues.apache.org/jira/browse/FLINK-8833
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


Create a fat-jar for flink-json.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8831) Create SQL Client dependencies

2018-03-02 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8831:
---

 Summary: Create SQL Client dependencies
 Key: FLINK-8831
 URL: https://issues.apache.org/jira/browse/FLINK-8831
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


A first minimum version of FLIP-24 for the upcoming 
Flink SQL Client has been merged to the master. We also merged 
possibilities to discover and configure table sources without a single 
line of code using string-based properties and Java service provider 
discovery.

We are now facing the issue of how to manage dependencies in this new 
environment. It is different from how regular Flink projects are created 
(by setting up a new Maven project and building a jar or fat jar). 
Ideally, a user should be able to select from a set of prepared 
connectors, catalogs, and formats. E.g., if a Kafka connector and Avro 
format is needed, all that should be required is to move a 
"flink-kafka.jar" and "flink-avro.jar" into the "sql_lib" directory that 
is shipped to a Flink cluster together with the SQL query.

[As discussed on 
ML|http://mail-archives.apache.org/mod_mbox/flink-dev/201802.mbox/%3C9c73518b-ec8e-3b01-f200-dea816c75efc%40apache.org%3E],
 we will build fat jars for these modules with every Flink release that can be 
hosted somewhere (e.g. Apache infrastructure, but not Maven Central). This 
would make it very easy to add a dependency by downloading the prepared JAR 
files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUS] Flink SQL Client dependency management

2018-03-02 Thread Timo Walther

Hi everyone,

thanks for your opinions. So the majority voted for option (2): fat jars 
that are ready to be used. I will create a Jira issue and prepare the 
infrastructure for the first connector and first format.


Regards,
Timo

On 3/1/18 at 11:38 AM, Fabian Hueske wrote:

I agree, option (2) would be the easiest approach for the users.


2018-03-01 0:00 GMT+01:00 Rong Rong :


Hi Timo,

Thanks for initiating the SQL Client effort. I agree with Xingcan's points;
adding to them: (1) most users of the SQL Client would very likely have little
Maven / build-tool knowledge, and (2) most likely the build script would grow
much more complex in the future, which makes it exponentially harder for users
to modify it themselves.

On (3), the single "fat" jar idea: adding to the dependency-conflict issue,
another very common pattern I see is that users want to maintain a list of
individual jars, such as a list of relatively constant, handy UDFs they use
every time with the SQL Client. They will probably need to package and ship
those separately anyway. I was wondering if "download-and-drop-in" might be a
more straightforward approach?

Best,
Rong

On Tue, Feb 27, 2018 at 8:23 AM, Stephan Ewen  wrote:


I think one problem with the "one fat jar for all" is that some
dependencies clash in the classnames across versions:
   - Kafka 0.9, 0.10, 0.11, 1.0
   - Elasticsearch 2, 4, and 5

There are probably others as well...

On Tue, Feb 27, 2018 at 2:57 PM, Timo Walther  wrote:

Hi Xingcan,

thank you for your feedback. Regarding (3), we also thought about that, but
this approach would not scale very well. Given that we might have fat jars for
multiple versions (Kafka 0.8, Kafka 0.6 etc.), such an all-in-one solution JAR
file might easily go beyond 1 or 2 GB. I don't know if users want to download
that just for a combination of connector and format.

Timo

On 2/27/18 at 2:16 PM, Xingcan Cui wrote:

Hi Timo,

thanks for your efforts. Personally, I think the second option would be
better, and here are my feelings.

(1) The SQL Client is designed to offer a convenient way for users to
manipulate data with Flink. Obviously, the second option would be more
easy-to-use.

(2) The script will help to manage the dependencies automatically, but with
less flexibility. Once the script cannot meet the need, users have to modify
it themselves.

(3) I wonder whether we could package all these built-in connectors and
formats into a single JAR. With this all-in-one solution, users don't need to
consider much about the dependencies.

Best,
Xingcan

On 27 Feb 2018, at 6:38 PM, Stephan Ewen  wrote:

My first intuition would be to go for approach #2 for the following reasons:

- I expect that in the long run, the scripts will not be that simple to
maintain. We saw that with all shell scripts thus far: they start simple, and
then grow with many special cases for this and that setup.

- Not all users have Maven; automatically downloading and configuring Maven
could be an option, but that makes the scripts yet more tricky.

- Download-and-drop-in is probably still easier to understand for users than
the syntax of a script with its parameters.

- I think it may actually be even simpler to maintain for us, because all it
does is add a profile or build target to each connector to also create the
fat jar.

- Storage space is no longer really a problem. Worst case, we host the fat
jars in an S3 bucket.

On Mon, Feb 26, 2018 at 7:33 PM, Timo Walther  wrote:

Hi everyone,

as you may know, a first minimum version of FLIP-24 [1] for the upcoming
Flink SQL Client has been merged to the master. We also merged possibilities
to discover and configure table sources without a single line of code using
string-based properties [2] and Java service provider discovery.

We are now facing the issue of how to manage dependencies in this new
environment. It is different from how regular Flink projects are created (by
setting up a new Maven project and building a jar or fat jar). Ideally, a
user should be able to select from a set of prepared connectors, catalogs,
and formats. E.g., if a Kafka connector and Avro format is needed, all that
should be required is to move a "flink-kafka.jar" and "flink-avro.jar" into
the "sql_lib" directory that is shipped to a Flink cluster together with the
SQL query.

The question is how do we want to offer those JAR files in the future? We see
two options:

1) We prepare Maven build profiles for all offered modules and provide a
shell script for building fat jars. A script call could look like
"./sql-client-dependency.sh kafka 0.10". It would automatically download what
is needed and place the JAR file in the library folder. This approach would
keep our development effort low but would require Maven to be present and
builds to pass on different environments (e.g. Windows).

2) We build fat jars for these

[jira] [Created] (FLINK-8830) YarnResourceManager throws NullPointerException

2018-03-02 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-8830:
-

 Summary: YarnResourceManager throws NullPointerException
 Key: FLINK-8830
 URL: https://issues.apache.org/jira/browse/FLINK-8830
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.5.0
Reporter: Piotr Nowojski


 
{code:java}
java.lang.NullPointerException
at java.io.File.<init>(File.java:277)
at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:502)
at 
org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:445)
at 
org.apache.flink.yarn.YarnResourceManager.onContainersAllocated(YarnResourceManager.java:338)
at 
org.apache.flink.yarn.YarnResourceManagerTest$1.<init>(YarnResourceManagerTest.java:340)
at 
org.apache.flink.yarn.YarnResourceManagerTest.testStopWorker(YarnResourceManagerTest.java:317)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
{code}
 

This exception is being thrown in 
`org.apache.flink.yarn.YarnResourceManagerTest#testStopWorker`. The exception 
apparently is being ignored, since the test completes. It seems like this line:
{code:java}
String fileLocation = 
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION){code}
is not guarded against a returned null value. I don't know if that's a test or 
a production code issue.

CC [~till.rohrmann]
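
A minimal sketch of the missing guard; whether the fix belongs in test or
production code is the open question above:

{code:scala}
import org.apache.hadoop.security.UserGroupInformation

object TokenFileSketch {
  // wrap the possibly-null environment lookup instead of passing it straight
  // to new File(...)
  def tokenFileLocation(): Option[String] =
    Option(System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION))
}
{code}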



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Checkpointing Event Time Watermarks

2018-03-02 Thread vijay kansal
Hi Xingcan

We are receiving events from a number of independent data sources; hence, data
arriving in our Flink topology (via Kafka) can be out of order.

We are creating 1-minute event time windows in our Flink topology and
generating event time watermarks as (current event time - some threshold
(30 seconds)) at the source operator.

In case a few events arrive after the set threshold, those events are simply
ignored (which is OK in our case, because most of the events belonging to that
minute would have already arrived and been processed in the corresponding
window).

Now, the problem is that in case the program crashes (for whatever reason) and
is then resumed from the last successful checkpoint, late-arriving out-of-order
events would trigger execution of past (already processed) windows (with only a
minuscule number of events in those windows), overriding the results of the
previous computation of those windows.

Had Flink checkpointed the event time watermarks, this problem would not have
occurred.

So, I am wondering if there is a way to enforce checkpointing of event time
watermarks in Flink...

Vijay Kansal
Software Development Engineer
LimeRoad
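
For reference, the assignment scheme described above roughly corresponds to the
following sketch (the Event type is a stand-in):

{code:scala}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

case class Event(timestampMillis: Long, payload: String)

// watermark = max observed event time - 30s; events behind the watermark are
// dropped by the window operator, after a restore as well
class ThresholdWatermarks
    extends BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(30)) {
  override def extractTimestamp(e: Event): Long = e.timestampMillis
}
{code}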

On Tue, Feb 27, 2018 at 6:54 PM, Xingcan Cui  wrote:

> Hi Vijay,
>
> normally, maybe there’s no need to checkpoint the event times / watermarks
> since they are automatically generated based on the records. What’s your
> intention?
>
> Best,
> Xingcan
>
> > On 27 Feb 2018, at 8:50 PM, vijay kansal  wrote:
> >
> > Hi All
> >
> > Is there a way to checkpoint event time watermarks in Flink ?
> >
> > I tries searching for this, but could not figure out...
> >
> >
> > Vijay Kansal
> > Software Development Engineer
> > LimeRoad
>
>