Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-11-14 Thread Zhang, Xuefu
Thanks, Bowen, for catching the error. I have granted comment permission for 
the link.

I also updated the doc with the latest class definitions. Everyone is 
encouraged to review and comment.

Thanks,
Xuefu


--
Sender:Bowen Li 
Sent at:2018 Nov 14 (Wed) 06:44
Recipient:Xuefu 
Cc:piotr ; dev ; Shuyi Chen 

Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi Xuefu,

Currently the new design doc is in "view only" mode, and people cannot leave 
comments. Can you please change it to "can comment" or "can edit" mode?

Thanks, Bowen


On Mon, Nov 12, 2018 at 9:51 PM Zhang, Xuefu  wrote:

Hi Piotr

I have extracted the API portion of the design, and the google doc is here. 
Please review and provide your feedback.

Thanks,
Xuefu

--
Sender:Xuefu 
Sent at:2018 Nov 12 (Mon) 12:43
Recipient:Piotr Nowojski ; dev 
Cc:Bowen Li ; Shuyi Chen 
Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi Piotr,

That sounds good to me. Let's close all the open questions (there are a couple 
of them) in the Google doc, and I should be able to quickly split it into the 
three proposals as you suggested.

Thanks,
Xuefu

--
Sender:Piotr Nowojski 
Sent at:2018 Nov 9 (Fri) 22:46
Recipient:dev ; Xuefu 
Cc:Bowen Li ; Shuyi Chen 
Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi,

Yes, it seems like the best solution. Maybe someone else can also suggest whether 
we can split it further? For example, changes to the interface in one doc, reading 
from the Hive metastore in another, and finally storing our meta information in the 
Hive metastore in a third?

Piotrek

> On 9 Nov 2018, at 01:44, Zhang, Xuefu  wrote:
> 
> Hi Piotr,
> 
> That seems to be a good idea!
> 
> Since the google doc for the design is currently under extensive review, I 
> will leave it as it is for now. However, I'll convert it to two different 
> FLIPs when the time comes.
> 
> How does it sound to you?
> 
> Thanks,
> Xuefu
> 
> 
> --
> Sender:Piotr Nowojski 
> Sent at:2018 Nov 9 (Fri) 02:31
> Recipient:dev 
> Cc:Bowen Li ; Xuefu ; Shuyi 
> Chen 
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
> 
> Hi,
> 
> Maybe we should split this topic (and the design doc) into a couple of smaller 
> ones, hopefully independent. The questions that you have asked Fabian have, 
> for example, very little to do with reading metadata from the Hive Meta Store?
> 
> Piotrek 
> 
>> On 7 Nov 2018, at 14:27, Fabian Hueske  wrote:
>> 
>> Hi Xuefu and all,
>> 
>> Thanks for sharing this design document!
>> I'm very much in favor of restructuring / reworking the catalog handling in
>> Flink SQL as outlined in the document.
>> Most changes described in the design document seem to be rather general and
>> not specifically related to the Hive integration.
>> 
>> IMO, there are some aspects, especially those at the boundary of Hive and
>> Flink, that need a bit more discussion. For example
>> 
>> * What does it take to make Flink schema compatible with Hive schema?
>> * How will Flink tables (descriptors) be stored in HMS?
>> * How do the two Hive catalogs differ? Could they be integrated into a
>> single one? When to use which one?
>> * What meta information is provided by HMS? What of this can be leveraged
>> by Flink?
>> 
>> Thank you,
>> Fabian
>> 
>> Am Fr., 2. Nov. 2018 um 00:31 Uhr schrieb Bowen Li :
>> 
>>> After taking a look at how other discussion threads work, I think it's
>>> actually fine to just keep our discussion here. It's up to you, Xuefu.
>>> 
>>> The google doc LGTM. I left some minor comments.
>>> 
>>> On Thu, Nov 1, 2018 at 10:17 AM Bowen Li  wrote:
>>> 
 Hi all,
 
 As Xuefu has published the design doc on Google, I agree with Shuyi's
 suggestion that we should probably start a new email thread like "[DISCUSS]
 ... Hive integration design ..." on the dev mailing list only, for community
 devs to review. The current thread goes to both the dev and user lists.
 
 This email thread is more about validating the general idea and direction
 with the community, and it's been pretty long and crowded so far. Since
 everyone is in favor of the idea, we can move forward with another thread to
 discuss and finalize the design.
 
 Thanks,
 Bowen
 
 On Wed, Oct 31, 2018 at 12:16 PM Zhang, Xuefu 
 wrote:
 
> Hi Shuyi,
> 
> Good idea. Actually the PDF was converted from a google doc. Here is its
> link:
> 
> https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
> Once we reach an agreement, I can convert it to a FLIP.
> 
> Thanks,
> Xuefu
> 
> 
> 
> --
> Sender:Shuyi Chen 
> 

[jira] [Created] (FLINK-10891) Upgrade Kafka client version to 2.0.1

2018-11-14 Thread vinoyang (JIRA)
vinoyang created FLINK-10891:


 Summary: Upgrade Kafka client version to 2.0.1
 Key: FLINK-10891
 URL: https://issues.apache.org/jira/browse/FLINK-10891
 Project: Flink
  Issue Type: Sub-task
  Components: Kafka Connector
Reporter: vinoyang
Assignee: vinoyang


The modern Kafka connector only keeps track of the latest version of the Kafka 
client. With the release of Kafka 2.0.1, we should upgrade the version of the 
Kafka client Maven dependency.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10890) CLONE - Add DataStream HBase Sink

2018-11-14 Thread jocean....@gamil.com (JIRA)
jocean@gamil.com created FLINK-10890:


 Summary: CLONE - Add DataStream HBase Sink
 Key: FLINK-10890
 URL: https://issues.apache.org/jira/browse/FLINK-10890
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: jocean@gamil.com
Assignee: Shimin Yang


Design documentation: 
[https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-14 Thread Becket Qin
Hi Piotrek,

Thanks a lot for the detailed reply. All makes sense to me.

WRT the confusion between advance() / getCurrent(), do you think it would
help if we combine them and have something like:

CompletableFuture getNext();
long getWatermark();
long getCurrentTimestamp();
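
For illustration, I imagine a single-threaded task could drive such an interface
roughly like this (just a sketch; the reader/driver names and generics are my
assumptions, not part of the proposal):

import java.util.concurrent.CompletableFuture;

// Sketch of the proposed reader contract (names and generics are assumptions).
interface AsyncSourceReader<T> {
    CompletableFuture<T> getNext();   // completes when the next record is available
    long getWatermark();
    long getCurrentTimestamp();
}

class SingleThreadedDriver<T> {
    // Drive the reader from a single thread: emit while records are ready,
    // otherwise register a wake-up callback and yield back to the task's executor.
    void drive(AsyncSourceReader<T> reader, Runnable wakeUp) {
        while (true) {
            CompletableFuture<T> next = reader.getNext();
            if (!next.isDone()) {
                next.thenRun(wakeUp);   // resume this driver once data is available
                return;
            }
            emit(next.join(), reader.getCurrentTimestamp(), reader.getWatermark());
        }
    }

    void emit(T record, long timestamp, long watermark) {
        // placeholder for handing the record to downstream operators
        System.out.println(record + " @ " + timestamp + " (watermark=" + watermark + ")");
    }
}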

Cheers,

Jiangjie (Becket) Qin

On Tue, Nov 13, 2018 at 9:56 PM Piotr Nowojski 
wrote:

> Hi,
>
> Thanks again for the detailed answer :) Sorry for responding with a delay.
>
> > Completely agree that in pattern 2, having a callback is necessary for
> that
> > single thread outside of the connectors. And the connectors MUST have
> > internal threads.
>
> Yes, this thread will have to exist somewhere. In pattern 2 it exists in
> the connector (at least from the perspective of the Flink execution
> engine). In pattern 1 it exists inside the Flink execution engine. With
> completely blocking connectors, like simple reading from files, both of
> those approaches are basically the same. The difference shows when the user
> implementing a Flink source is already working with non-blocking code with
> some internal threads. In this case, pattern 1 would result in "double
> thread wrapping", while pattern 2 would allow skipping one layer of
> indirection.
>
> > If we go that way, we should have something like "void
> > poll(Callback) / void advance(callback)". I am curious how
> > CompletableFuture would work here, though. If 10 readers return 10 completable
> > futures, will there be 10 additional threads (so 20 threads in total)
> > blocking and waiting on them? Or will there be a single thread busy-looping
> > and checking them?
>
> To be honest, I haven’t thought this completely through and I haven’t
> tested/POC’ed it. Having said that, I can think of at least a couple of
> solutions. The first is something like this:
>
>
> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507
>
> Line:
>
> `blocked = split.process();`
>
> Is where the execution goes into the task/sources. This is where the
> returned future is handled:
>
> blocked.addListener(() -> {
>     blockedSplits.remove(split);
>     // reset the level priority to prevent previously-blocked
>     // splits from starving existing splits
>     split.resetLevelPriority();
>     waitingSplits.offer(split);
> }, executor);
>
> Fundamentally, callbacks and Futures are more or less interchangeable. You
> can always wrap one into the other (creating a callback that completes a
> future, or attaching a callback once the future completes). In this case the
> difference for me is mostly:
> - an API passing a callback allows the callback to be fired multiple times
> and to be fired even if the connector is not blocked. This is what I meant
> by saying that the `CompletableFuture isBlocked()` API is a bit simpler.
> The connector can only return either "I'm not blocked" or "I'm blocked and I
> will tell you only once when I'm not blocked anymore".
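
As a small concrete illustration of that interchangeability (a sketch with
hypothetical names only):

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CallbackFutureBridge {
    // callback -> future: register a callback that completes the future the engine waits on.
    static CompletableFuture<Void> futureFromCallback(Consumer<Runnable> registerCallback) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        registerCallback.accept(() -> future.complete(null));
        return future;
    }

    // future -> callback: run the callback once the future completes.
    static void callbackFromFuture(CompletableFuture<Void> isBlocked, Runnable callback) {
        isBlocked.thenRun(callback);
    }
}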
>
> But this is not the most important thing for me here. For me, the important
> thing is to try our best to make the Flink task’s control and execution single
> threaded. For that, both callback and future APIs should work the same.
>
> > WRT pattern 1, a single blocking take() API should just work. The good
> > thing is that a blocking read API is usually simpler to implement.
>
> Yes, they are easier to implement (especially if you are not the one that
> has to deal with the additional threading required around them ;) ). But
> to answer this issue, if we choose pattern 2, we can always provide a
> proxy/wrapper that would, using an internal thread, implement the
> non-blocking API while exposing a blocking API to the user. It would
> implement pattern 2 for the user while exposing pattern 1 to him. In other
> words, it implements pattern 1 in the pattern 2 paradigm, while still making
> it possible to implement pure pattern 2 connectors.
>
> > BTW, one thing I am also trying to avoid is pushing users to perform IO in
> > a method like "isBlocked()". If the method is expected to fetch records
> > (even if not returning them), naming it something more explicit would help
> > avoid confusion.
>
> If we choose so, we could rework it into something like:
>
> CompletableFuture advance()
> T getCurrent();
> Watermark getCurrentWatermark()
>
> But as I wrote before, this is more confusing to me for the exact reasons
> you mentioned :) I would be confused about what should be done in `advance()`
> and what in `getCurrent()`. However, again, this naming issue is not that
> important to me and is probably a matter of taste/personal preference.
>
> Piotrek
>
> > On 9 Nov 2018, at 

Need help with Kinesis related PRs

2018-11-14 Thread Thomas Weise
Hi,

The following two PRs need review from committers:

https://github.com/apache/flink/pull/6980
https://github.com/apache/flink/pull/6968

Any help greatly appreciated!

Thanks,
Thomas


[jira] [Created] (FLINK-10889) Semantic inconsistency between DataSet#print and DataStream#print

2018-11-14 Thread Jeff Zhang (JIRA)
Jeff Zhang created FLINK-10889:
--

 Summary: Semantic inconsistency between DataSet#print and 
DataStream#print
 Key: FLINK-10889
 URL: https://issues.apache.org/jira/browse/FLINK-10889
 Project: Flink
  Issue Type: Improvement
  Components: DataSet API, DataStream API
Reporter: Jeff Zhang


DataSet#print prints the result on the client side, while DataStream#print 
prints it on the TM. This inconsistency will confuse users. IMHO, we should 
make the behavior consistent between DataSet and DataStream; I prefer printing 
the result on the client side. For DataStream#print, we can use 
DataStreamUtils#collect to print it on the client side, as sketched below.
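
For example, a possible client-side pattern (a sketch only; the exact package of 
DataStreamUtils has moved between Flink versions, so the import is an assumption):

{code:java}
// Sketch: collect a DataStream back to the client and print it there,
// mirroring DataSet#print semantics.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;

import java.util.Iterator;

public class ClientSidePrint {
    public static <T> void printOnClient(DataStream<T> stream) throws Exception {
        Iterator<T> it = DataStreamUtils.collect(stream);  // pulls results back to the client
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
{code}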



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Sharing state between subtasks

2018-11-14 Thread Jamie Grier
Hey all,

I think all we need for this on the state sharing side is pretty simple.  I
opened a JIRA to track this work and submitted a PR for the state sharing
bit.

https://issues.apache.org/jira/browse/FLINK-10886
https://github.com/apache/flink/pull/7099

Please provide feedback :)

-Jamie


On Thu, Nov 1, 2018 at 3:33 AM Till Rohrmann  wrote:

> Hi Thomas,
>
> using Akka directly would further manifest our dependency on Scala in
> flink-runtime. This is something we are currently trying to get rid of. For
> that purpose we have added the RpcService abstraction which encapsulates
> all Akka specific logic. We hope that we can soon get rid of the Scala
> dependency in flink-runtime by using a special class loader only for
> loading the AkkaRpcService implementation.
>
> I think the easiest way to sync the task information is actually going
> through the JobMaster because the subtasks don't know on which other TMs
> the other subtasks run. Otherwise, we would need to have some TM detection
> mechanism between TMs. If you choose this way, then you should be able to
> use the RpcService by extending the JobMasterGateway with additional RPCs.
>
> Cheers,
> Till
>
> On Wed, Oct 31, 2018 at 6:52 PM Thomas Weise  wrote:
>
> > Hi,
> >
> > We are planning to work on the Kinesis consumer in the following order:
> >
> > 1. Add per shard watermarking:
> > https://issues.apache.org/jira/browse/FLINK-5697 - this will be code we
> > already use internally; I will open a PR to add it to the Flink Kinesis
> > consumer
> > 2. Exchange of per subtask watermarks between all subtasks of one or
> > multiple sources
> > 3. Implement the queue approach described in Jamie's document to utilize
> > 1.) and 2.) to align the shard consumers WRT event time
> >
> > There was some discussion regarding the mechanism to share the watermarks
> > between subtasks. If there is something that can be re-used it would be
> > great. Otherwise I'm going to further investigate the Akka or JGroups
> > route. Regarding Akka, since it is used within Flink already, is there an
> > abstraction that you would recommend to consider to avoid direct
> > dependency?
> >
> > Thanks,
> > Thomas
> >
> >
> >
> > On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
> >  wrote:
> >
> > > Not yet. We only have some initial thoughts and have not worked on it
> > yet.
> > > We will update the progress in this discussion if have.
> > >
> > > Best,
> > > Zhijiang
> > > --
> > > Sender:Aljoscha Krettek 
> > > Sent at:2018 Oct 18 (Thu) 17:53
> > > Recipient:dev ; Zhijiang(wangzhijiang999) <
> > > wangzhijiang...@aliyun.com>
> > > Cc:Till Rohrmann 
> > > Subject:Re: Sharing state between subtasks
> > >
> > > Hi Zhijiang,
> > >
> > > do you already have working code or a design doc for the second
> approach?
> > >
> > > Best,
> > > Aljoscha
> > >
> > > > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <
> > > wangzhijiang...@aliyun.com.INVALID> wrote:
> > > >
> > > > Just noticed this discussion from @Till Rohrmann's weekly community
> > > > update and I want to share some thoughts from our experiences.
> > > >
> > > > We also encountered the source consumption skew issue before, and we
> > > > are focused on improving this in two possible ways.
> > > >
> > > > 1. Control the read strategy on the downstream side. In detail, every
> > > > input channel in the downstream task corresponds to the consumption of
> > > > one upstream source task, and we tag each input channel with a
> > > > watermark to find the lowest channel to read with high priority. In
> > > > essence, we rely on the backpressure mechanism: if the channel with the
> > > > highest timestamp is not read by the downstream task for a while, the
> > > > corresponding source task is blocked from reading once the buffers are
> > > > exhausted. There is no need to change the source interface in this way,
> > > > but there are two major concerns: first, it affects barrier alignment,
> > > > resulting in delayed or expired checkpoints; second, it cannot align
> > > > source consumption very precisely and is just a best-effort approach.
> > > > So we finally gave up on this way.
> > > >
> > > > 2. Add a new SourceCoordinator component to coordinate the source
> > > > consumption in a distributed way. For example, we can start this
> > > > component in the JobManager, similar to the current role of the
> > > > CheckpointCoordinator. Then every source task would communicate with
> > > > the JobManager via the current RPC mechanism; maybe we can rely on the
> > > > heartbeat message to attach the consumption progress as payload. The
> > > > JobManager will accumulate or store all the reported progress and then
> > > > give responses to the different source tasks. We can, for example,
> > > > define a protocol that tells a fast source task to sleep for a specific
> > > > time. Done this way, the coordinator has the global information to make
> > > > the proper decision for individuals, so it seems

[jira] [Created] (FLINK-10888) Expose new global watermark RPC to sources

2018-11-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10888:
---

 Summary: Expose new global watermark RPC to sources
 Key: FLINK-10888
 URL: https://issues.apache.org/jira/browse/FLINK-10888
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Jamie Grier
Assignee: Jamie Grier


Expose new JobMaster RPC for watermark tracking to Source implementations so it 
can be used to align reads across sources.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10887) Add source watermarking tracking to the JobMaster

2018-11-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10887:
---

 Summary: Add source watermarking tracking to the JobMaster
 Key: FLINK-10887
 URL: https://issues.apache.org/jira/browse/FLINK-10887
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager
Reporter: Jamie Grier
Assignee: Jamie Grier


We need to add a new RPC to the JobMaster such that the current watermark of 
every source sub-task can be reported and the current global minimum/maximum 
watermark can be retrieved, so that each source can adjust its partition read 
rates in an attempt to keep sources roughly aligned in event time.
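
A rough sketch of what such an RPC could look like (method names, parameters and 
the aggregate type below are assumptions for illustration, not the final API):

{code:java}
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.jobgraph.JobVertexID;

// Hypothetical gateway additions for watermark tracking.
public interface SourceWatermarkTrackingGateway {

    // Reports the current watermark of one source sub-task and returns the
    // current global aggregate so the caller can adjust its read rate.
    CompletableFuture<GlobalWatermarkInfo> reportWatermark(
            JobVertexID sourceId, int subtaskIndex, long currentWatermark);

    // Simple aggregate of the reported watermarks across all source sub-tasks.
    class GlobalWatermarkInfo {
        public final long minWatermark;
        public final long maxWatermark;

        public GlobalWatermarkInfo(long minWatermark, long maxWatermark) {
            this.minWatermark = minWatermark;
            this.maxWatermark = maxWatermark;
        }
    }
}
{code}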

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10886) Event time synchronization across sources

2018-11-14 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-10886:
---

 Summary: Event time synchronization across sources
 Key: FLINK-10886
 URL: https://issues.apache.org/jira/browse/FLINK-10886
 Project: Flink
  Issue Type: Improvement
  Components: Streaming Connectors
Reporter: Jamie Grier
Assignee: Jamie Grier


When reading from a source with many parallel partitions, especially when 
reading lots of historical data (or recovering from downtime with a backlog to 
read), it's quite common for an event-time skew to develop across those 
partitions.
 
When doing event-time windowing -- or in fact any event-time driven processing 
-- the event-time skew across partitions results directly in increased 
buffering in Flink and, of course, the corresponding state/checkpoint size growth.
 
As the event-time skew and state size grow larger, this can have a major effect 
on application performance and in some cases result in a "death spiral" where 
the application performance gets worse and worse as the state size grows.
 
So, one solution to this problem, outside of core changes in Flink itself, 
seems to be to coordinate sources across partitions so that they make progress 
through event time at roughly the same rate. In fact, if there is large skew, 
the idea would be to slow or even stop reading from partitions with newer data 
while first reading the partitions with older data. Anyway, to do this we need 
to share state somehow amongst sub-tasks.
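
As a rough illustration of the throttling decision (a sketch only; the threshold 
and names are assumptions):

{code:java}
// Sketch: decide whether a fast partition reader should pause, based on how far
// its local event time is ahead of the globally slowest source.
public final class EventTimeAlignment {

    private final long maxAllowedSkewMillis;   // maximum tolerated event-time skew

    public EventTimeAlignment(long maxAllowedSkewMillis) {
        this.maxAllowedSkewMillis = maxAllowedSkewMillis;
    }

    // Returns true if this reader should temporarily stop fetching from its partition.
    public boolean shouldPause(long localWatermark, long globalMinWatermark) {
        return localWatermark - globalMinWatermark > maxAllowedSkewMillis;
    }
}
{code}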
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: support/docs for compression in StreamingFileSink

2018-11-14 Thread Addison Higham
Just noticed one detail about using the BulkWriter interface: you can no longer 
assign a rolling policy. That makes sense for formats like ORC/Parquet, but 
perhaps not for simple text compression.



On Wed, Nov 14, 2018 at 1:43 PM Addison Higham  wrote:

> HI all,
>
> I am moving some code to use the StreamingFileSink. Currently, it doesn't
> look like there is any native support for compression (gzip or otherwise)
> built into flink when using the StreamingFileSink. It seems like this is a
> really common need that as far as I could tell, wasn't represented in jira.
>
> After a fair amount of digging, it seems like the way to do that is to
> implement that is the BulkWriter interface where you can trivially wrap an
> outputStream with something like a GZIPOutputStream.
>
> It seems like it would make sense that until compression functionality is
> built into the StreamingFileSink, it might make sense to add some docs on
> how to use compression with the StreamingFileSink.
>
> I am willing to spend a bit of time documenting that, but before I do i
> wanted to make sure I understand if that is in fact the correct way to
> think about this problem and get your thoughts.
>
> Thanks!
>
>
>


support/docs for compression in StreamingFileSink

2018-11-14 Thread Addison Higham
HI all,

I am moving some code to use the StreamingFileSink. Currently, it doesn't
look like there is any native support for compression (gzip or otherwise)
built into Flink when using the StreamingFileSink. This seems like a really
common need that, as far as I could tell, isn't represented in JIRA.

After a fair amount of digging, it seems like the way to do that is to
implement the BulkWriter interface, where you can trivially wrap an
OutputStream with something like a GZIPOutputStream.
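
For example, a rough sketch of such a wrapper for newline-delimited strings
(untested; the element encoding is my own assumption, but the BulkWriter/Factory
shape matches what I found):

// Sketch: wrap the sink's output stream in a GZIPOutputStream inside a BulkWriter.
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class GzipStringBulkWriter implements BulkWriter<String> {

    private final GZIPOutputStream out;

    private GzipStringBulkWriter(GZIPOutputStream out) {
        this.out = out;
    }

    @Override
    public void addElement(String element) throws IOException {
        out.write(element.getBytes(StandardCharsets.UTF_8));
        out.write('\n');
    }

    @Override
    public void flush() throws IOException {
        out.flush();
    }

    @Override
    public void finish() throws IOException {
        // finish the gzip stream without closing the underlying file stream
        out.finish();
    }

    /** Factory handed to StreamingFileSink.forBulkFormat(...). */
    public static class Factory implements BulkWriter.Factory<String> {
        @Override
        public BulkWriter<String> create(FSDataOutputStream stream) throws IOException {
            return new GzipStringBulkWriter(new GZIPOutputStream(stream));
        }
    }
}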

It seems like, until compression functionality is built into the
StreamingFileSink, it would make sense to add some docs on how to use
compression with the StreamingFileSink.

I am willing to spend a bit of time documenting that, but before I do, I
wanted to make sure I understand whether that is in fact the correct way to
think about this problem, and to get your thoughts.

Thanks!


[jira] [Created] (FLINK-10885) Avro Confluent Schema Registry E2E test failed on Travis

2018-11-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10885:


 Summary: Avro Confluent Schema Registry E2E test failed on Travis
 Key: FLINK-10885
 URL: https://issues.apache.org/jira/browse/FLINK-10885
 Project: Flink
  Issue Type: Bug
  Components: Batch Connectors and Input/Output Formats, E2E Tests, 
Table API & SQL
Affects Versions: 1.7.0
Reporter: Chesnay Schepler


https://travis-ci.org/zentol/flink/jobs/454943551

{code}
Waiting for schema registry...
[2018-11-14 12:20:59,394] ERROR Server died unexpectedly:  
(io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
org.apache.kafka.common.config.ConfigException: No supported Kafka endpoints 
are configured. Either kafkastore.bootstrap.servers must have at least one 
endpoint matching kafkastore.security.protocol or broker endpoints loaded from 
ZooKeeper must have at least one endpoint matching kafkastore.security.protocol.
at 
io.confluent.kafka.schemaregistry.storage.KafkaStore.endpointsToBootstrapServers(KafkaStore.java:313)
at 
io.confluent.kafka.schemaregistry.storage.KafkaStore.(KafkaStore.java:130)
at 
io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.(KafkaSchemaRegistry.java:144)
at 
io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:53)
at 
io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:37)
at io.confluent.rest.Application.createServer(Application.java:149)
at 
io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10884) Flink on YARN TM container will be killed by the NodeManager because of exceeded physical memory.

2018-11-14 Thread wgcn (JIRA)
wgcn created FLINK-10884:


 Summary: Flink on YARN TM container will be killed by the NodeManager 
because of exceeded physical memory.
 Key: FLINK-10884
 URL: https://issues.apache.org/jira/browse/FLINK-10884
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management, Core
Affects Versions: 1.6.2
 Environment: version  : 1.6.2 

module : flink on yarn

centos  jdk1.8

hadoop 2.7
Reporter: wgcn


The TM container will be killed by the NodeManager because of exceeded physical 
memory. In the launch context for the TM container I found that "container 
memory = heap memory + offHeapSizeMB" (see 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters, 
lines 160 to 166). I set a safety margin for the whole memory the container 
uses. For example, if the container limit is 3g of memory, the sum "heap 
memory + offHeapSizeMB" is equal to 2.4g to prevent the container from being 
killed. Do we have a ready-made solution, or can I commit my solution?
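
For illustration, the kind of safety-margin calculation I have in mind (the 
fraction and values are examples only, not existing configuration):

{code:java}
// Sketch of the proposed safety margin: reserve part of the container memory for
// native/JVM overhead so that heap + offHeap stays below the YARN limit.
public final class ContainerMemorySafetyMargin {
    public static long heapSizeMB(long containerMemoryMB, long offHeapSizeMB, double safetyFraction) {
        long usableMB = (long) (containerMemoryMB * safetyFraction);  // e.g. 3072 * 0.8 ≈ 2457 MB ≈ 2.4 GB
        return usableMB - offHeapSizeMB;                              // remainder goes to the heap
    }
}
{code}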



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10883) Submitting a job without enough slots times out due to an unspecified timeout

2018-11-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10883:


 Summary: Submitting a job without enough slots times out due to an 
unspecified timeout
 Key: FLINK-10883
 URL: https://issues.apache.org/jira/browse/FLINK-10883
 Project: Flink
  Issue Type: Improvement
  Components: Job-Submission
Affects Versions: 1.7.0
Reporter: Chesnay Schepler


When submitting a job without enough slots being available, the job will stay in 
a SCHEDULED/CREATED state. After some time (a few minutes) the job execution 
will fail with the following timeout exception:
{code}
2018-11-14 13:38:26,615 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) 
(1/$java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}

That the job submission will time out is not documented; neither is which 
timeout is responsible in the first place, nor how/whether this can be disabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10882) Misleading job/task state for scheduled jobs

2018-11-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10882:


 Summary: Misleading job/task state for scheduled jobs
 Key: FLINK-10882
 URL: https://issues.apache.org/jira/browse/FLINK-10882
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 1.7.0
Reporter: Chesnay Schepler
 Attachments: list_view.png, task_view.png

Submitting a job when not enough resources are available currently causes the 
job to stay in a {{CREATED/SCHEDULED}} state.
There are 2 issues with how this is presented in the UI.

The {{Running Jobs}} page incorrectly states that the job is running.
(see list_view attachment)

The state display for individual tasks either
# States the task is in a CREATED state, when it is actually SCHEDULED
# States the task is in a CREATED state, but the count for all state boxes is 
zero.
(see task_view attachment)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10881) SavepointITCase deadlocks on travis

2018-11-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10881:


 Summary: SavepointITCase deadlocks on travis
 Key: FLINK-10881
 URL: https://issues.apache.org/jira/browse/FLINK-10881
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing, Tests
Affects Versions: 1.7.0
Reporter: Chesnay Schepler


https://travis-ci.org/apache/flink/jobs/454898424



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10880) Failover strategies should not be applied to Batch Execution

2018-11-14 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-10880:


 Summary: Failover strategies should not be applied to Batch 
Execution
 Key: FLINK-10880
 URL: https://issues.apache.org/jira/browse/FLINK-10880
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.6.2
Reporter: Stephan Ewen
 Fix For: 1.6.3, 1.7.0


When configuring a failover strategy other than "full", DataSet/Batch execution 
is currently not correct.

This is expected: the failover region strategy is an experimental WIP feature 
for streaming that has not been extended to the DataSet API.

We need to document this and prevent execution of DataSet programs with 
failover strategies other than "full".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: REST job submission

2018-11-14 Thread Flavio Pompermaier
Done: https://issues.apache.org/jira/browse/FLINK-10879

On Wed, Nov 14, 2018 at 10:14 AM Chesnay Schepler 
wrote:

> I wouldn't consider it a _bug_ in that sense, but agree that the current
> behavior isn't ideal. Running a job via the CLI or WebUI should behave
> the same way, as such please open a JIRA.
>
> On 12.11.2018 12:50, Flavio Pompermaier wrote:
> > Hi to all,
> > in our ETL we need to call an external (REST) service once a job ends: we
> > extract information about accumulators and we update the job status.
> > However this is only possible when using the CLI client: if we submit the job
> > via the REST API or Web UI (which is very useful to decouple our UI from the
> > Flink cluster) then this is not possible, because the REST API cannot
> > execute any code after env.execute().
> > I think that this is a major limitation: first of all, when writing
> > (and debugging) a Flink job, you assume that you can call execute() multiple
> > times and use the returned JobExecutionResult.
> > Second, the binary client and the REST client behave
> > differently (with the CLI client everything works as expected).
> >
> > What do you think about this? Is this a bug or not?
> >
> > PS: I also think that the REST client should not be aware of any jar or
> > class instance; it should just call the job manager with the proper class
> > name and jar id (plus other options, of course).
> >
> > Cheers,
> > Flavio
> >
>
>


[jira] [Created] (FLINK-10879) Align Flink clients on env.execute()

2018-11-14 Thread Flavio Pompermaier (JIRA)
Flavio Pompermaier created FLINK-10879:
--

 Summary: Align Flink clients on env.execute()
 Key: FLINK-10879
 URL: https://issues.apache.org/jira/browse/FLINK-10879
 Project: Flink
  Issue Type: Improvement
  Components: Client
Affects Versions: 1.6.2
Reporter: Flavio Pompermaier


Right now the REST API does not support any code after env.execute(), while the 
Flink API, the CLI client, and code executed within the IDE do.

Both clients should behave in the same way (either supporting env.execute() 
returning a result and continuing code execution, or not).
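
For example, code like the following runs to completion when submitted via the 
CLI, but the lines after env.execute() are never reached when the job is 
submitted through the REST API (a sketch; the accumulator name and notification 
call are illustrative only):

{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PostExecuteExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // ... build the job ...
        JobExecutionResult result = env.execute("my-job");

        // Never reached when the job is submitted via the REST API / Web UI:
        Long processed = result.getAccumulatorResult("processed-records");
        notifyExternalService(result.getJobID(), processed);
    }

    private static void notifyExternalService(JobID jobId, Long processed) {
        // placeholder for the external REST call described above
    }
}
{code}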

See the discussion on the DEV ML for more details: 
http://mail-archives.apache.org/mod_mbox/flink-dev/201811.mbox/%3CCAELUF_DhjzL9FECvx040_GE3d85Ykb-HcGVCh0O4y9h-cThq7A%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10878) State evolution E2E test failed on travis

2018-11-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10878:


 Summary: State evolution E2E test failed on travis
 Key: FLINK-10878
 URL: https://issues.apache.org/jira/browse/FLINK-10878
 Project: Flink
  Issue Type: Bug
  Components: E2E Tests, State Backends, Checkpointing
Affects Versions: 1.7.0
Reporter: Chesnay Schepler


https://travis-ci.org/apache/flink/builds/454402843

{code}
2018-11-13 13:21:14,096 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline 
checkpoint 66 by task c375b648d817e2a1f25ab786094bfaac of job 
81af79ed99a57065b3f0cbeb3cd42b2b.
2018-11-13 13:21:14,097 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
checkpoint 66 of job 81af79ed99a57065b3f0cbeb3cd42b2b.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
 Task Source: Custom Source (1/1) was not running
at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}

{code}
2018-11-13 13:21:14,035 INFO  
org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder  - Declining 
checkpoint 66 of job 81af79ed99a57065b3f0cbeb3cd42b2b.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
 Task Source: Custom Source (1/1) was not running
at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2018-11-13 13:21:14,351 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing 
heap keyed state backend with stream factory.
2018-11-13 13:21:14,351 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing 
heap keyed state backend from snapshot.
2018-11-13 13:21:15,212 INFO  
org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder  - Declining 
checkpoint 66 of job 81af79ed99a57065b3f0cbeb3cd42b2b.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException:
 Task received cancellation from one of its inputs
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyAbortOnCancellationBarrier(BarrierBuffer.java:404)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.processCancellationBarrier(BarrierBuffer.java:304)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:204)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
2018-11-13 13:21:19,680 INFO  org.apache.flink.runt
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10877) Remove duplicate dependency entries in kafka connector pom

2018-11-14 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10877:
-

 Summary: Remove duplicate dependency entries in kafka connector pom
 Key: FLINK-10877
 URL: https://issues.apache.org/jira/browse/FLINK-10877
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.7.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann


The {{flink-connector-kafka}} {{pom.xml}} contains multiple dependency entries 
for {{flink-connector-kafka-base}} and moreover excludes dependencies from a 
test-jar. This is not necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10876) Deadlock if closing firstly pending transactions in FlinkKafkaProducer(011).close()

2018-11-14 Thread Andrey Zagrebin (JIRA)
Andrey Zagrebin created FLINK-10876:
---

 Summary: Deadlock if closing firstly pending transactions in 
FlinkKafkaProducer(011).close()
 Key: FLINK-10876
 URL: https://issues.apache.org/jira/browse/FLINK-10876
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Reporter: Andrey Zagrebin


While working on FLINK-10455, I encountered a deadlock in 
_FlinkKafkaProducer(011).close()_ if _pendingTransactions_ are closed before 
_currentTransaction_. There is no deadlock the other way around.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: REST job submission

2018-11-14 Thread Chesnay Schepler
I wouldn't consider it a _bug_ in that sense, but agree that the current 
behavior isn't ideal. Running a job via the CLI or WebUI should behave 
the same way, as such please open a JIRA.


On 12.11.2018 12:50, Flavio Pompermaier wrote:

Hi to all,
in our ETL we need to call an external (REST) service once a job ends: we
extract information about accumulators and we update the job status.
However this is only possible when using the CLI client: if we submit the job
via the REST API or Web UI (which is very useful to decouple our UI from the
Flink cluster) then this is not possible, because the REST API cannot
execute any code after env.execute().
I think that this is a major limitation: first of all, when writing
(and debugging) a Flink job, you assume that you can call execute() multiple
times and use the returned JobExecutionResult.
Second, the binary client and the REST client behave
differently (with the CLI client everything works as expected).

What do you think about this? Is this a bug or not?

PS: I also think that the REST client should not be aware of any jar or
class instance; it should just call the job manager with the proper class
name and jar id (plus other options, of course).

Cheers,
Flavio





[jira] [Created] (FLINK-10875) Add `toTableWithTimestamp` method in `DataStreamConversions`

2018-11-14 Thread sunjincheng (JIRA)
sunjincheng created FLINK-10875:
---

 Summary: Add `toTableWithTimestamp` method in 
`DataStreamConversions`
 Key: FLINK-10875
 URL: https://issues.apache.org/jira/browse/FLINK-10875
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: sunjincheng
Assignee: sunjincheng
 Fix For: 1.7.1


Currently we convert a `DataStream` to a `Table` by  
`DataStreamConversions#toTable`, e.g.:
{code:java}
// Without TimeAttribute
...
val stream = env.fromCollection(...)
val tab = stream.toTable(tEnv, 'a, 'b, 'c)
val result = tab.select('a, 'b)


// With TimeAttribute
...
val stream = env.fromCollection(...).assignTimestampsAndWatermarks(...)
val tab = stream.toTable(tEnv, 'a, 'b, 'c, 'ts.rowtime)
val result = tab.window(Session withGap 5.milli on 'ts as 'w)
...{code}
I think the fieldNames parameter of the `toTable` method is reasonable for the 
conversion without a time attribute, because the fieldNames actually correspond 
to the fields of the physical table. But when applied to the conversion with a 
time attribute, the time attribute column is silently added to the table. This 
feels very magical, so I recommend adding a method that lets the user 
explicitly declare the time attribute added to the physical table: 
`toTableWithTimestamp`, where the time attribute column is derived from the 
user-provided name and the TimeCharacteristic, e.g.:
{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...
val table = stream.toTableWithTimestamp(tEnv, 'count, 'size, 'name, 'ts)
  .window(Tumble over 2.rows on 'ts as 'w)
...
{code}
In the example above, Flink will mark `ts` as a `RowtimeAttribute`.

What do you think?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10874) Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure

2018-11-14 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-10874:
--

 Summary: Kafka 2.0 connector 
testMigrateFromAtLeastOnceToExactlyOnce failure
 Key: FLINK-10874
 URL: https://issues.apache.org/jira/browse/FLINK-10874
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.8.0
Reporter: Piotr Nowojski


https://api.travis-ci.org/v3/job/454449444/log.txt


{noformat}
Test 
testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
 is running.

16:35:07,894 WARN  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Property 
[transaction.timeout.ms] not specified. Setting it to 360 ms
16:35:07,903 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Starting 
FlinkKafkaInternalProducer (1/1) to produce into default topic 
testMigrateFromAtLeastOnceToExactlyOnce
16:35:08,785 ERROR 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase  - 

Test 
testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
 failed with:
java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:419)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshotWithLocalState(AbstractStreamOperatorTestHarness.java:505)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:497)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRecoverWithChangeSemantics(FlinkKafkaProducerITCase.java:591)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce(FlinkKafkaProducerITCase.java:569)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: 
Failed to send data to Kafka: This server does not