Re: Memory size for network buffers
Hi Thomas,

The current calculation for the network buffer size is "Math.min(taskmanager.network.memory.max, Math.max(taskmanager.network.memory.min, fraction * totalMem))". Based on your configuration below, the result is just 32768 bytes (4 buffers), coming from taskmanager.network.memory.min. If you want to configure a fixed amount of network buffer memory, you can set the same value for the min and max parameters; the fraction value is then ignored. BTW, you can specify a unit for these parameters, e.g. min: 32kb.

As for the thinking behind the min and max settings: at TaskManager startup it is difficult to know exactly how many network buffers will be needed and which kinds of tasks will later be deployed to that TaskManager. For example, batch jobs can make use of as many network buffers as the system can spare, while for streaming jobs the spare buffers could be used elsewhere for possible improvements. In order to keep the door open for such future improvements without changing the configuration interface, we retain the current parameters.

For your job, I think you should increase the min value to get more network buffers; the current 4 buffers are indeed not enough for common jobs.

Best,
Zhijiang

--
Sender: Thomas Weise
Sent at: 2018 Nov 13 (Tue) 13:11
Recipient: dev
Subject: Memory size for network buffers

Hi,

I'm trying to understand the intention behind the size parameters for network buffers, specifically max, min and fraction. The reason we are looking at it is an attempt to make the memory allocation elastic, so that memory is allocated according to the actual number of buffers required (within a range), without the need to tune this for every deployment. As of Flink 1.5, there are 3 parameters, but they all result in a fixed allocation, which is not what we were looking for.
Here is an example just to illustrate it:

taskmanager.network.memory.fraction: 0.01
taskmanager.network.memory.min: 32768
taskmanager.network.memory.max: 1073741824
taskmanager.memory.segment-size: 8192

I wanted fraction to be out of the picture (but 0 isn't an acceptable value). Then set min to something tiny that my job will exceed and max to something too large to reach. Unfortunately, that fails:

java.io.IOException: Insufficient number of network buffers: required 8, but only 0 available. The total number of network buffers is currently set to 4 of 8192 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.

So the question then is, why have min and max? Or is the intention to have a different implementation in the future?

Thanks,
Thomas
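To make the clamping behavior concrete, here is a small stand-alone sketch of the formula quoted above. The total-memory value used below is hypothetical, chosen so that the min bound wins (which matches the 4-buffer situation in the error message); it is not what Flink actually computed in this setup.

```java
// Sketch of the network buffer sizing formula:
//   min(max, max(min, fraction * totalMem))
public class NetworkBufferSizing {

    static long networkBufferBytes(long minBytes, long maxBytes,
                                   double fraction, long totalMemBytes) {
        return Math.min(maxBytes, Math.max(minBytes, (long) (fraction * totalMemBytes)));
    }

    public static void main(String[] args) {
        long segmentSize = 8192L;    // taskmanager.memory.segment-size
        long min = 32_768L;          // taskmanager.network.memory.min
        long max = 1_073_741_824L;   // taskmanager.network.memory.max
        double fraction = 0.01;      // taskmanager.network.memory.fraction

        // Hypothetical 1 MiB total: fraction * 1 MiB is ~10 KiB, which is
        // below min, so the result is clamped up to min.
        long bytes = networkBufferBytes(min, max, fraction, 1L << 20);
        System.out.println(bytes + " bytes -> " + (bytes / segmentSize) + " buffers");
        // -> 32768 bytes -> 4 buffers
    }
}
```

Setting min == max collapses the clamp to a constant, which is why that is the way to configure a fixed buffer pool size.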
[jira] [Created] (FLINK-10857) Conflict between JMX and Prometheus Metrics reporter
Truong Duc Kien created FLINK-10857:
---
Summary: Conflict between JMX and Prometheus Metrics reporter
Key: FLINK-10857
URL: https://issues.apache.org/jira/browse/FLINK-10857
Project: Flink
Issue Type: Bug
Components: Metrics
Affects Versions: 1.6.2
Reporter: Truong Duc Kien

When registering both the JMX and Prometheus metrics reporters, the Prometheus reporter fails with an exception.

{code:java}
o.a.f.r.m.MetricRegistryImpl Error while registering metric.
java.lang.IllegalArgumentException: Invalid metric name: flink_jobmanager.Status.JVM.Memory.Mapped_Count
 at org.apache.flink.shaded.io.prometheus.client.Collector.checkMetricName(Collector.java:182)
 at org.apache.flink.shaded.io.prometheus.client.SimpleCollector.<init>(SimpleCollector.java:164)
 at org.apache.flink.shaded.io.prometheus.client.Gauge.<init>(Gauge.java:68)
 at org.apache.flink.shaded.io.prometheus.client.Gauge$Builder.create(Gauge.java:74)
 at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter.createCollector(AbstractPrometheusReporter.java:130)
 at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter.notifyOfAddedMetric(AbstractPrometheusReporter.java:106)
 at org.apache.flink.runtime.metrics.MetricRegistryImpl.register(MetricRegistryImpl.java:329)
 at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:379)
 at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.gauge(AbstractMetricGroup.java:323)
 at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateMemoryMetrics(MetricUtils.java:231)
 at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateStatusMetrics(MetricUtils.java:100)
 at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateJobManagerMetricGroup(MetricUtils.java:68)
 at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startClusterComponents(ClusterEntrypoint.java:342)
 at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:233)
 at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
 at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
 at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:176)
{code}

This is a small program to reproduce the problem: https://github.com/dikei/flink-metrics-conflict-test

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
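The stack trace shows the name being rejected in Collector.checkMetricName: Prometheus metric names must match the pattern [a-zA-Z_:][a-zA-Z0-9_:]*, so a dotted, JMX-style scope such as "flink_jobmanager.Status.JVM.Memory.Mapped_Count" is invalid as-is. The following is an illustrative stand-alone sketch of the kind of character sanitization a Prometheus reporter has to apply before registering a collector; it is not Flink's actual AbstractPrometheusReporter code.

```java
// Illustrative sketch: replace characters that are not legal in a
// Prometheus metric name ([a-zA-Z0-9_:]) with underscores.
public class PrometheusNameSanitizer {

    static String sanitize(String scopedName) {
        // Every character outside the allowed set becomes '_'.
        // (A full implementation would also guard against a leading digit.)
        return scopedName.replaceAll("[^a-zA-Z0-9:_]", "_");
    }

    public static void main(String[] args) {
        System.out.println(sanitize("flink_jobmanager.Status.JVM.Memory.Mapped_Count"));
        // -> flink_jobmanager_Status_JVM_Memory_Mapped_Count
    }
}
```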
Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
Hi Piotr,

I have extracted the API portion of the design and the google doc is here. Please review and provide your feedback.

Thanks,
Xuefu

--
Sender: Xuefu
Sent at: 2018 Nov 12 (Mon) 12:43
Recipient: Piotr Nowojski; dev
Cc: Bowen Li; Shuyi Chen
Subject: Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi Piotr,

That sounds good to me. Let's close all the open questions (there are a couple of them) in the Google doc and I should be able to quickly split it into the three proposals as you suggested.

Thanks,
Xuefu

--
Sender: Piotr Nowojski
Sent at: 2018 Nov 9 (Fri) 22:46
Recipient: dev; Xuefu
Cc: Bowen Li; Shuyi Chen
Subject: Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi,

Yes, it seems like the best solution. Maybe someone else can also suggest whether we can split it further? Maybe changes to the interfaces in one doc, reading from the Hive metastore in another, and finally storing our meta information in the Hive metastore?

Piotrek

> On 9 Nov 2018, at 01:44, Zhang, Xuefu wrote:
>
> Hi Piotr,
>
> That seems to be a good idea!
>
> Since the google doc for the design is currently under extensive review, I
> will leave it as it is for now. However, I'll convert it to two different
> FLIPs when the time comes.
>
> How does it sound to you?
>
> Thanks,
> Xuefu
>
> --
> Sender: Piotr Nowojski
> Sent at: 2018 Nov 9 (Fri) 02:31
> Recipient: dev
> Cc: Bowen Li; Xuefu; Shuyi Chen
> Subject: Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>
> Hi,
>
> Maybe we should split this topic (and the design doc) into a couple of smaller
> ones, hopefully independent. The questions that you have asked, Fabian, have
> for example very little to do with reading metadata from the Hive Meta Store.
>
> Piotrek
>
>> On 7 Nov 2018, at 14:27, Fabian Hueske wrote:
>>
>> Hi Xuefu and all,
>>
>> Thanks for sharing this design document!
>> I'm very much in favor of restructuring / reworking the catalog handling in
>> Flink SQL as outlined in the document.
>> Most changes described in the design document seem to be rather general and
>> not specifically related to the Hive integration.
>>
>> IMO, there are some aspects, especially those at the boundary of Hive and
>> Flink, that need a bit more discussion. For example:
>>
>> * What does it take to make the Flink schema compatible with the Hive schema?
>> * How will Flink tables (descriptors) be stored in HMS?
>> * How do the two Hive catalogs differ? Could they be integrated into a
>> single one? When to use which one?
>> * What meta information is provided by HMS? What of this can be leveraged
>> by Flink?
>>
>> Thank you,
>> Fabian
>>
>> Am Fr., 2. Nov. 2018 um 00:31 Uhr schrieb Bowen Li:
>>
>>> After taking a look at how other discussion threads work, I think it's
>>> actually fine to just keep our discussion here. It's up to you, Xuefu.
>>>
>>> The google doc LGTM. I left some minor comments.
>>>
>>> On Thu, Nov 1, 2018 at 10:17 AM Bowen Li wrote:

Hi all,

As Xuefu has published the design doc on google, I agree with Shuyi's suggestion that we probably should start a new email thread like "[DISCUSS] ... Hive integration design ..." on only the dev mailing list for community devs to review. The current thread goes to both the dev and user lists. This email thread is more about validating the general idea and direction with the community, and it's been pretty long and crowded so far. Since everyone is in favor of the idea, we can move forward with another thread to discuss and finalize the design.

Thanks,
Bowen

On Wed, Oct 31, 2018 at 12:16 PM Zhang, Xuefu wrote:

> Hi Shuyi,
>
> Good idea. Actually the PDF was converted from a google doc. Here is its
> link:
> https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
> Once we reach an agreement, I can convert it to a FLIP.
>
> Thanks,
> Xuefu
>
> --
> Sender: Shuyi Chen
> Sent at: 2018 Nov 1 (Thu) 02:47
> Recipient: Xuefu
> Cc: vino yang; Fabian Hueske; dev; user
> Subject: Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>
> Hi Xuefu,
>
> Thanks a lot for driving this big effort. I would suggest converting your
> proposal and design doc into a google doc, and sharing it on the dev mailing
> list for the community to review and comment, with a title like "[DISCUSS]
> ... Hive integration design ..." . Once approved, we can document it as a
> FLIP (Flink Improvement Proposals), and use JIRAs to track the
> implementations. What do you think?
>
> Shuyi
Memory size for network buffers
Hi,

I'm trying to understand the intention behind the size parameters for network buffers, specifically max, min and fraction. The reason we are looking at it is an attempt to make the memory allocation elastic, so that memory is allocated according to the actual number of buffers required (within a range), without the need to tune this for every deployment. As of Flink 1.5, there are 3 parameters, but they all result in a fixed allocation, which is not what we were looking for.

Here is an example just to illustrate it:

taskmanager.network.memory.fraction: 0.01
taskmanager.network.memory.min: 32768
taskmanager.network.memory.max: 1073741824
taskmanager.memory.segment-size: 8192

I wanted fraction to be out of the picture (but 0 isn't an acceptable value). Then set min to something tiny that my job will exceed and max to something too large to reach. Unfortunately, that fails:

java.io.IOException: Insufficient number of network buffers: required 8, but only 0 available. The total number of network buffers is currently set to 4 of 8192 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.

So the question then is, why have min and max? Or is the intention to have a different implementation in the future?

Thanks,
Thomas
Re: Kinesis consumer e2e test
Hi,

Yes, that is correct. The failure mapper is there to cause a failover event for which we can then check i) that exactly-once or at-least-once is not violated, depending on the expected semantics, and ii) that the restore works at all ;-). You might be able to reuse org.apache.flink.streaming.tests.FailureMapper for this. For the future, it would surely also be nice to have a test that covers rescaling as well, but for now just having any real test is already a great improvement.

Best,
Stefan

> On 12. Nov 2018, at 05:23, Thomas Weise wrote:
>
> Hi Stefan,
>
> Thanks for the info. So if I understand correctly, the pipeline you had in
> mind is:
>
> Consumer -> Map -> Producer
>
> What do you expect as the outcome of the mapper failure? That no records are
> lost but some are possibly duplicated in the sink?
>
> Regarding the abstraction, I will see what I can do in that regard. From
> where I start it may make more sense to do some of that as a follow-up when
> the Kafka test is ported.
>
> Thanks,
> Thomas
>
> On Thu, Nov 8, 2018 at 10:20 AM Stefan Richter wrote:
>
>> Hi,
>>
>> I was also just planning to work on it before Stephan contacted Thomas to
>> ask about this test.
>>
>> Thomas, you are right about the structure; the test should also go into
>> the `run-nightly-tests.sh`. What I was planning to do is a simple job that
>> consists of a Kinesis consumer, a mapper that fails once after n records,
>> and a Kinesis producer. I was hoping that creation, filling, and validation
>> of the Kinesis streams can be done with the Java API, not by invoking
>> commands in a bash script. In general I would try to minimise the amount of
>> scripting and do as much in Java as possible. It would also be nice if the
>> test was generalised, e.g. so that abstract Producer/Consumer are created
>> from a Supplier and also the validation is done over some abstraction that
>> lets us iterate over the produced output.
>> Ideally, this would be a test that we can reuse for all Consumer/Producer
>> cases, and we could also port the tests for Kafka to it. What do you think?
>>
>> Best,
>> Stefan
>>
>>> On 8. Nov 2018, at 07:22, Tzu-Li (Gordon) Tai wrote:
>>>
>>> Hi Thomas,
>>>
>>> I think Stefan Richter is also working on the Kinesis end-to-end test,
>>> and seems to be planning to implement it against a real Kinesis service
>>> instead of Kinesalite.
>>> Perhaps efforts should be synced here.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On Thu, Nov 8, 2018 at 1:38 PM Thomas Weise wrote:

Hi,

I'm planning to add an end-to-end test for the Kinesis consumer. We have done something similar at Lyft, using Kinesalite, which can be run as a Docker container. I see that some tests already make use of Docker, so we can assume it to be present in the target environment(s)?

I also found the following ticket: https://issues.apache.org/jira/browse/FLINK-9007

It suggests also covering the producer, which may be a good way to create the input data as well. The stream itself can be created with the Kinesis Java SDK.

Following the existing layout, there would be a new module flink-end-to-end-tests/flink-kinesis-test

Are there any suggestions or comments regarding this?

Thanks,
Thomas
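The "mapper that fails once after n records" idea discussed above can be sketched stand-alone like this. Note this is illustrative only: it implements java.util.function.Function to stay self-contained, whereas a real Flink job would implement MapFunction, and Flink's own org.apache.flink.streaming.tests.FailureMapper is the reusable version Stefan mentions.

```java
import java.util.function.Function;

// Sketch of a "fail once after n records" mapper used to force exactly one
// failover in an end-to-end test. Records pass through unchanged; the n-th
// record triggers a single artificial exception.
public class FailOnceMapper<T> implements Function<T, T> {

    private final long failAfter;
    private boolean hasFailed = false;
    private long count = 0;

    public FailOnceMapper(long failAfter) {
        this.failAfter = failAfter;
    }

    @Override
    public T apply(T value) {
        if (!hasFailed && ++count >= failAfter) {
            hasFailed = true; // only ever fail once
            throw new RuntimeException("artificial failure after " + count + " records");
        }
        return value;
    }
}
```

One caveat this sketch glosses over: after a real failover the operator instance is recreated, so the field-based hasFailed flag is lost; the production version has to track whether it already failed, e.g. via the attempt number, so that the job does not fail forever.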
Re: How to use RocksDBStateBackend predefined options
Sounds good. Perhaps it would also be good to allow the user to specify an options factory in flink-conf.yaml for more flexibility?

Thomas

On Mon, Nov 12, 2018 at 9:48 AM Stefan Richter wrote:

> Hi,
>
> Ufuk is right, for historical reasons there is currently only the
> programmatic way, but I think nothing speaks fundamentally against offering
> configuration via config in the future (maybe just a lot of config keys
> must be introduced to cover all options).
>
> Best,
> Stefan
>
> > On 9. Nov 2018, at 22:52, Ufuk Celebi wrote:
> >
> > Hey Thomas,
> >
> > On Fri, Nov 9, 2018 at 6:07 PM Thomas Weise wrote:
> >> Is there a way to activate the predefined options via configuration /
> >> flink-conf.yaml? Or only programmatically, like in [4]? The difficulty with
> >> the programmatic route (assuming this works now) is that in my case the
> >> client is Beam and I'm not writing the code that submits the job.
> >
> > AFAIK no. You can only do it programmatically at the moment [1].
> > Having the option to configure all settings through the configuration
> > file seems to be a valid feature request to me.
> >
> > @Stefan Richter (cc'd): Are there any reasons that speak against
> > exposing these options via the configuration file in your opinion?
> >
> > Best,
> >
> > Ufuk
> >
> > [1] Looking at the code in [2] and [3], the only options that are
> > exposed via the configuration file are local directories and the timer
> > service factory.
> > [2] https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
> > [3] https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
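For reference, the programmatic route being discussed looks roughly like this. This is a sketch against the Flink 1.5/1.6 RocksDB backend API (it needs flink-statebackend-rocksdb on the classpath and is not compiled here); the checkpoint URI and the specific tuning values are placeholders, not recommendations.

```java
// Sketch: configuring predefined options and an OptionsFactory in code,
// since flink-conf.yaml currently cannot express these settings.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints"); // placeholder URI
backend.setPredefinedOptions(PredefinedOptions.SPILLING_DISK_OPTIMIZED_HIGH_MEM);
backend.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions.setMaxOpenFiles(-1); // example tweak only
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions.setWriteBufferSize(64 * 1024 * 1024); // example tweak only
    }
});

env.setStateBackend(backend);
```

This is exactly the part that is out of reach when the job is submitted through Beam, which is why exposing it in flink-conf.yaml would help.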
Re: How to use RocksDBStateBackend predefined options
Hi,

Ufuk is right, for historical reasons there is currently only the programmatic way, but I think nothing speaks fundamentally against offering configuration via config in the future (maybe just a lot of config keys must be introduced to cover all options).

Best,
Stefan

> On 9. Nov 2018, at 22:52, Ufuk Celebi wrote:
>
> Hey Thomas,
>
> On Fri, Nov 9, 2018 at 6:07 PM Thomas Weise wrote:
>> Is there a way to activate the predefined options via configuration /
>> flink-conf.yaml? Or only programmatically, like in [4]? The difficulty with
>> the programmatic route (assuming this works now) is that in my case the
>> client is Beam and I'm not writing the code that submits the job.
>
> AFAIK no. You can only do it programmatically at the moment [1].
> Having the option to configure all settings through the configuration
> file seems to be a valid feature request to me.
>
> @Stefan Richter (cc'd): Are there any reasons that speak against
> exposing these options via the configuration file in your opinion?
>
> Best,
>
> Ufuk
>
> [1] Looking at the code in [2] and [3], the only options that are
> exposed via the configuration file are local directories and the timer
> service factory.
> [2] https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
> [3] https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
[jira] [Created] (FLINK-10855) CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints
Till Rohrmann created FLINK-10855:
-
Summary: CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints
Key: FLINK-10855
URL: https://issues.apache.org/jira/browse/FLINK-10855
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Affects Versions: 1.6.2, 1.5.5, 1.7.0
Reporter: Till Rohrmann

In case an acknowledge checkpoint message is late or a checkpoint cannot be acknowledged, we discard the subtask state in the {{CheckpointCoordinator}}. What does not happen in this case is deleting the parent directory of the checkpoint; that only happens in {{PendingCheckpoint#dispose}}.

Due to this behaviour it can happen that a checkpoint fails (e.g. because a task is not ready) and we delete the checkpoint directory. Next, another task writes its checkpoint data to the checkpoint directory (thereby creating it again) and sends an acknowledge message back to the {{CheckpointCoordinator}}. The {{CheckpointCoordinator}} will realize that there is no longer a {{PendingCheckpoint}} and will discard the subtask state. This removes the state files from the checkpoint directory but leaves the checkpoint directory itself untouched.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-10856) Harden resume from externalized checkpoint E2E test
Till Rohrmann created FLINK-10856:
-
Summary: Harden resume from externalized checkpoint E2E test
Key: FLINK-10856
URL: https://issues.apache.org/jira/browse/FLINK-10856
Project: Flink
Issue Type: Bug
Components: E2E Tests, State Backends, Checkpointing
Affects Versions: 1.6.2, 1.5.5, 1.7.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Fix For: 1.5.6, 1.6.3, 1.7.0

The resume from externalized checkpoints E2E test can fail due to FLINK-10855. We should harden the test script not to expect a single checkpoint directory to be present, but to take the checkpoint with the highest checkpoint counter.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-10853) Provide end-to-end test for Kafka 0.9 connector
vinoyang created FLINK-10853:
Summary: Provide end-to-end test for Kafka 0.9 connector
Key: FLINK-10853
URL: https://issues.apache.org/jira/browse/FLINK-10853
Project: Flink
Issue Type: Test
Components: E2E Tests
Reporter: vinoyang
Assignee: vinoyang

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-10854) Provide end-to-end test for Kafka 0.8 connector
vinoyang created FLINK-10854:
Summary: Provide end-to-end test for Kafka 0.8 connector
Key: FLINK-10854
URL: https://issues.apache.org/jira/browse/FLINK-10854
Project: Flink
Issue Type: Test
Components: E2E Tests
Reporter: vinoyang
Assignee: vinoyang

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
REST job submission
Hi to all,

in our ETL we need to call an external (REST) service once a job ends: we extract information about accumulators and update the job status. However, this is only possible when using the CLI client: if we submit the job via the REST API or the Web UI (which is very useful to decouple our UI from the Flink cluster), this is not possible, because the REST API cannot execute any code after env.execute().

I think this is a huge limitation: first of all, when writing (and debugging) a Flink job, you assume that you can call execute() multiple times and use the returned JobExecutionResult. Secondly, the CLI client and the REST client behave differently (with the CLI client everything works as expected).

What do you think about this? Is this a bug or not?

PS: I think also that the REST client should not be aware of any jar or class instance; it should just call the job manager with the proper class name and jar id (plus other options, of course).

Cheers,
Flavio
[jira] [Created] (FLINK-10852) Decremented number of unfinished producers below 0. This is most likely a bug in the execution state/intermediate result partition management
ouyangzhe created FLINK-10852:
-
Summary: Decremented number of unfinished producers below 0. This is most likely a bug in the execution state/intermediate result partition management
Key: FLINK-10852
URL: https://issues.apache.org/jira/browse/FLINK-10852
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.7.0
Reporter: ouyangzhe
Fix For: 1.8.0

{panel:title=Jobs using the DataSet iteration operator, with jobmanager.execution.failover-strategy: region set, will hang in the FAILING state on failover with the following exception.}
java.lang.IllegalStateException: Decremented number of unfinished producers below 0. This is most likely a bug in the execution state/intermediate result partition management.
 at org.apache.flink.runtime.executiongraph.IntermediateResultPartition.markFinished(IntermediateResultPartition.java:103)
 at org.apache.flink.runtime.executiongraph.ExecutionVertex.finishAllBlockingPartitions(ExecutionVertex.java:707)
 at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:939)
 at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1568)
 at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542)
 at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
 at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
 at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
 at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
 at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
 at akka.actor.ActorCell.invoke(ActorCell.scala:495)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
 at akka.dispatch.Mailbox.run(Mailbox.scala:224)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{panel}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-10851) sqlUpdate support complex insert grammar
frank wang created FLINK-10851:
--
Summary: sqlUpdate support complex insert grammar
Key: FLINK-10851
URL: https://issues.apache.org/jira/browse/FLINK-10851
Project: Flink
Issue Type: Bug
Reporter: frank wang

My code is:

{{tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");}}

but Flink gives me an error, saying "No table was registered under the name kafka".

I modified the code, and it works now. In TableEnvironment.scala:

{code:java}
def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
  val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
  // parse the sql query
  val parsed = planner.parse(stmt)
  parsed match {
    case insert: SqlInsert =>
      // validate the SQL query
      val query = insert.getSource
      val validatedQuery = planner.validate(query)
      // get query result as Table
      val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
      // get name of sink table
      val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
      // insert query result into sink table
      insertInto(queryResult, targetTableName, config)
    case _ =>
      throw new TableException(
        "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
  }
}
{code}

should be modified to this:

{code:java}
def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
  val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
  // parse the sql query
  val parsed = planner.parse(stmt)
  parsed match {
    case insert: SqlInsert =>
      // validate the SQL query
      val query = insert.getSource
      val validatedQuery = planner.validate(query)
      // get query result as Table
      val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
      // get name of sink table
      // val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
      val targetTableName = insert.getTargetTable.toString
      // insert query result into sink table
      insertInto(queryResult, targetTableName, config)
    case _ =>
      throw new TableException(
        "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
  }
}
{code}

I hope this can be accepted, thx.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-10850) Job may hang on FAILING state if taskmanager updateTaskExecutionState failed
ouyangzhe created FLINK-10850:
-
Summary: Job may hang on FAILING state if taskmanager updateTaskExecutionState failed
Key: FLINK-10850
URL: https://issues.apache.org/jira/browse/FLINK-10850
Project: Flink
Issue Type: Bug
Components: JobManager
Affects Versions: 1.5.5
Reporter: ouyangzhe
Fix For: 1.8.0

I encountered a job that hit an OOM but hung in the FAILING state. It had 3 slots left to release, and the corresponding task state was CANCELING. I found the following log in the taskmanager; it seems that the taskmanager tried to updateTaskExecutionState from CANCELING to CANCELED, but OOMed.

{panel}
2018-11-08 18:01:23,250 INFO org.apache.flink.runtime.taskmanager.Task - PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8) switched from RUNNING to CANCELING.
2018-11-08 18:01:23,257 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8).
2018-11-08 18:01:44,081 INFO org.apache.flink.runtime.taskmanager.Task - PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8) switched from CANCELING to CANCELED.
2018-11-08 18:01:44,081 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8).
2018-11-08 18:02:03,097 WARN org.apache.flink.runtime.taskmanager.Task - Task 'PartialSolution (BulkIteration (Bulk Iteration)) (97/600)' did not react to cancelling signal for 30 seconds, but is stuck in method:
 org.apache.flink.shaded.guava18.com.google.common.collect.Maps$EntryFunction$1.apply(Maps.java:86)
 org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$8.transform(Iterators.java:799)
 org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48)
 java.util.AbstractCollection.toArray(AbstractCollection.java:141)
 org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:258)
 org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:100)
 org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:275)
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:833)
 java.lang.Thread.run(Thread.java:745)
2018-11-08 18:02:05,665 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Discarding the results produced by task execution e9141e20871e530dee904ddce11adca0.
2018-11-08 18:02:22,536 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Discarding the results produced by task execution 7fac76a5d76247d803e1f1c47a6b385f.
2018-11-08 18:03:47,210 WARN org.apache.flink.runtime.taskmanager.Task - Task 'PartialSolution (BulkIteration (Bulk Iteration)) (97/600)' did not react to cancelling signal for 30 seconds, but is stuck in method:
 org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:497)
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:837)
 java.lang.Thread.run(Thread.java:745)
2018-11-08 18:03:47,213 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8) [CANCELED]
2018-11-08 18:03:47,215 WARN org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline - An exception was thrown by a user handler while handling an exception event ([id: 0x397132f7, /11.10.199.197:33286 => /11.9.137.228:40859] EXCEPTION: java.lang.OutOfMemoryError: GC overhead limit exceeded)
java.lang.OutOfMemoryError: GC overhead limit exceeded
 at org.apache.flink.shaded.akka.org.jboss.netty.buffer.HeapChannelBuffer.<init>(HeapChannelBuffer.java:42)
 at org.apache.flink.shaded.akka.org.jboss.netty.buffer.BigEndianHeapChannelBuffer.<init>(BigEndianHeapChannelBuffer.java:34)
 at org.apache.flink.shaded.akka.org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134)
 at org.apache.flink.shaded.akka.org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:68)
 at org.apache.flink.shaded.akka.org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48)
 at org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.FrameDecoder.extractFrame(FrameDecoder.java:566)
 at
{panel}