Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
Thanks, Bowen, for catching the error. I have granted comment permission via the link. I also updated the doc with the latest class definitions. Everyone is encouraged to review and comment.

Thanks,
Xuefu

--
Sender: Bowen Li
Sent at: 2018 Nov 14 (Wed) 06:44
Recipient: Xuefu
Cc: piotr ; dev ; Shuyi Chen
Subject: Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi Xuefu,

Currently the new design doc is in "view only" mode, and people cannot leave comments. Can you please change it to "can comment" or "can edit" mode?

Thanks,
Bowen

On Mon, Nov 12, 2018 at 9:51 PM Zhang, Xuefu wrote:

Hi Piotr,

I have extracted the API portion of the design, and the google doc is here. Please review and provide your feedback.

Thanks,
Xuefu

--
Sender: Xuefu
Sent at: 2018 Nov 12 (Mon) 12:43
Recipient: Piotr Nowojski ; dev
Cc: Bowen Li ; Shuyi Chen
Subject: Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi Piotr,

That sounds good to me. Let's close all the open questions (there are a couple of them) in the Google doc, and I should be able to quickly split it into the three proposals as you suggested.

Thanks,
Xuefu

--
Sender: Piotr Nowojski
Sent at: 2018 Nov 9 (Fri) 22:46
Recipient: dev ; Xuefu
Cc: Bowen Li ; Shuyi Chen
Subject: Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi,

Yes, it seems like the best solution. Maybe someone else can also suggest whether we can split it further? Maybe the interface changes in one doc, reading from the Hive Meta Store in another, and finally storing our meta information in the Hive Meta Store?

Piotrek

> On 9 Nov 2018, at 01:44, Zhang, Xuefu wrote:
>
> Hi Piotr,
>
> That seems to be a good idea!
>
> Since the google doc for the design is currently under extensive review, I
> will leave it as it is for now. However, I'll convert it to two different
> FLIPs when the time comes.
>
> How does it sound to you?
>
> Thanks,
> Xuefu
>
> --
> Sender: Piotr Nowojski
> Sent at: 2018 Nov 9 (Fri) 02:31
> Recipient: dev
> Cc: Bowen Li ; Xuefu ; Shuyi Chen
> Subject: Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>
> Hi,
>
> Maybe we should split this topic (and the design doc) into a couple of smaller
> ones, hopefully independent. For example, the questions that you have asked,
> Fabian, have very little to do with reading metadata from the Hive Meta Store.
>
> Piotrek
>
>> On 7 Nov 2018, at 14:27, Fabian Hueske wrote:
>>
>> Hi Xuefu and all,
>>
>> Thanks for sharing this design document!
>> I'm very much in favor of restructuring / reworking the catalog handling in
>> Flink SQL as outlined in the document.
>> Most changes described in the design document seem to be rather general and
>> not specifically related to the Hive integration.
>>
>> IMO, there are some aspects, especially those at the boundary of Hive and
>> Flink, that need a bit more discussion. For example:
>>
>> * What does it take to make the Flink schema compatible with the Hive schema?
>> * How will Flink tables (descriptors) be stored in HMS?
>> * How do the two Hive catalogs differ? Could they be integrated into a
>> single one? When to use which one?
>> * What meta information is provided by HMS? Which of it can be leveraged
>> by Flink?
>>
>> Thank you,
>> Fabian
>>
>> On Fri., Nov 2, 2018 at 00:31, Bowen Li wrote:
>>
>>> After taking a look at how other discussion threads work, I think it's
>>> actually fine to just keep our discussion here. It's up to you, Xuefu.
>>>
>>> The google doc LGTM. I left some minor comments.
>>>
>>> On Thu, Nov 1, 2018 at 10:17 AM Bowen Li wrote:

Hi all,

As Xuefu has published the design doc on google, I agree with Shuyi's suggestion that we should probably start a new email thread like "[DISCUSS] ... Hive integration design ..." on the dev mailing list only, for community devs to review. The current thread goes to both the dev and user lists. This email thread is more about validating the general idea and direction with the community, and it's been pretty long and crowded so far. Since everyone is in favor of the idea, we can move forward with another thread to discuss and finalize the design.

Thanks,
Bowen

On Wed, Oct 31, 2018 at 12:16 PM Zhang, Xuefu wrote:

> Hi Shuyi,
>
> Good idea. Actually the PDF was converted from a google doc. Here is its
> link:
> https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
> Once we reach an agreement, I can convert it to a FLIP.
>
> Thanks,
> Xuefu
>
> --
> Sender: Shuyi Chen
>
[jira] [Created] (FLINK-10891) Upgrade Kafka client version to 2.0.1
vinoyang created FLINK-10891: Summary: Upgrade Kafka client version to 2.0.1 Key: FLINK-10891 URL: https://issues.apache.org/jira/browse/FLINK-10891 Project: Flink Issue Type: Sub-task Components: Kafka Connector Reporter: vinoyang Assignee: vinoyang

Since the modern Kafka connector only keeps track of the latest version of the Kafka client, we should upgrade the Kafka client Maven dependency now that Kafka 2.0.1 has been released.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
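For illustration, a minimal sketch of what the bump looks like in the connector's pom.xml; the kafka.version property is an assumption about how the version is managed, not the actual Flink build file:

{code:xml}
<!-- Hypothetical sketch: bump the Kafka client dependency to 2.0.1. -->
<properties>
    <kafka.version>2.0.1</kafka.version>
</properties>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
{code}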
[jira] [Created] (FLINK-10890) CLONE - Add DataStream HBase Sink
jocean@gamil.com created FLINK-10890: Summary: CLONE - Add DataStream HBase Sink Key: FLINK-10890 URL: https://issues.apache.org/jira/browse/FLINK-10890 Project: Flink Issue Type: Sub-task Components: Streaming Connectors Reporter: jocean@gamil.com Assignee: Shimin Yang Design documentation: [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
Re: [DISCUSS] FLIP-27: Refactor Source Interface
Hi Piotrek,

Thanks a lot for the detailed reply. All makes sense to me.

WRT the confusion between advance() / getCurrent(), do you think it would help if we combined them and had something like:

CompletableFuture<T> getNext();
long getWatermark();
long getCurrentTimestamp();

Cheers,
Jiangjie (Becket) Qin

On Tue, Nov 13, 2018 at 9:56 PM Piotr Nowojski wrote:

> Hi,
>
> Thanks again for the detailed answer :) Sorry for responding with a delay.
>
> > Completely agree that in pattern 2, having a callback is necessary for that
> > single thread outside of the connectors. And the connectors MUST have
> > internal threads.
>
> Yes, this thread will have to exist somewhere. In pattern 2 it exists in
> the connector (at least from the perspective of the Flink execution
> engine). In pattern 1 it exists inside the Flink execution engine. With
> completely blocking connectors, like simple reading from files, both of
> those approaches are basically the same. The difference arises when a user
> implementing a Flink source is already working with non-blocking code with
> some internal threads. In that case, pattern 1 would result in "double
> thread wrapping", while pattern 2 would allow skipping one layer of
> indirection.
>
> > If we go that way, we should have something like "void
> > poll(Callback) / void advance(callback)". I am curious how a
> > CompletableFuture would work here, though. If 10 readers return 10 completable
> > futures, will there be 10 additional threads (so 20 threads in total)
> > blocking and waiting on them? Or will there be a single thread busy-looping
> > and checking around?
>
> To be honest, I haven't thought this completely through and I haven't
> tested/POC'ed it. Having said that, I can think of at least a couple of
> solutions. The first is something like this:
>
> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507
>
> The line
>
> `blocked = split.process();`
>
> is where the execution goes into the task/sources. This is where the
> returned future is handled:
>
> blocked.addListener(() -> {
>     blockedSplits.remove(split);
>     // reset the level priority to prevent
>     // previously-blocked splits from starving existing splits
>     split.resetLevelPriority();
>     waitingSplits.offer(split);
> }, executor);
>
> Fundamentally, callbacks and Futures are more or less interchangeable. You
> can always wrap one into the other (creating a callback that completes a
> future, or attaching a callback once a future completes). In this case the
> difference for me is mostly:
> - an API passing a callback allows the callback to be fired multiple times
> and to be fired even if the connector is not blocked. This is what I meant
> by saying that the api `CompletableFuture<?> isBlocked()` is a bit simpler. The
> connector can only return either "I'm not blocked" or "I'm blocked and I
> will tell you only once when I'm not blocked anymore".
>
> But this is not the most important thing for me here. For me the important
> thing is to try our best to make the Flink task's control and execution
> single-threaded. For that, both callback and future APIs should work the same.
>
> > WRT pattern 1, a single blocking take() API should just work. The good
> > thing is that a blocking read API is usually simpler to implement.
>
> Yes, they are easier to implement (especially if you are not the one that
> has to deal with the additional threading required around them ;) ). But
> to address this issue: if we choose pattern 2, we can always provide a
> proxy/wrapper that, using an internal thread, implements the non-blocking
> API while exposing a blocking API to the user. It would implement pattern 2
> for the user while exposing pattern 1 to him. In other words, implementing
> pattern 1 in the pattern 2 paradigm, while still making it possible to
> implement pure pattern 2 connectors.
>
> > BTW, one thing I am also trying to avoid is pushing users to perform IO in
> > a method like "isBlocked()". If the method is expected to fetch records
> > (even if not returning them), naming it something more explicit would help
> > avoid confusion.
>
> If we choose so, we could rework it into something like:
>
> CompletableFuture<?> advance();
> T getCurrent();
> Watermark getCurrentWatermark();
>
> But as I wrote before, this is more confusing to me for the exact reasons
> you mentioned :) I would be confused about what should be done in `advance()`
> and what in `getCurrent()`. However, again, this naming issue is not that
> important to me and is probably a matter of taste/personal preference.
>
> Piotrek
>
> > On 9 Nov 2018, at
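To make the two naming variants in this exchange concrete, here is a side-by-side sketch as plain Java interfaces. This is illustration only, not the final FLIP-27 API; the interface names and generic parameters are assumptions.

{code:java}
import java.util.concurrent.CompletableFuture;

// Piotrek's variant: an asynchronous advance() step, with getCurrent() /
// getCurrentWatermark() as side-effect-free accessors afterwards.
interface AdvanceStyleReader<T> {
    CompletableFuture<?> advance();   // completes once the reader has moved to the next record
    T getCurrent();                   // the record the reader currently points at
    Watermark getCurrentWatermark();
}

// Becket's variant: fold advance()/getCurrent() into a single call, so there
// is no ambiguity about which method performs the I/O.
interface GetNextStyleReader<T> {
    CompletableFuture<T> getNext();   // completes with the next record
    long getCurrentTimestamp();
    long getWatermark();
}

// Minimal stand-in so the sketch is self-contained; Flink defines its own Watermark.
final class Watermark {
    private final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}
{code}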
Need help with Kinesis related PRs
Hi, The following two PRs need review from committers: https://github.com/apache/flink/pull/6980 https://github.com/apache/flink/pull/6968 Any help greatly appreciated! Thanks, Thomas
[jira] [Created] (FLINK-10889) Semantic inconsistency between DataSet#print and DataStream#print
Jeff Zhang created FLINK-10889: -- Summary: Semantic inconsistency between DataSet#print and DataStream#print Key: FLINK-10889 URL: https://issues.apache.org/jira/browse/FLINK-10889 Project: Flink Issue Type: Improvement Components: DataSet API, DataStream API Reporter: Jeff Zhang

DataSet#print will print the result on the client side, while DataStream#print will print the result on the TM. This inconsistency will confuse users. IMHO, we should make the behavior consistent between DataSet and DataStream; I prefer to print the result on the client side. Regarding DataStream#print, we can use DataStreamUtils#collect to print it on the client side.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
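For reference, a small sketch of the workaround suggested above, using DataStreamUtils#collect to pull DataStream results back to the client and print them there (API as available in Flink 1.6/1.7):

{code:java}
import java.util.Iterator;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClientSidePrint {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> stream = env.fromElements(1L, 2L, 3L);

        // Unlike DataStream#print(), which prints on the TaskManagers,
        // DataStreamUtils.collect runs the job and ships results to the client.
        Iterator<Long> results = DataStreamUtils.collect(stream);
        while (results.hasNext()) {
            System.out.println(results.next());
        }
    }
}
{code}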
Re: Sharing state between subtasks
Hey all,

I think what we need for this on the state-sharing side is pretty simple. I opened a JIRA to track this work and submitted a PR for the state-sharing bit.

https://issues.apache.org/jira/browse/FLINK-10886
https://github.com/apache/flink/pull/7099

Please provide feedback :)

-Jamie

On Thu, Nov 1, 2018 at 3:33 AM Till Rohrmann wrote:

> Hi Thomas,
>
> using Akka directly would further manifest our dependency on Scala in
> flink-runtime. This is something we are currently trying to get rid of. For
> that purpose we have added the RpcService abstraction, which encapsulates
> all Akka-specific logic. We hope that we can soon get rid of the Scala
> dependency in flink-runtime by using a special class loader only for
> loading the AkkaRpcService implementation.
>
> I think the easiest way to sync the task information is actually going
> through the JobMaster, because the subtasks don't know on which other TMs
> the other subtasks run. Otherwise, we would need some TM detection
> mechanism between TMs. If you choose this way, then you should be able to
> use the RpcService by extending the JobMasterGateway with additional RPCs.
>
> Cheers,
> Till
>
> On Wed, Oct 31, 2018 at 6:52 PM Thomas Weise wrote:
>
> > Hi,
> >
> > We are planning to work on the Kinesis consumer in the following order:
> >
> > 1. Add per-shard watermarking:
> > https://issues.apache.org/jira/browse/FLINK-5697 - this will be code we
> > already use internally; I will open a PR to add it to the Flink Kinesis
> > consumer
> > 2. Exchange of per-subtask watermarks between all subtasks of one or
> > multiple sources
> > 3. Implement the queue approach described in Jamie's document, utilizing
> > 1) and 2) to align the shard consumers WRT event time
> >
> > There was some discussion regarding the mechanism to share the watermarks
> > between subtasks. If there is something that can be re-used, it would be
> > great. Otherwise I'm going to further investigate the Akka or JGroups
> > route. Regarding Akka, since it is used within Flink already, is there an
> > abstraction that you would recommend to consider to avoid a direct
> > dependency?
> >
> > Thanks,
> > Thomas
> >
> > On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
> > wrote:
> >
> > > Not yet. We only have some initial thoughts and have not worked on it
> > > yet. We will update the progress in this discussion if we have any.
> > >
> > > Best,
> > > Zhijiang
> > > --
> > > Sender: Aljoscha Krettek
> > > Sent at: 2018-10-18 (Thu) 17:53
> > > Recipient: dev ; Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>
> > > Cc: Till Rohrmann
> > > Subject: Re: Sharing state between subtasks
> > >
> > > Hi Zhijiang,
> > >
> > > do you already have working code or a design doc for the second approach?
> > >
> > > Best,
> > > Aljoscha
> > >
> > > > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <
> > > > wangzhijiang...@aliyun.com.INVALID> wrote:
> > > >
> > > > Just noticed this discussion from @Till Rohrmann's weekly community
> > > > update, and I want to share some thoughts from our experience.
> > > >
> > > > We also encountered the source consumption skew issue before, and we
> > > > focused on improving this in two possible ways.
> > > >
> > > > 1. Control the read strategy from the downstream side. In detail, every
> > > > input channel in the downstream task corresponds to the consumption of
> > > > one upstream source task, and we tag each input channel with a watermark
> > > > to find the lowest channel to read with high priority. In essence, we
> > > > actually rely on the backpressure mechanism: if the channel with the
> > > > highest timestamp is not read by the downstream task for a while, it will
> > > > block the corresponding source task from reading once the buffers are
> > > > exhausted. There is no need to change the source interface in this way,
> > > > but there are two major concerns: first, it affects barrier alignment,
> > > > resulting in checkpoints being delayed or expired. Second, it cannot
> > > > align source consumption very precisely; it is just a best-effort
> > > > approach. So we finally gave up on this way.
> > > >
> > > > 2. Add a new SourceCoordinator component to coordinate source consumption
> > > > in a distributed way. For example, we could start this component in the
> > > > JobManager, similar to the current role of the CheckpointCoordinator.
> > > > Then every source task would communicate with the JobManager via the
> > > > current RPC mechanism; maybe we can rely on the heartbeat message to
> > > > attach the consumption progress as the payload. The JobManager will
> > > > accumulate all the reported progress and then give responses to the
> > > > different source tasks. We can define a protocol, for example, for
> > > > telling a fast source task to sleep for a specific time. This way, the
> > > > coordinator has the global information to make proper decisions for
> > > > individuals, so it seems
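As a concrete illustration of the RPC route Till describes above (and that the queue approach in Jamie's document would build on), the JobMaster gateway could be extended with watermark-tracking calls roughly like the following. This is a hypothetical sketch; the interface name, method names, and signatures are all assumptions, not actual Flink APIs.

{code:java}
import java.util.concurrent.CompletableFuture;

// Hypothetical additions to the JobMaster RPC gateway for watermark tracking.
public interface WatermarkTrackingGateway {

    // Each source subtask periodically reports its current event-time progress.
    CompletableFuture<Void> reportWatermark(String sourceId, int subtaskIndex, long watermark);

    // Sources query the global minimum watermark across all subtasks and can
    // throttle (or pause) partitions that are too far ahead of it.
    CompletableFuture<Long> getGlobalMinimumWatermark(String sourceId);
}
{code}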
[jira] [Created] (FLINK-10888) Expose new global watermark RPC to sources
Jamie Grier created FLINK-10888: --- Summary: Expose new global watermark RPC to sources Key: FLINK-10888 URL: https://issues.apache.org/jira/browse/FLINK-10888 Project: Flink Issue Type: Sub-task Components: Streaming Connectors Reporter: Jamie Grier Assignee: Jamie Grier Expose new JobMaster RPC for watermark tracking to Source implementations so it can be used to align reads across sources. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10887) Add source watermark tracking to the JobMaster
Jamie Grier created FLINK-10887: --- Summary: Add source watermark tracking to the JobMaster Key: FLINK-10887 URL: https://issues.apache.org/jira/browse/FLINK-10887 Project: Flink Issue Type: Sub-task Components: JobManager Reporter: Jamie Grier Assignee: Jamie Grier

We need to add a new RPC to the JobMaster such that the current watermark for every source sub-task can be reported, and the current global minimum/maximum watermark can be retrieved, so that each source can adjust its partition read rates in an attempt to keep sources roughly aligned in event time.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10886) Event time synchronization across sources
Jamie Grier created FLINK-10886: --- Summary: Event time synchronization across sources Key: FLINK-10886 URL: https://issues.apache.org/jira/browse/FLINK-10886 Project: Flink Issue Type: Improvement Components: Streaming Connectors Reporter: Jamie Grier Assignee: Jamie Grier

When reading from a source with many parallel partitions, especially when reading lots of historical data (or recovering from downtime when there is a backlog to read), it's quite common for an event-time skew to develop across those partitions. When doing event-time windowing (or in fact any event-time-driven processing), the event-time skew across partitions results directly in increased buffering in Flink and, of course, the corresponding state/checkpoint size growth. As the event-time skew and state size grow larger, this can have a major effect on application performance and in some cases result in a "death spiral" where the application performance gets worse and worse as the state size grows and grows.

So one solution to this problem, outside of core changes in Flink itself, seems to be to coordinate sources across partitions so that they make progress through event time at roughly the same rate. In fact, if there is large skew, the idea would be to slow or even stop reading from some partitions with newer data while first reading the partitions with older data. To do this we need to share state somehow amongst sub-tasks.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
Re: support/docs for compression in StreamingFileSink
Just noticed one detail about using the BulkWriter interface: you can no longer assign a rolling policy. That makes sense for formats like orc/parquet, but perhaps not for simple text compression.

On Wed, Nov 14, 2018 at 1:43 PM Addison Higham wrote:

> Hi all,
>
> I am moving some code to use the StreamingFileSink. Currently, it doesn't
> look like there is any native support for compression (gzip or otherwise)
> built into Flink when using the StreamingFileSink. It seems like this is a
> really common need that, as far as I could tell, wasn't represented in JIRA.
>
> After a fair amount of digging, it seems like the way to do that is to
> implement the BulkWriter interface, where you can trivially wrap an
> outputStream with something like a GZIPOutputStream.
>
> It seems like it would make sense that, until compression functionality is
> built into the StreamingFileSink, we add some docs on how to use
> compression with the StreamingFileSink.
>
> I am willing to spend a bit of time documenting that, but before I do, I
> wanted to make sure I understand if that is in fact the correct way to
> think about this problem and get your thoughts.
>
> Thanks!
support/docs for compression in StreamingFileSink
Hi all,

I am moving some code to use the StreamingFileSink. Currently, it doesn't look like there is any native support for compression (gzip or otherwise) built into Flink when using the StreamingFileSink. It seems like this is a really common need that, as far as I could tell, wasn't represented in JIRA.

After a fair amount of digging, it seems like the way to do that is to implement the BulkWriter interface, where you can trivially wrap an outputStream with something like a GZIPOutputStream.

It seems like it would make sense that, until compression functionality is built into the StreamingFileSink, we add some docs on how to use compression with the StreamingFileSink.

I am willing to spend a bit of time documenting that, but before I do, I wanted to make sure I understand if that is in fact the correct way to think about this problem and get your thoughts.

Thanks!
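If it helps the documentation effort, here is a minimal sketch of the approach described above: a BulkWriter.Factory that wraps the sink's part-file stream in a GZIPOutputStream. Delegating the element encoding to Flink's Encoder is one possible design choice here, not an established pattern in the API.

{code:java}
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.FSDataOutputStream;

// Sketch: gzip-compress each part file by wrapping the raw output stream.
public class GzipBulkWriterFactory<T> implements BulkWriter.Factory<T> {

    private final Encoder<T> encoder; // e.g. a SimpleStringEncoder

    public GzipBulkWriterFactory(Encoder<T> encoder) {
        this.encoder = encoder;
    }

    @Override
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
        final GZIPOutputStream gzip = new GZIPOutputStream(out);
        return new BulkWriter<T>() {
            @Override
            public void addElement(T element) throws IOException {
                encoder.encode(element, gzip);
            }

            @Override
            public void flush() throws IOException {
                gzip.flush();
            }

            @Override
            public void finish() throws IOException {
                // Write the gzip trailer; the sink itself closes the
                // underlying file stream when rolling the part file.
                gzip.finish();
            }
        };
    }
}
{code}

Such a factory can then be passed to StreamingFileSink.forBulkFormat(...), with the caveat noted above that bulk formats roll on checkpoint and do not accept a custom rolling policy.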
[jira] [Created] (FLINK-10885) Avro Confluent Schema Registry E2E test failed on Travis
Chesnay Schepler created FLINK-10885: Summary: Avro Confluent Schema Registry E2E test failed on Travis Key: FLINK-10885 URL: https://issues.apache.org/jira/browse/FLINK-10885 Project: Flink Issue Type: Bug Components: Batch Connectors and Input/Output Formats, E2E Tests, Table API & SQL Affects Versions: 1.7.0 Reporter: Chesnay Schepler

https://travis-ci.org/zentol/flink/jobs/454943551

{code}
Waiting for schema registry...
[2018-11-14 12:20:59,394] ERROR Server died unexpectedly: (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
org.apache.kafka.common.config.ConfigException: No supported Kafka endpoints are configured. Either kafkastore.bootstrap.servers must have at least one endpoint matching kafkastore.security.protocol or broker endpoints loaded from ZooKeeper must have at least one endpoint matching kafkastore.security.protocol.
	at io.confluent.kafka.schemaregistry.storage.KafkaStore.endpointsToBootstrapServers(KafkaStore.java:313)
	at io.confluent.kafka.schemaregistry.storage.KafkaStore.<init>(KafkaStore.java:130)
	at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.<init>(KafkaSchemaRegistry.java:144)
	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:53)
	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:37)
	at io.confluent.rest.Application.createServer(Application.java:149)
	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of exceeded physical memory
wgcn created FLINK-10884: Summary: Flink on yarn TM container will be killed by nodemanager because of exceeded physical memory Key: FLINK-10884 URL: https://issues.apache.org/jira/browse/FLINK-10884 Project: Flink Issue Type: Bug Components: Cluster Management, Core Affects Versions: 1.6.2 Environment: version: 1.6.2 module: flink on yarn centos jdk1.8 hadoop 2.7 Reporter: wgcn

The TM container will be killed by the nodemanager because of exceeded physical memory. I found that the launch context launching the TM container sets "container memory = heap memory + offHeapSizeMB" in the class org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters, lines 160 to 166. I set a safety margin for the container's whole memory usage. For example, if the container limit is 3g of memory, the sum "heap memory + offHeapSizeMB" is set to 2.4g to prevent the container from being killed. Do we have a ready-made solution, or can I commit my solution?

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
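A note that may help here: Flink already reserves a configurable safety margin between the YARN container size and the TM heap via the containerized cutoff options, which is essentially the margin described above (flink-conf.yaml; values shown are the defaults as of 1.6):

{code}
# Fraction of the container memory subtracted from the heap as a safety margin.
containerized.heap-cutoff-ratio: 0.25
# Lower bound for the cutoff, in MB.
containerized.heap-cutoff-min: 600
{code}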
[jira] [Created] (FLINK-10883) Submitting a job without enough slots times out due to an unspecified timeout
Chesnay Schepler created FLINK-10883: Summary: Submitting a job without enough slots times out due to an unspecified timeout Key: FLINK-10883 URL: https://issues.apache.org/jira/browse/FLINK-10883 Project: Flink Issue Type: Improvement Components: Job-Submission Affects Versions: 1.7.0 Reporter: Chesnay Schepler

When submitting a job without enough slots being available, the job will stay in a SCHEDULED/CREATED state. After some time (a few minutes) the job execution will fail with the following timeout exception:

{code}
2018-11-14 13:38:26,615 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) (1/1)
java.util.concurrent.TimeoutException
	at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}

That the job submission will time out is not documented; neither is which timeout is responsible in the first place, nor how/whether it can be disabled.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
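For reference, the timeout firing here is most likely the slot allocation timeout; attributing it is an inference, but the key below does exist as of 1.7 and can be raised in flink-conf.yaml:

{code}
# How long the scheduler waits for slots before failing the job
# with a TimeoutException (milliseconds; default is 5 minutes).
slot.request.timeout: 300000
{code}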
[jira] [Created] (FLINK-10882) Misleading job/task state for scheduled jobs
Chesnay Schepler created FLINK-10882: Summary: Misleading job/task state for scheduled jobs Key: FLINK-10882 URL: https://issues.apache.org/jira/browse/FLINK-10882 Project: Flink Issue Type: Bug Components: Webfrontend Affects Versions: 1.7.0 Reporter: Chesnay Schepler Attachments: list_view.png, task_view.png

Submitting a job when not enough resources are available currently causes the job to stay in a {{CREATED/SCHEDULED}} state. There are 2 issues with how this is presented in the UI.

The {{Running Jobs}} page incorrectly states that the job is running. (see list_view attachment)

The state display for individual tasks either
# states the task is in a CREATED state, when it is actually SCHEDULED, or
# states the task is in a CREATED state, but the count for all state boxes is zero. (see task_view attachment)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10881) SavepointITCase deadlocks on travis
Chesnay Schepler created FLINK-10881: Summary: SavepointITCase deadlocks on travis Key: FLINK-10881 URL: https://issues.apache.org/jira/browse/FLINK-10881 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing, Tests Affects Versions: 1.7.0 Reporter: Chesnay Schepler https://travis-ci.org/apache/flink/jobs/454898424 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10880) Failover strategies should not be applied to Batch Execution
Stephan Ewen created FLINK-10880: Summary: Failover strategies should not be applied to Batch Execution Key: FLINK-10880 URL: https://issues.apache.org/jira/browse/FLINK-10880 Project: Flink Issue Type: Bug Components: Distributed Coordination Affects Versions: 1.6.2 Reporter: Stephan Ewen Fix For: 1.6.3, 1.7.0

When configuring a failover strategy other than "full", DataSet/Batch execution is currently not correct. This is expected; the failover region strategy is an experimental WIP feature for streaming that has not been extended to the DataSet API. We need to document this and prevent execution of DataSet programs with failover strategies other than "full".

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
Re: REST job submission
Done: https://issues.apache.org/jira/browse/FLINK-10879

On Wed, Nov 14, 2018 at 10:14 AM Chesnay Schepler wrote:

> I wouldn't consider it a _bug_ in that sense, but agree that the current
> behavior isn't ideal. Running a job via the CLI or WebUI should behave
> the same way; as such, please open a JIRA.
>
> On 12.11.2018 12:50, Flavio Pompermaier wrote:
> > Hi to all,
> > in our ETL we need to call an external (REST) service once a job ends: we
> > extract information about accumulators and we update the job status.
> > However this is only possible when using the CLI client: if we call the job
> > via the REST API or Web UI (which is very useful to decouple our UI from the
> > Flink cluster) then this is not possible, because the REST API cannot
> > execute any code after env.execute().
> > I think that this is a huge limitation: first of all, when writing
> > (and debugging) a Flink job, you assume that you can call execute()
> > multiple times and use the returned JobExecutionResult.
> > Secondly, the binary client and the REST client behave
> > differently (with the CLI client everything works as expected).
> >
> > What do you think about this? Is this a bug or not?
> >
> > PS: I think also that the REST client should not be aware of any jar or
> > class instance; it should just call the job manager with the proper class
> > name and jar id (plus other options of course).
> >
> > Cheers,
> > Flavio
[jira] [Created] (FLINK-10879) Align Flink clients on env.execute()
Flavio Pompermaier created FLINK-10879: -- Summary: Align Flink clients on env.execute() Key: FLINK-10879 URL: https://issues.apache.org/jira/browse/FLINK-10879 Project: Flink Issue Type: Improvement Components: Client Affects Versions: 1.6.2 Reporter: Flavio Pompermaier

Right now the REST APIs do not support any code after env.execute(), while the Flink API, the CLI client, and code executed within the IDE do. All clients should behave in the same way (either supporting env.execute() returning a result and continuing code execution, or not). See the discussion on the DEV ML for more details: http://mail-archives.apache.org/mod_mbox/flink-dev/201811.mbox/%3CCAELUF_DhjzL9FECvx040_GE3d85Ykb-HcGVCh0O4y9h-cThq7A%40mail.gmail.com%3E

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10878) State evolution E2E test failed on travis
Chesnay Schepler created FLINK-10878: Summary: State evolution E2E test failed on travis Key: FLINK-10878 URL: https://issues.apache.org/jira/browse/FLINK-10878 Project: Flink Issue Type: Bug Components: E2E Tests, State Backends, Checkpointing Affects Versions: 1.7.0 Reporter: Chesnay Schepler

https://travis-ci.org/apache/flink/builds/454402843

{code}
2018-11-13 13:21:14,096 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 66 by task c375b648d817e2a1f25ab786094bfaac of job 81af79ed99a57065b3f0cbeb3cd42b2b.
2018-11-13 13:21:14,097 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 66 of job 81af79ed99a57065b3f0cbeb3cd42b2b.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source (1/1) was not running
	at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}

{code}
2018-11-13 13:21:14,035 INFO org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder - Declining checkpoint 66 of job 81af79ed99a57065b3f0cbeb3cd42b2b.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source (1/1) was not running
	at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2018-11-13 13:21:14,351 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
2018-11-13 13:21:14,351 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend from snapshot.
2018-11-13 13:21:15,212 INFO org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder - Declining checkpoint 66 of job 81af79ed99a57065b3f0cbeb3cd42b2b.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException: Task received cancellation from one of its inputs
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyAbortOnCancellationBarrier(BarrierBuffer.java:404)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processCancellationBarrier(BarrierBuffer.java:304)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:204)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:748)
2018-11-13 13:21:19,680 INFO org.apache.flink.runt
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10877) Remove duplicate dependency entries in kafka connector pom
Till Rohrmann created FLINK-10877: - Summary: Remove duplicate dependency entries in kafka connector pom Key: FLINK-10877 URL: https://issues.apache.org/jira/browse/FLINK-10877 Project: Flink Issue Type: Improvement Components: Kafka Connector Affects Versions: 1.7.0 Reporter: Till Rohrmann Assignee: Till Rohrmann

The {{flink-connector-kafka}} {{pom.xml}} contains multiple dependency entries for {{flink-connector-kafka-base}} and, moreover, excludes dependencies from a test-jar. This is not necessary.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10876) Deadlock if closing firstly pending transactions in FlinkKafkaProducer(011).close()
Andrey Zagrebin created FLINK-10876: --- Summary: Deadlock if closing firstly pending transactions in FlinkKafkaProducer(011).close() Key: FLINK-10876 URL: https://issues.apache.org/jira/browse/FLINK-10876 Project: Flink Issue Type: Bug Components: Kafka Connector Reporter: Andrey Zagrebin

While working on FLINK-10455, I encountered a deadlock in _FlinkKafkaProducer(011).close()_ if _pendingTransactions_ are closed before _currentTransaction_. There is no deadlock the other way around.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
Re: REST job submission
I wouldn't consider it a _bug_ in that sense, but agree that the current behavior isn't ideal. Running a job via the CLI or WebUI should behave the same way; as such, please open a JIRA.

On 12.11.2018 12:50, Flavio Pompermaier wrote:

Hi to all,

in our ETL we need to call an external (REST) service once a job ends: we extract information about accumulators and we update the job status. However this is only possible when using the CLI client: if we call the job via the REST API or Web UI (which is very useful to decouple our UI from the Flink cluster) then this is not possible, because the REST API cannot execute any code after env.execute().

I think that this is a huge limitation: first of all, when writing (and debugging) a Flink job, you assume that you can call execute() multiple times and use the returned JobExecutionResult. Secondly, the binary client and the REST client behave differently (with the CLI client everything works as expected).

What do you think about this? Is this a bug or not?

PS: I think also that the REST client should not be aware of any jar or class instance; it should just call the job manager with the proper class name and jar id (plus other options of course).

Cheers,
Flavio
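To make the limitation concrete, a minimal sketch of the pattern that works with the CLI client but not via the REST API / Web UI, where nothing after execute() runs (the job name and accumulator name are illustrative):

{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;

public class EtlJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
            .map(new RichMapFunction<Integer, Integer>() {
                private final IntCounter counter = new IntCounter();

                @Override
                public void open(Configuration parameters) {
                    getRuntimeContext().addAccumulator("records-processed", counter);
                }

                @Override
                public Integer map(Integer value) {
                    counter.add(1);
                    return value * 2;
                }
            })
            .output(new DiscardingOutputFormat<>());

        // With the CLI client the lines below run after the job finishes;
        // submitted through the REST API / Web UI, they are never reached.
        JobExecutionResult result = env.execute("etl-job");
        Integer processed = result.getAccumulatorResult("records-processed");
        System.out.println("Notify external service: processed=" + processed);
    }
}
{code}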
[jira] [Created] (FLINK-10875) Add `toTableWithTimestamp` method in `DataStreamConversions`
sunjincheng created FLINK-10875: --- Summary: Add `toTableWithTimestamp` method in `DataStreamConversions` Key: FLINK-10875 URL: https://issues.apache.org/jira/browse/FLINK-10875 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: sunjincheng Assignee: sunjincheng Fix For: 1.7.1

Currently we convert a `DataStream` to a `Table` via `DataStreamConversions#toTable`, e.g.:

{code:java}
// Without TimeAttribute
...
val stream = env.fromCollection(...)
val tab = stream.toTable(tEnv, 'a, 'b, 'c)
val result = tab.select('a, 'b)

// With TimeAttribute
...
val stream = env.fromCollection(...).assignTimestampsAndWatermarks(...)
val tab = stream.toTable(tEnv, 'a, 'b, 'c, 'ts.rowtime)
val result = tab.window(Session withGap 5.milli on 'ts as 'w) ...
{code}

I think the fieldNames parameter of the `toTable` method is reasonable for the conversion without a time attribute, because the fieldNames actually correspond to the fields of the physical table. But when applied to the conversion with a time attribute, the time attribute column is silently added to the table. This feels very magical, so I recommend adding a method that lets the user explicitly declare the time attribute added to the physical table: `toTableWithTimestamp`, which automatically marks the column named by the user as a time attribute according to the TimeCharacteristic, e.g.:

{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...
val table = stream.toTableWithTimestamp(tEnv, 'count, 'size, 'name, 'ts)
  .window(Tumble over 2.rows on 'ts as 'w)
...
{code}

In the example above Flink will mark `ts` as a `RowtimeAttribute`. What do you think?

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10874) Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure
Piotr Nowojski created FLINK-10874: -- Summary: Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure Key: FLINK-10874 URL: https://issues.apache.org/jira/browse/FLINK-10874 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.8.0 Reporter: Piotr Nowojski

https://api.travis-ci.org/v3/job/454449444/log.txt

{noformat}
Test testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase) is running.
16:35:07,894 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Property [transaction.timeout.ms] not specified. Setting it to 360 ms
16:35:07,903 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic testMigrateFromAtLeastOnceToExactlyOnce
16:35:08,785 ERROR org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase - Test testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase) failed with:
java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:419)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshotWithLocalState(AbstractStreamOperatorTestHarness.java:505)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:497)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRecoverWithChangeSemantics(FlinkKafkaProducerITCase.java:591)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce(FlinkKafkaProducerITCase.java:569)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: This server does not