[jira] [Commented] (FLINK-12297) Make ClosureCleaner recursive
[ https://issues.apache.org/jira/browse/FLINK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864624#comment-16864624 ]

Aljoscha Krettek commented on FLINK-12297:
------------------------------------------

Merged on release-1.8 in 159c52769045853bc95a0efaae8ab2c7675e1d42

> Make ClosureCleaner recursive
> -----------------------------
>
>                 Key: FLINK-12297
>                 URL: https://issues.apache.org/jira/browse/FLINK-12297
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet, API / DataStream
>    Affects Versions: 1.8.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Aitozi
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.9.0, 1.8.1
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Right now we do not invoke the closure cleaner on output tags. Therefore, code such as:
> {code}
> @Test
> public void testFlatSelectSerialization() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);
>     OutputTag<Integer> outputTag = new OutputTag<Integer>("AAA") {};
>
>     CEP.pattern(elements, Pattern.<Integer>begin("A")).flatSelect(
>         outputTag,
>         new PatternFlatTimeoutFunction<Integer, Integer>() {
>             @Override
>             public void timeout(
>                     Map<String, List<Integer>> pattern,
>                     long timeoutTimestamp,
>                     Collector<Integer> out) throws Exception {
>             }
>         },
>         new PatternFlatSelectFunction<Integer, Integer>() {
>             @Override
>             public void flatSelect(Map<String, List<Integer>> pattern, Collector<Integer> out) throws Exception {
>             }
>         }
>     );
>
>     env.execute();
> }
> {code}
> will fail with a {{The implementation of the PatternFlatSelectAdapter is not serializable.}} exception.
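To make "recursive" concrete, here is a minimal, hypothetical sketch of the idea — after visiting an object, descend into its non-static reference fields and clean wrapped members (such as an {{OutputTag}} held by a {{PatternFlatSelectAdapter}}) as well. This is not Flink's actual {{ClosureCleaner}}; the class name, traversal, and the omitted "is the enclosing instance actually used" check are illustrative only.

{code}
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public final class RecursiveCleanerSketch {

    public static void clean(Object obj) throws IllegalAccessException {
        clean(obj, Collections.newSetFromMap(new IdentityHashMap<>()));
    }

    private static void clean(Object obj, Set<Object> visited) throws IllegalAccessException {
        if (obj == null || !visited.add(obj)) {
            return; // nothing to do, or already visited (guards against cycles)
        }
        for (Field field : obj.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.getType().isPrimitive()) {
                continue;
            }
            field.setAccessible(true);
            // "this$0" is the synthetic reference to the enclosing instance; a real
            // cleaner would null it out only after proving it is never accessed.
            // Here we simply recurse so that wrapped fields are cleaned too.
            clean(field.get(obj), visited);
        }
    }
}
{code}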
[jira] [Closed] (FLINK-12297) Make ClosureCleaner recursive
[ https://issues.apache.org/jira/browse/FLINK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek closed FLINK-12297.
------------------------------------
    Resolution: Fixed
[GitHub] [flink] aljoscha closed pull request #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function
aljoscha closed pull request #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function
URL: https://github.com/apache/flink/pull/8280
[GitHub] [flink] aljoscha commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function
aljoscha commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function
URL: https://github.com/apache/flink/pull/8280#issuecomment-502340886

I merged this! Thanks again to the two of you for working on this. 👍
[jira] [Commented] (FLINK-12297) Make ClosureCleaner recursive
[ https://issues.apache.org/jira/browse/FLINK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864617#comment-16864617 ]

Aljoscha Krettek commented on FLINK-12297:
------------------------------------------

Merged on master in 68cc21e4af71505efa142110e35a1f8b1c25fe6e
[GitHub] [flink] aljoscha closed pull request #8744: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function
aljoscha closed pull request #8744: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function
URL: https://github.com/apache/flink/pull/8744
[jira] [Created] (FLINK-12855) Stagger panes on partitions to distribute workload.
Teng Hu created FLINK-12855:
-------------------------------

             Summary: Stagger panes on partitions to distribute workload.
                 Key: FLINK-12855
                 URL: https://issues.apache.org/jira/browse/FLINK-12855
             Project: Flink
          Issue Type: New Feature
          Components: API / DataStream
            Reporter: Teng Hu
         Attachments: stagger_window.png, stagger_window_delay.png, stagger_window_throughput.png

Flink natively triggers all panes belonging to the same window at the same time. In other words, all panes are aligned and their triggers all fire simultaneously, causing a thundering-herd effect.

This new feature adds the option to stagger panes across partitioned streams, so that their workloads are distributed.

Attachments: proof of concept working.
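The idea, as a hedged sketch (assumed shape; not the attached proof of concept): derive a per-key offset so that panes of the same logical window fire at different times. The {{key}} and {{timestamp}} parameters stand in for what a window assigner would receive, and the hash-based offset is an assumption for illustration.

{code}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public final class StaggeredAssignerSketch {

    // Assign a 1-minute tumbling pane whose boundaries are shifted by a
    // per-key offset, so timers for different partitions do not all fire at once.
    public static TimeWindow assignStaggeredWindow(Object key, long timestamp) {
        long size = 60_000L;                                      // pane size: 1 minute
        long offset = Math.floorMod((long) key.hashCode(), size); // per-partition stagger (assumption)
        long start = timestamp - Math.floorMod(timestamp - offset, size);
        return new TimeWindow(start, start + size);
    }
}
{code}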
[jira] [Updated] (FLINK-12855) Stagger panes on partitions to distribute workload.
[ https://issues.apache.org/jira/browse/FLINK-12855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Teng Hu updated FLINK-12855:
----------------------------
    Attachment:     (was: stagger_window_delay.png)
[jira] [Updated] (FLINK-12855) Stagger panes on partitions to distribute workload.
[ https://issues.apache.org/jira/browse/FLINK-12855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Teng Hu updated FLINK-12855:
----------------------------
    Attachment: stagger_window.png
[GitHub] [flink] mans2singh commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …
mans2singh commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …
URL: https://github.com/apache/flink/pull/8668#discussion_r294030972

## File path: flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java

@@ -96,33 +96,36 @@ public void testMetricRegistration() throws Exception {

 	@Test
 	public void testMetricReporting() throws Exception {
-		MetricRegistryImpl metricRegistry = createMetricRegistry();
+		String retentionPolicy = "one_hour";
+		MetricRegistryImpl metricRegistry = createMetricRegistry(retentionPolicy);
 		try {
 			String metricName = "TestCounter";
 			Counter counter = registerTestMetric(metricName, metricRegistry);
 			counter.inc(42);

 			stubFor(post(urlPathEqualTo("/write"))
-				.willReturn(aResponse()
-					.withStatus(200)));
+					.willReturn(aResponse()
+							.withStatus(200)));

Review comment:
@1u0, @Myasuka - Please let me know if I can resolve this issue and if there is anything else required for this PR. Thanks for your advice. Mans
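For context, a hedged sketch of how the new option might be supplied to the reporter. `MetricConfig` is Flink's reporter configuration class, but the `retentionPolicy` key below is an assumption based on the PR title, not a confirmed configuration name.

```java
import org.apache.flink.metrics.MetricConfig;

// Illustrative reporter properties; the "retentionPolicy" key is hypothetical.
MetricConfig config = new MetricConfig();
config.setProperty("host", "localhost");
config.setProperty("port", "8086");
config.setProperty("db", "flink");
config.setProperty("retentionPolicy", "one_hour"); // the policy must already exist in InfluxDB
```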
[GitHub] [flink] wangkuiyi opened a new pull request #7015: Add the -p option to nc
wangkuiyi opened a new pull request #7015: Add the -p option to nc
URL: https://github.com/apache/flink/pull/7015

## Purpose

When I run the [quick start](https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html) tutorial in Docker containers (`docker run flink`), the `examples/streaming/SocketWindowWordCount.jar` job fails if it connects to a netcat process started with `nc -l 9000`. The tutorial works if I change the command to `nc -l -p 9000`.

## Verifying

To reproduce the failure and the fix:

1. In a terminal, start the container: `docker run --rm -it --name my_flink flink bash`.
1. In the container, install and run netcat: `apt-get update && apt-get install -y netcat && nc -l 9000`.
1. In another terminal, attach to the container: `docker exec -it my_flink bash`.
1. In the attached terminal session, start the cluster with `start-cluster.sh` and run the example: `flink run examples/streaming/SocketWindowWordCount.jar --port 9000`. The example fails with the following errors. The failure can be avoided by redoing the above steps with `nc -l -p 9000`.

```
root@493ef04c41ee:/opt/flink# flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Starting execution of program

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: b58996c431847f40852dad17f0d21244)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
	at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:290)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
	at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
	... 17 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
```
[GitHub] [flink] wangkuiyi commented on issue #7015: Add the -p option to nc
wangkuiyi commented on issue #7015: Add the -p option to nc
URL: https://github.com/apache/flink/pull/7015#issuecomment-502324878

Thanks for following up @greghogan and @zentol. I am closing this PR.
[GitHub] [flink] wangkuiyi closed pull request #7015: Add the -p option to nc
wangkuiyi closed pull request #7015: Add the -p option to nc
URL: https://github.com/apache/flink/pull/7015
[GitHub] [flink] XuPingyong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
XuPingyong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r294029650

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCalc.scala

@@ -116,11 +116,16 @@ class StreamExecCalc(
       retainHeader = true,
       "StreamExecCalc"
     )
-    new OneInputTransformation(
+    val ret = new OneInputTransformation(
       inputTransform,
       RelExplainUtil.calcToString(calcProgram, getExpressionString),
       substituteStreamOperator,
       BaseRowTypeInfo.of(outputType),
-      inputTransform.getParallelism)
+      getResource.getParallelism)
+
+    if (getResource.getMaxParallelism > 0) {
+      ret.setMaxParallelism(getResource.getMaxParallelism)

Review comment:
It just transfers the result of the parallelism calculation, which decides whether to set a max parallelism, to the StreamTransformation.
[GitHub] [flink] XuPingyong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
XuPingyong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r294029548

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/parallelism/FinalParallelismSetter.java

@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.parallelism;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTableSourceScan;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Values;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Set final parallelism if needed at the beginning time, if parallelism of a node is set to be final,
+ * it will not be changed by other parallelism calculator.
+ */
+public class FinalParallelismSetter {
+
+	private final StreamExecutionEnvironment env;
+	private Set<ExecNode<?, ?>> calculatedNodeSet = new HashSet<>();
+	private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = new HashMap<>();
+
+	private FinalParallelismSetter(StreamExecutionEnvironment env) {
+		this.env = env;
+	}
+
+	/**
+	 * Finding nodes that need to set final parallelism.
+	 */
+	public static Map<ExecNode<?, ?>, Integer> calculate(StreamExecutionEnvironment env, List<ExecNode<?, ?>> sinkNodes) {
+		FinalParallelismSetter setter = new FinalParallelismSetter(env);
+		sinkNodes.forEach(setter::calculate);
+		return setter.finalParallelismNodeMap;
+	}
+
+	private void calculate(ExecNode<?, ?> execNode) {
+		if (!calculatedNodeSet.add(execNode)) {
+			return;
+		}
+		if (execNode instanceof BatchExecTableSourceScan) {
+			calculateTableSource((BatchExecTableSourceScan) execNode);
+		} else if (execNode instanceof StreamExecTableSourceScan) {
+			calculateTableSource((StreamExecTableSourceScan) execNode);
+		} else if (execNode instanceof BatchExecBoundedStreamScan) {
+			calculateBoundedStreamScan((BatchExecBoundedStreamScan) execNode);
+		} else if (execNode instanceof StreamExecDataStreamScan) {
+			calculateDataStreamScan((StreamExecDataStreamScan) execNode);
+		} else if (execNode instanceof Values) {
+			calculateValues(execNode);
+		} else {
+			calculateIfSingleton(execNode);
+		}
+	}
+
+	private void calculateTableSource(BatchExecTableSourceScan tableSourceScan) {
+		StreamTransformation<?> transformation = tableSourceScan.getSourceTransformation(env);
+		if (transformation.getMaxParallelism() > 0) {
+			tableSourceScan.getResource().setMaxParallelism(transformation.getMaxParallelism());

Review comment:
Calculating the parallelism of a shuffleStage later, in ShuffleStageParallelismCalculator, takes the max parallelism of the exec nodes into account.
[GitHub] [flink] bowenli86 commented on issue #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF
bowenli86 commented on issue #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#issuecomment-502312751

I created FLINK-12854 to evaluate if we should support primitive types and convert them for Hive functions. This is not critical and thus may or may not happen in 1.9.

@JingsongLi Can you take another look?
[jira] [Created] (FLINK-12854) evaluate and support for primitive array in Hive functions
Bowen Li created FLINK-12854:
--------------------------------

             Summary: evaluate and support for primitive array in Hive functions
                 Key: FLINK-12854
                 URL: https://issues.apache.org/jira/browse/FLINK-12854
             Project: Flink
          Issue Type: Sub-task
          Components: Connectors / Hive
            Reporter: Bowen Li

Flink right now supports primitive arrays, but Hive functions don't. We need to evaluate whether we should support primitive arrays by converting them to lists or arrays of boxed types before passing them to Hive functions.

If the evaluation is positive, we should implement this feature.
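A hedged sketch of the conversion under evaluation (the helper name is made up for illustration; autoboxing does the per-element work):

{code}
// Box a primitive int[] into an Integer[] that an object-array-only
// Hive function could accept.
static Integer[] boxIntArray(int[] primitives) {
    Integer[] boxed = new Integer[primitives.length];
    for (int i = 0; i < primitives.length; i++) {
        boxed[i] = primitives[i]; // autoboxing
    }
    return boxed;
}
{code}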
[jira] [Closed] (FLINK-12705) Allow user to specify the Hive version in use
[ https://issues.apache.org/jira/browse/FLINK-12705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li closed FLINK-12705.
----------------------------
    Resolution: Fixed

merged in 1.9.0: ec383d6c737fa0bc24ec6984bbc70207f8da9c53

> Allow user to specify the Hive version in use
> ---------------------------------------------
>
>                 Key: FLINK-12705
>                 URL: https://issues.apache.org/jira/browse/FLINK-12705
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Hive
>            Reporter: Rui Li
>            Assignee: Rui Li
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Follow up of FLINK-12649
[GitHub] [flink] asfgit closed pull request #8694: [FLINK-12705][hive] Allow user to specify the Hive version in use
asfgit closed pull request #8694: [FLINK-12705][hive] Allow user to specify the Hive version in use
URL: https://github.com/apache/flink/pull/8694
[GitHub] [flink] bowenli86 commented on issue #8720: [FLINK-12771][hive] Support ConnectorCatalogTable in HiveCatalog
bowenli86 commented on issue #8720: [FLINK-12771][hive] Support ConnectorCatalogTable in HiveCatalog
URL: https://github.com/apache/flink/pull/8720#issuecomment-502237063

@xuefuz good idea to add tests for mixed operations, I'll add them. As far as I can tell for now, there should be no impact on db operations, except that dropping a database needs to ensure it doesn't contain temp tables. Will add that too.

I'll hold off on this PR a little bit until we reach consensus with Dawid.
[GitHub] [flink] Myasuka commented on issue #8745: [FLINK-11662] Disable task to fail on checkpoint errors
Myasuka commented on issue #8745: [FLINK-11662] Disable task to fail on checkpoint errors
URL: https://github.com/apache/flink/pull/8745#issuecomment-502235022

@flinkbot attention @StefanRRichter
[GitHub] [flink] flinkbot commented on issue #8745: [FLINK-11662] Disable task to fail on checkpoint errors
flinkbot commented on issue #8745: [FLINK-11662] Disable task to fail on checkpoint errors
URL: https://github.com/apache/flink/pull/8745#issuecomment-502234862

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

### Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] Myasuka opened a new pull request #8745: Task not fail checkpoint
Myasuka opened a new pull request #8745: Task not fail checkpoint
URL: https://github.com/apache/flink/pull/8745

## What is the purpose of the change

FLINK-11662 has been a known and urgent issue since the Flink 1.8 release. There are two parts to fixing this problem: one is on the task side, to disable failing the task on checkpoint errors; the other is to let the JM decide when to fail the whole job, which is part of the work in https://github.com/apache/flink/pull/8322. This PR mainly focuses on the task side and chooses to base on https://github.com/apache/flink/pull/8322 before it is merged, since that PR is very unlikely to still have significant changes at this point.

## Brief change log

- Deprecate `isFailTaskOnCheckpointError` and `setFailTaskOnCheckpointError` in `ExecutionConfig`.
- Deprecate `isFailOnCheckpointingErrors` and `setFailOnCheckpointingErrors` in `CheckpointConfig`.
- Remove `FailingCheckpointExceptionHandler`.

## Verifying this change

This change added tests and can be verified as follows:

- Refactor `CheckpointExceptionHandlerConfigurationTest` and `CheckpointExceptionHandlerTest` to verify the changed default logic.
- Refactor `StreamTaskTest` to verify the changed default logic on the task side.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **yes**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **JavaDocs**
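For context, a hedged illustration of the switch being deprecated. The `CheckpointConfig` methods named in the change log exist in Flink 1.8; the snippet is illustrative, not code from the PR.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000L);
// Before this PR, a user could opt out of failing the task on checkpoint errors
// with this flag; the PR deprecates it so the decision moves to the JobManager.
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
```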
[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293931882

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java

@@ -1083,8 +1136,14 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio
 	}
 
 	@Override
-	public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+	public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException,
+			CatalogException, TableNotPartitionedException {
+		Table hiveTable = getHiveTable(tablePath);
+		if (!isTablePartitioned(hiveTable)) {
+			return createCatalogTableStatistics(hiveTable.getParameters());
+		} else {
+			throw new TableNotPartitionedException(getName(), tablePath);

Review comment:
First, shouldn't this be a `TablePartitionedException`?

Second, please correct me if I'm wrong, IIRC, if the table is partitioned, wouldn't the Hive client return unknown stats?
[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293934696

## File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java

@@ -1082,6 +1083,64 @@ public void testListPartitionPartialSpec() throws Exception {
 		assertEquals(1, catalog.listPartitions(path1, createAnotherPartitionSpecSubset()).size());
 	}
 
+
+	// -- table and column stats --
+
+	@Test
+	public void testGetTableStats_TableNotExistException() throws Exception{
+		catalog.createDatabase(db1, createDb(), false);
+		exception.expect(org.apache.flink.table.api.TableNotExistException.class);
+		catalog.getTableStatistics(path1);
+	}
+
+	@Test
+	public void testGetPartitionStats() throws Exception{
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+		// there're essentially no stats, so nothing to assert
+		catalog.getPartitionStatistics(path1, createPartitionSpec());

Review comment:
shouldn't we just assert that every stat is the default value, which is `0`?
[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293934522

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java

@@ -1060,21 +1061,73 @@ private static Function instantiateHiveFunction(ObjectPath functionPath, HiveCat
 		);
 	}
 
+	private boolean isTablePartitioned(Table hiveTable) {
+		return hiveTable.getPartitionKeysSize() != 0;
+	}
+
 	// -- stats --
 
 	@Override
 	public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
-
+		try {
+			Table hiveTable = getHiveTable(tablePath);
+			// Set table stats
+			if (needToUpdateStatistics(tableStatistics, hiveTable.getParameters())) {
+				updateStatisticsParameters(tableStatistics, hiveTable.getParameters());
+				client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
+			}
+		} catch (TableNotExistException e) {
+			if (!ignoreIfNotExists) {
+				throw e;
+			}
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to alter table stats of table %s", tablePath.getFullName()), e);
+		}
 	}
 
 	@Override
 	public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
 
 	}
 
+	private static boolean needToUpdateStatistics(CatalogTableStatistics statistics, Map<String, String> oldParameters) {
+		String oldRowCount = oldParameters.getOrDefault(StatsSetupConst.ROW_COUNT, "0");
+		String oldTotalSize = oldParameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, "0");
+		String oldNumFiles = oldParameters.getOrDefault(StatsSetupConst.NUM_FILES, "0");
+		String oldRawDataSize = oldParameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, "0");
+		return statistics.getRowCount() != Long.parseLong(oldRowCount) || statistics.getTotalSize() != Long.parseLong(oldTotalSize)
+			|| statistics.getFileCount() != Integer.parseInt(oldNumFiles) || statistics.getRawDataSize() != Long.parseLong(oldRawDataSize);
+	}
+
+	private static void updateStatisticsParameters(CatalogTableStatistics tableStatistics, Map<String, String> parameters) {
+		parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(tableStatistics.getRowCount()));
+		parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(tableStatistics.getTotalSize()));
+		parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(tableStatistics.getFileCount()));
+		parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(tableStatistics.getRawDataSize()));
+	}
+
+	private static CatalogTableStatistics createCatalogTableStatistics(Map<String, String> parameters) {
+		long rowRount = Long.parseLong(parameters.getOrDefault(StatsSetupConst.ROW_COUNT, "0"));

Review comment:
should reuse the `0`s in `HiveStatsUtil` as mentioned above to be consistent
[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293932672

## File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java

@@ -1082,6 +1083,64 @@ public void testListPartitionPartialSpec() throws Exception {
 		assertEquals(1, catalog.listPartitions(path1, createAnotherPartitionSpecSubset()).size());
 	}
 
+
+	// -- table and column stats --
+
+	@Test
+	public void testGetTableStats_TableNotExistException() throws Exception{
+		catalog.createDatabase(db1, createDb(), false);
+		exception.expect(org.apache.flink.table.api.TableNotExistException.class);

Review comment:
the `TableNotExistException` is not the class we want. We should use `org.apache.flink.table.catalog.exceptions.TableNotExistException`
[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293934316

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java

@@ -1060,21 +1061,73 @@ private static Function instantiateHiveFunction(ObjectPath functionPath, HiveCat
 		);
 	}
 
+	private boolean isTablePartitioned(Table hiveTable) {
+		return hiveTable.getPartitionKeysSize() != 0;
+	}
+
 	// -- stats --
 
 	@Override
 	public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
-
+		try {
+			Table hiveTable = getHiveTable(tablePath);
+			// Set table stats
+			if (needToUpdateStatistics(tableStatistics, hiveTable.getParameters())) {
+				updateStatisticsParameters(tableStatistics, hiveTable.getParameters());
+				client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
+			}
+		} catch (TableNotExistException e) {
+			if (!ignoreIfNotExists) {
+				throw e;
+			}
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to alter table stats of table %s", tablePath.getFullName()), e);
+		}
 	}
 
 	@Override
 	public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
 
 	}
 
+	private static boolean needToUpdateStatistics(CatalogTableStatistics statistics, Map<String, String> oldParameters) {
+		String oldRowCount = oldParameters.getOrDefault(StatsSetupConst.ROW_COUNT, "0");

Review comment:
These are basically setting default values for all types of Hive table stats. How about moving the `0`s to `HiveStatsUtil` and defining them as `public static final` vars, to make them easier to find and access across the hive catalog code base?
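A hedged sketch of the reviewer's suggestion; the constant name is invented for illustration and is not part of the PR.

```java
// In HiveStatsUtil: one shared default instead of scattered "0" literals.
public static final String DEFAULT_STATS_ZERO_VALUE = "0";

// At the call sites in HiveCatalog:
String oldRowCount = oldParameters.getOrDefault(StatsSetupConst.ROW_COUNT, HiveStatsUtil.DEFAULT_STATS_ZERO_VALUE);
```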
[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293932104

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java

@@ -1094,7 +1153,15 @@ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) th
 
 	@Override
 	public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		try {
+			Partition partition = getHivePartition(tablePath, partitionSpec);
+			return createCatalogTableStatistics(partition.getParameters());
+		} catch (TableNotExistException | PartitionSpecInvalidException e) {
+			throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to get table stats of table %s 's partition %s",

Review comment:
```suggestion
			throw new CatalogException(String.format("Failed to get partition stats of table %s 's partition %s",
```
[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293935284

## File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java

@@ -1082,6 +1083,64 @@ public void testListPartitionPartialSpec() throws Exception {
 		assertEquals(1, catalog.listPartitions(path1, createAnotherPartitionSpecSubset()).size());
 	}
 
+
+	// -- table and column stats --
+
+	@Test
+	public void testGetTableStats_TableNotExistException() throws Exception{
+		catalog.createDatabase(db1, createDb(), false);
+		exception.expect(org.apache.flink.table.api.TableNotExistException.class);
+		catalog.getTableStatistics(path1);
+	}
+
+	@Test
+	public void testGetPartitionStats() throws Exception{
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+		// there're essentially no stats, so nothing to assert
+		catalog.getPartitionStatistics(path1, createPartitionSpec());
+	}
+
+	@Test
+	public void testAlterTableStats() throws Exception{
+		// Non-partitioned table
+		catalog.createDatabase(db1, createDb(), false);
+		CatalogTable table = createTable();
+		catalog.createTable(path1, table, false);
+		CatalogTableStatistics tableStats = new CatalogTableStatistics(100, 10, 1000, 1);
+		catalog.alterTableStatistics(path1, tableStats, false);
+		CatalogTableStatistics actual = catalog.getTableStatistics(path1);
+
+		assertEquals(tableStats.toString(), actual.toString());
+	}
+
+	@Test
+	public void testAlterTableStats_partitionedTable() throws Exception {
+		// alterTableStats() should do nothing for partitioned tables
+		// getTableStats() should return empty column stats for partitioned tables
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createPartitionedTable(), false);
+
+		CatalogTableStatistics stats = new CatalogTableStatistics(100, 1, 1000, 1);
+

Review comment:
> // alterTableStats() should do nothing for partitioned tables // getTableStats() should return empty column stats for partitioned tables

as the comment describes, we should also assert `assertEquals(CatalogTableStatistics.UNKNOWN, catalog.getTableStatistics(path1));` before the altering
[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293932284

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java

@@ -1060,21 +1061,73 @@ private static Function instantiateHiveFunction(ObjectPath functionPath, HiveCat
 		);
 	}
 
+	private boolean isTablePartitioned(Table hiveTable) {
+		return hiveTable.getPartitionKeysSize() != 0;
+	}
+
 	// -- stats --
 
	@Override
	public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
-
+		try {
+			Table hiveTable = getHiveTable(tablePath);
+			// Set table stats
+			if (needToUpdateStatistics(tableStatistics, hiveTable.getParameters())) {
+				updateStatisticsParameters(tableStatistics, hiveTable.getParameters());
+				client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
+			}
+		} catch (TableNotExistException e) {
+			if (!ignoreIfNotExists) {
+				throw e;
+			}
+		} catch (TException e) {
+			throw new CatalogException(String.format("Failed to alter table stats of table %s", tablePath.getFullName()), e);
+		}
 	}
 
 	@Override
 	public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
 
 	}
 
+	private static boolean needToUpdateStatistics(CatalogTableStatistics statistics, Map<String, String> oldParameters) {
+		String oldRowCount = oldParameters.getOrDefault(StatsSetupConst.ROW_COUNT, "0");
+		String oldTotalSize = oldParameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, "0");
+		String oldNumFiles = oldParameters.getOrDefault(StatsSetupConst.NUM_FILES, "0");
+		String oldRawDataSize = oldParameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, "0");
+		return statistics.getRowCount() != Long.parseLong(oldRowCount) || statistics.getTotalSize() != Long.parseLong(oldTotalSize)
+			|| statistics.getFileCount() != Integer.parseInt(oldNumFiles) || statistics.getRawDataSize() != Long.parseLong(oldRawDataSize);
+	}
+
+	private static void updateStatisticsParameters(CatalogTableStatistics tableStatistics, Map<String, String> parameters) {
+		parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(tableStatistics.getRowCount()));
+		parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(tableStatistics.getTotalSize()));
+		parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(tableStatistics.getFileCount()));
+		parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(tableStatistics.getRawDataSize()));
+	}
+
+	private static CatalogTableStatistics createCatalogTableStatistics(Map<String, String> parameters) {
+		long rowRount = Long.parseLong(parameters.getOrDefault(StatsSetupConst.ROW_COUNT, "0"));
+		long totalSize = Long.parseLong(parameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, "0"));
+		int numFiles = Integer.parseInt(parameters.getOrDefault(StatsSetupConst.NUM_FILES, "0"));
+		long rawDataSize = Long.parseLong(parameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, "0"));
+		return new CatalogTableStatistics(rowRount, numFiles, totalSize, rawDataSize);
+	}
+
 	@Override
 	public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
-
+		try {
+			Partition hivePartition = getHivePartition(tablePath, partitionSpec);
+			// Set table stats
+			if (needToUpdateStatistics(partitionStatistics, hivePartition.getParameters())) {
+				updateStatisticsParameters(partitionStatistics, hivePartition.getParameters());
+				client.alter_partition(tablePath.getDatabaseName(), tablePath.getObjectName(), hivePartition);
+			}
+		} catch (TableNotExistException | PartitionSpecInvalidException e) {
+			throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+		} catch (TException e) {
+
[jira] [Comment Edited] (FLINK-12771) Support ConnectorCatalogTable in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864356#comment-16864356 ]

Xuefu Zhang edited comment on FLINK-12771 at 6/14/19 6:49 PM:
--------------------------------------------------------------

Hi [~dawidwys],

Re: #2, communicating with the Hive metastore is the main reason for having HiveCatalog, but that doesn't necessarily mean it's all it can do. In Hive, a user can create temporary tables/functions via DDL, which requires no communication with the Hive metastore. Such session-specific objects are stored in the user session (or client). (In fact, the Hive metastore provides a type of client that has this functionality built in.) Those objects share the same namespace with the persistent objects stored in the Hive metastore. I'd argue that Flink's temp objects, such as inline tables, have the same nature and can be handled in a similar way. I agree this isn't the only solution. Storing them in a dedicated, in-memory catalog is one possibility, and having a session-specific structure holding them is another. We feel extending the capability of HiveCatalog to store and share them has greater advantages.

Re: #3, persistence is just one part of the store, but we have decoupled the types of the objects to be stored from the nature of the catalog. This happened when we hosted both Flink tables and Hive tables via HiveCatalog. That is, a catalog may store any type of table. The work here is just an extension of that. Along the same idea, we shouldn't stop a user from defining either a Hive table, a generic table, or an inline table in an in-memory catalog. If a Hive table is correctly defined in an in-memory catalog, Flink has no problem reading/writing that table. The difference is that the table definition doesn't survive beyond the user session. In summary, we decoupled the object types from the types of the catalog.

I understand there might be a perception that anything registered in HiveCatalog should be persisted. We know this isn't true for Hive's temporary tables and functions. I think we just need to educate users that certain tables (such as inline tables) are temporary in nature and valid only in the current session, which was already true before HiveCatalog was introduced.

Please let me know if there are more questions or comments.

P.S. just noticed that Bowen had responded, but I guess it's okay if there are some duplicated points.
[jira] [Commented] (FLINK-12771) Support ConnectorCatalogTable in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864356#comment-16864356 ] Xuefu Zhang commented on FLINK-12771: - Hi [~dawidwys], Re: #2, communicating with Hive metastore is the main reason for having HiveCatalog, but it doesn't necessarily mean that's all it can do. In Hive, a user can create temporary tables/functions via DDL, which requires no communication to Hive metastore. Such session specific objects are stored in user session (or client). (In fact, Hive metastore provided a type of client that has this functionality built-in.) Those objects share the same namespace with the persistent objects stored in Hive metastore. I'd argue that Flink's temp objects such as inline tables have the same nature, and can be handled in a similar way. I agree this isn't the only solution. Storing them in a dedicated, in memory catalog is one possibility, and having a session-specific structure holding them is another. We feel extending the capability of HiveCatalog to share and store them having greater advantages. Re: #3, persistence is just one part of the store, but we have decoupled the types of the object to be stored and the nature of the catalog. This happened when we hosted both Flink tables and Hive tables via HiveCatalog. That is, a catalog may store any type of tables. The work here is just an extension to that. Along the same idea, we shouldn't stop a user to define either a Hive table, a generic table, or an inline table in an in-memory catalog. If a hive catalog is correctly define in in-memory catalog, Flink has no problem read/write that table. The difference is that the table definition doesn't survive beyond user session. I understand there might be a perception that anything registered in HiveCatalog should be persisted. We know this isn't true for Hive's temporary tables and functions. I think we just need to educate user that certain tables (such as inline tables) are temporary in nature and valid only in current session, which was already true before HiveCatalog is introduced. Please let me know if there are more questions or comments. > Support ConnectorCatalogTable in HiveCatalog > > > Key: FLINK-12771 > URL: https://issues.apache.org/jira/browse/FLINK-12771 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently {{HiveCatalog}} does not support {{ConnectorCatalogTable}}. There's > a major drawback on this when it comes to real use cases, that is when Table > API users set a {{HiveCatalog}} as their default catalog (which is very > likely), they cannot create or use any inline table sources/sinks with their > default catalog any more. It's really inconvenient for Table API users to use > Flink for exploration, experiment, and production. > There are several workaround in this case. E.g. users have to switch their > default catalog, but that misses our original intention of having a default > {{HiveCatalog}}; or users can register their inline source/sinks to Flink's > default catalog which is a in memory catalog, but that not only require users > to type full path of a table but also requires users to be aware of the > Flink's default catalog, default db, and their names. In short, none of the > workaround seems to be reasonable and user friendly. 
> From another perspective, Hive has the concept of temporary tables that are > stored in the memory of the Hive metastore client and are removed when the client is shut > down. In Flink, {{ConnectorCatalogTable}} can be seen as a type of > session-based temporary table, and {{HiveCatalog}} (potentially any catalog > implementation) can store it in memory. By introducing the concept of temp > tables, we could greatly reduce friction for users and improve their > experience and productivity. > Thus, we propose adding a simple in-memory map for {{ConnectorCatalogTable}} > in {{HiveCatalog}} to allow users to create and use inline sources/sinks when > their default catalog is a {{HiveCatalog}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
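To make the proposed in-memory map concrete, below is a minimal sketch of the lookup order it implies; the {{tempTables}} field and the {{getTableFromMetastore}} helper are hypothetical names for illustration, not the PR's actual implementation.
{code}
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;

import java.util.HashMap;
import java.util.Map;

public class HiveCatalogSketch {
	// session-scoped inline tables, consulted before the metastore and never persisted
	private final Map<ObjectPath, CatalogBaseTable> tempTables = new HashMap<>();

	public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException {
		CatalogBaseTable tempTable = tempTables.get(tablePath);
		if (tempTable != null) {
			return tempTable;
		}
		// hypothetical helper standing in for the usual metastore round trip
		return getTableFromMetastore(tablePath);
	}

	private CatalogBaseTable getTableFromMetastore(ObjectPath tablePath) throws TableNotExistException {
		throw new TableNotExistException("hive", tablePath); // placeholder
	}
}
{code}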
[GitHub] [flink] bowenli86 commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog
bowenli86 commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8703#discussion_r293925009 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -1074,15 +1074,14 @@ public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics ta } @Override - public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { + public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TableNotPartitionedException { try { Table hiveTable = getHiveTable(tablePath); // Set table column stats. This only works for non-partitioned tables. if (!isTablePartitioned(hiveTable)) { - client.updateTableColumnStatistics(HiveCatalogUtil.createTableColumnStats(hiveTable, columnStatistics.getColumnStatisticsData())); + client.updateTableColumnStatistics(HiveStatsUtil.createTableColumnStats(hiveTable, columnStatistics.getColumnStatisticsData())); } else { - throw new CatalogException(String.format("Failed to alter partition table column stats of table %s", - tablePath.getFullName())); + throw new TableNotPartitionedException(getName(), tablePath); Review comment: shouldn't this be a `TablePartitionedException`, not a `TableNotPartitionedException`? We need to add a `TablePartitionedException` class. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
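For reference, a minimal sketch of what the suggested exception class could look like, mirroring the constructor style of the existing catalog exceptions; the message format here is an assumption, not the merged code:

```java
package org.apache.flink.table.catalog.exceptions;

import org.apache.flink.table.catalog.ObjectPath;

/**
 * Sketch only: thrown when an operation requires a non-partitioned table
 * but the given table is partitioned.
 */
public class TablePartitionedException extends Exception {

	private static final String MSG = "Table %s in Catalog %s is partitioned.";

	public TablePartitionedException(String catalogName, ObjectPath tablePath) {
		super(String.format(MSG, tablePath.getFullName(), catalogName));
	}
}
```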
[GitHub] [flink] bowenli86 commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog
bowenli86 commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8703#discussion_r293926968 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -1116,11 +1117,14 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio private String getPartitionName(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, Table hiveTable) throws PartitionSpecInvalidException { List partitionCols = getFieldNames(hiveTable.getPartitionKeys()); List partitionVals = getOrderedFullPartitionValues(partitionSpec, partitionCols, tablePath); - List partKVs = new ArrayList<>(); + StringBuilder stringBuilder = new StringBuilder(); for (int i = 0; i < partitionCols.size(); i++) { - partKVs.add(partitionCols.get(i) + "=" + partitionVals.get(i)); + stringBuilder.append(partitionCols.get(i)).append("=").append(partitionVals.get(i)); + if (i < partitionCols.size() - 1) { + stringBuilder.append("/"); + } } - return org.apache.commons.lang3.StringUtils.join(partKVs, "/"); Review comment: we can keep the `List partKVs` and just call `return String.join("/", partKVs)` at the end This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
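A sketch of the suggested simplification, assuming `partitionCols` and `partitionVals` are `List<String>` as in the surrounding method:

```java
import java.util.ArrayList;
import java.util.List;

static String getPartitionName(List<String> partitionCols, List<String> partitionVals) {
	List<String> partKVs = new ArrayList<>();
	for (int i = 0; i < partitionCols.size(); i++) {
		partKVs.add(partitionCols.get(i) + "=" + partitionVals.get(i));
	}
	// String.join handles the separators, avoiding the manual last-element check
	return String.join("/", partKVs); // e.g. "year=2019/month=06"
}
```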
[GitHub] [flink] bowenli86 commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog
bowenli86 commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8703#discussion_r293927620 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java ## @@ -162,38 +148,44 @@ private static CatalogColumnStatisticsDataBase createTableColumnStats(DataType c } else if (stats.isSetDoubleStats()) { DoubleColumnStatsData doubleStats = stats.getDoubleStats(); return new CatalogColumnStatisticsDataDouble( - doubleStats.getLowValue(), doubleStats.getHighValue(), - doubleStats.getNumDVs(), doubleStats.getNumNulls()); + doubleStats.getLowValue(), + doubleStats.getHighValue(), + doubleStats.getNumDVs(), + doubleStats.getNumNulls()); } else if (stats.isSetLongStats()) { LongColumnStatsData longColStats = stats.getLongStats(); return new CatalogColumnStatisticsDataLong( - longColStats.getLowValue(), longColStats.getHighValue(), - longColStats.getNumDVs(), longColStats.getNumNulls()); + longColStats.getLowValue(), + longColStats.getHighValue(), + longColStats.getNumDVs(), + longColStats.getNumNulls()); } else if (stats.isSetStringStats()) { StringColumnStatsData stringStats = stats.getStringStats(); return new CatalogColumnStatisticsDataString( - stringStats.getMaxColLen(), stringStats.getAvgColLen(), - stringStats.getNumDVs(), stringStats.getNumNulls()); + stringStats.getMaxColLen(), + stringStats.getAvgColLen(), + stringStats.getNumDVs(), + stringStats.getNumNulls()); } else { LOG.warn("Flink does not support converting ColumnStatisticsData '{}' for Hive column type '{}' yet.", stats, colType); return null; } } - /** * Convert Flink ColumnStats to Hive ColumnStatisticsData according to Hive column type. * Note we currently assume that, in Flink, the max and min of ColumnStats will be same type as the Flink column type. * For example, for SHORT and Long columns, the max and min of their ColumnStats should be of type SHORT and LONG. */ private static ColumnStatisticsData getColumnStatisticsData(DataType colType, CatalogColumnStatisticsDataBase colStat) { - LogicalType colLogicalType = colType.getLogicalType(); - if (colLogicalType instanceof CharType || colLogicalType instanceof VarCharType) { + LogicalTypeRoot type = colType.getLogicalType().getTypeRoot(); + if (type.equals(LogicalTypeRoot.CHAR) + || type.equals(LogicalTypeRoot.VARCHAR)) { Review comment: nit: one extra tab here This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
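Beyond the indentation nit, comparing on the type root also lends itself to a switch over the enum, which avoids long boolean chains entirely; a sketch, with the per-type builders as hypothetical helpers:

```java
// assumes the colType/colStat variables from the method above
LogicalTypeRoot type = colType.getLogicalType().getTypeRoot();
switch (type) {
	case CHAR:
	case VARCHAR:
		return buildStringColumnStats(colStat);  // hypothetical helper
	case DOUBLE:
		return buildDoubleColumnStats(colStat);  // hypothetical helper
	default:
		return null; // unsupported type, mirroring the LOG.warn branch above
}
```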
[jira] [Commented] (FLINK-12836) Allow retained checkpoints to be persisted on success
[ https://issues.apache.org/jira/browse/FLINK-12836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864343#comment-16864343 ] Andrew Duffy commented on FLINK-12836: -- Hey, yeah, the intention is to allow checkpoints to be reused after a successful completion of a job. The scenarios where this comes up for us: * Recurring bounded stream processing job, which can be re-run multiple times and pick up from where the previous one left off with state/offsets intact. * Hybrid storage: bootstrapping state by running over the complete set of historical data, then restoring that state to start reading realtime data. For both of these, it's considerably easier for an outside service (or human) to orchestrate around job completions than trying to properly time a cancellation, particularly when you're already trying to read one source to completion. > Allow retained checkpoints to be persisted on success > - > > Key: FLINK-12836 > URL: https://issues.apache.org/jira/browse/FLINK-12836 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Reporter: Andrew Duffy >Assignee: vinoyang >Priority: Major > > Currently, retained checkpoints are persisted with one of three strategies: > * CHECKPOINT_NEVER_RETAINED: Retained checkpoints are never persisted > * CHECKPOINT_RETAINED_ON_FAILURE: Latest retained checkpoint is persisted in the face of job failures > * CHECKPOINT_RETAINED_ON_CANCELLATION: Latest retained checkpoint is persisted when the job is canceled externally (e.g. via the REST API) > > I'm proposing a fourth persistence mode: _CHECKPOINT_RETAINED_ALWAYS_. This > mode would ensure that retained checkpoints are retained on successful > completion of the job and can be resumed from later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
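For context, retain-on-cancellation is enabled like this in the 1.8-era API; the commented-out RETAIN_ON_SUCCESS value is hypothetical and only illustrates the proposed mode:
{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// today: keep the latest retained checkpoint when the job is canceled
env.getCheckpointConfig().enableExternalizedCheckpoints(
	CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// proposed (hypothetical enum value): also keep it when the job finishes successfully
// env.getCheckpointConfig().enableExternalizedCheckpoints(
//	CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_SUCCESS);
{code}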
[jira] [Updated] (FLINK-12647) Make partition release on consumption optional
[ https://issues.apache.org/jira/browse/FLINK-12647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-12647: - Description: ResultPartitions being released on consumption should no longer be a hard-coded behavior, which is a prerequisite for making partitions consumable multiple times, without knowing ahead-of-time how often that might be. To retain the existing behavior, a configurable flag should be introduced to force the release of all consumed partitions, which will likely be removed in the future. (was: Add a feature flag so that the network continues to automatically release partitions once they have been consumed, to ease development and potentially as a failsafe in the future.) > Make partition release on consumption optional > -- > > Key: FLINK-12647 > URL: https://issues.apache.org/jira/browse/FLINK-12647 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > ResultPartitions being released on consumption should no longer be a hard-coded > behavior, which is a prerequisite for making partitions consumable multiple > times, without knowing ahead-of-time how often that might be. To retain the > existing behavior, a configurable flag should be introduced to force the > release of all consumed partitions, which will likely be removed in the > future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
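A sketch of what such a flag could look like as a ConfigOption; the key name and default value are assumptions for illustration, not the option actually introduced by the PR:
{code}
// hypothetical option name and default
public static final ConfigOption<Boolean> FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
	ConfigOptions.key("taskmanager.network.partition.force-release-on-consumption")
		.defaultValue(true)
		.withDescription(
			"Whether a result partition is forcibly released as soon as all of its consumers are done.");
{code}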
[jira] [Updated] (FLINK-12647) Make partition release on consumption optional
[ https://issues.apache.org/jira/browse/FLINK-12647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-12647: - Summary: Make partition release on consumption optional (was: Add feature flag to disable automatic release of partitions) > Make partition release on consumption optional > -- > > Key: FLINK-12647 > URL: https://issues.apache.org/jira/browse/FLINK-12647 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Add a feature flag so that the network continues to automatically release > partitions once they have been consumed, to ease development and potentially > as a failsafe in the future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-12647) Make partition release on consumption optional
[ https://issues.apache.org/jira/browse/FLINK-12647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-12647. Resolution: Fixed master: d9ac854a4155d14ffd9d81f883f796231b8e55e5 > Make partition release on consumption optional > -- > > Key: FLINK-12647 > URL: https://issues.apache.org/jira/browse/FLINK-12647 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Add a feature flag so that the network continues to automatically release > partitions once they have been consumed, to ease development and potentially > as a failsafe in the future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zentol merged pull request #8654: [FLINK-12647][network] Add feature flag to disable release of consumed blocking partitions
zentol merged pull request #8654: [FLINK-12647][network] Add feature flag to disable release of consumed blocking partitions URL: https://github.com/apache/flink/pull/8654 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-12851) Add travis profile for gelly/kafka
[ https://issues.apache.org/jira/browse/FLINK-12851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-12851. Resolution: Fixed master: b63c0f16dc0cd727d30459ee4d450662e47fe5f0 > Add travis profile for gelly/kafka > -- > > Key: FLINK-12851 > URL: https://issues.apache.org/jira/browse/FLINK-12851 > Project: Flink > Issue Type: Improvement > Components: Travis >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > The misc/tests profiles frequently hit timeouts; we can resolve this by > moving gelly and the universal kafka connector into a separate profile. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zentol merged pull request #8743: [FLINK-12851][travis] Move gelly/kafka to separate profile
zentol merged pull request #8743: [FLINK-12851][travis] Move gelly/kafka to separate profile URL: https://github.com/apache/flink/pull/8743 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Comment Edited] (FLINK-12771) Support ConnectorCatalogTable in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864321#comment-16864321 ] Bowen Li edited comment on FLINK-12771 at 6/14/19 6:15 PM: --- Hi [~dawidwys] , here is the [spec for Hive temp tables|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-TemporaryTables]. Basically, the temporary objects are held at the Hive metastore client (client side), rather than at the Hive metastore service (server side). Re: 2. IMHO, HiveCatalog offers the capability of persisting metadata, but we shouldn't limit ourselves to that, considering the use cases and experience we want to support. Re: 3/4. We should put good, clear documentation in place to educate users that only the new {{CatalogTable}} can be persisted, as well as logging for applications and a noticeable reminder for interactive consoles. Note that this only affects Table API users, not SQL CLI users. Table API users are typically more experienced and advanced Flink users, and it won't be hard for them to learn that all inline tables are not persistent in 1.8 or older versions, and that they will remain so in 1.9 and beyond. was (Author: phoenixjiangnan): Hi [~dawidwys] , here is the [spec for Hive temp tables|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-TemporaryTables]. Basically, the temporary objects are held at the Hive metastore client (client side), rather than at the Hive metastore service (server side). Re: 2. IMHO, HiveCatalog offers the capability of persisting metadata, but we shouldn't limit ourselves to that, considering the use cases and experience we want to support. Re: 3/4. We should put good, clear documentation in place to educate users that only the new {{CatalogTable}} can be persisted, as well as logging for applications and a noticeable reminder for interactive consoles. Note that this only affects Table API users, not SQL CLI users. Table API users are often more experienced in Flink, and it won't be hard for them to learn that all inline tables are not persistent in 1.8 or older versions, and that they will remain so in 1.9 and beyond. > Support ConnectorCatalogTable in HiveCatalog > > > Key: FLINK-12771 > URL: https://issues.apache.org/jira/browse/FLINK-12771 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently {{HiveCatalog}} does not support {{ConnectorCatalogTable}}. There's > a major drawback to this when it comes to real use cases: when Table > API users set a {{HiveCatalog}} as their default catalog (which is very > likely), they cannot create or use any inline table sources/sinks with their > default catalog any more. It's really inconvenient for Table API users to use > Flink for exploration, experimentation, and production. > There are several workarounds in this case. E.g. users can switch their > default catalog, but that misses our original intention of having a default > {{HiveCatalog}}; or users can register their inline sources/sinks in Flink's > default catalog, which is an in-memory catalog, but that not only requires users > to type the full path of a table but also requires them to be aware of > Flink's default catalog, default db, and their names. In short, none of the > workarounds seems reasonable or user-friendly.
> From another perspective, Hive has the concept of temporary tables that are > stored in the memory of the Hive metastore client and are removed when the client is shut > down. In Flink, {{ConnectorCatalogTable}} can be seen as a type of > session-based temporary table, and {{HiveCatalog}} (potentially any catalog > implementation) can store it in memory. By introducing the concept of temp > tables, we could greatly reduce friction for users and improve their > experience and productivity. > Thus, we propose adding a simple in-memory map for {{ConnectorCatalogTable}} > in {{HiveCatalog}} to allow users to create and use inline sources/sinks when > their default catalog is a {{HiveCatalog}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12771) Support ConnectorCatalogTable in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864321#comment-16864321 ] Bowen Li commented on FLINK-12771: -- Hi [~dawidwys] , here is the [spec for Hive temp tables|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-TemporaryTables]. Basically, the temporary objects are held at the Hive metastore client (client side), rather than at the Hive metastore service (server side). Re: 2. IMHO, HiveCatalog offers the capability of persisting metadata, but we shouldn't limit ourselves to that, considering the use cases and experience we want to support. Re: 3/4. We should put good, clear documentation in place to educate users that only the new {{CatalogTable}} can be persisted, as well as logging for applications and a noticeable reminder for interactive consoles. Note that this only affects Table API users, not SQL CLI users. Table API users are often more experienced in Flink, and it won't be hard for them to learn that all inline tables are not persistent in 1.8 or older versions, and that they will remain so in 1.9 and beyond. > Support ConnectorCatalogTable in HiveCatalog > > > Key: FLINK-12771 > URL: https://issues.apache.org/jira/browse/FLINK-12771 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently {{HiveCatalog}} does not support {{ConnectorCatalogTable}}. There's > a major drawback to this when it comes to real use cases: when Table > API users set a {{HiveCatalog}} as their default catalog (which is very > likely), they cannot create or use any inline table sources/sinks with their > default catalog any more. It's really inconvenient for Table API users to use > Flink for exploration, experimentation, and production. > There are several workarounds in this case. E.g. users can switch their > default catalog, but that misses our original intention of having a default > {{HiveCatalog}}; or users can register their inline sources/sinks in Flink's > default catalog, which is an in-memory catalog, but that not only requires users > to type the full path of a table but also requires them to be aware of > Flink's default catalog, default db, and their names. In short, none of the > workarounds seems reasonable or user-friendly. > From another perspective, Hive has the concept of temporary tables that are > stored in the memory of the Hive metastore client and are removed when the client is shut > down. In Flink, {{ConnectorCatalogTable}} can be seen as a type of > session-based temporary table, and {{HiveCatalog}} (potentially any catalog > implementation) can store it in memory. By introducing the concept of temp > tables, we could greatly reduce friction for users and improve their > experience and productivity. > Thus, we propose adding a simple in-memory map for {{ConnectorCatalogTable}} > in {{HiveCatalog}} to allow users to create and use inline sources/sinks when > their default catalog is a {{HiveCatalog}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] xuefuz commented on issue #8720: [FLINK-12771][hive] Support ConnectorCatalogTable in HiveCatalog
xuefuz commented on issue #8720: [FLINK-12771][hive] Support ConnectorCatalogTable in HiveCatalog URL: https://github.com/apache/flink/pull/8720#issuecomment-502209972 Also, since we are using a map to store the temp tables, with the combination of db name and table name as the key, I'm wondering what impact database operations, such as dropping a database, might have. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
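To illustrate the concern, a minimal sketch (field and method names are hypothetical) of the sweep a database drop would need against such a map:

```java
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

import java.util.HashMap;
import java.util.Map;

class TempTableStore {
	// hypothetical: temp tables keyed by (db, table) via ObjectPath
	private final Map<ObjectPath, CatalogBaseTable> tempTables = new HashMap<>();

	// a database drop must also sweep its session-scoped temp tables,
	// otherwise they linger under a database that no longer exists
	void onDropDatabase(String dbName) {
		tempTables.keySet().removeIf(path -> path.getDatabaseName().equals(dbName));
	}
}
```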
[GitHub] [flink] banmoy commented on issue #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
banmoy commented on issue #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable URL: https://github.com/apache/flink/pull/8611#issuecomment-502176398 @StefanRRichter I have addressed the comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable URL: https://github.com/apache/flink/pull/8611#discussion_r293880412 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java ## @@ -172,14 +232,133 @@ public boolean isEmpty() { * @return the state of the mapping with the specified key/namespace composite key, or {@code null} * if no mapping for the specified key is found. */ - public abstract S get(K key, N namespace); + public S get(K key, N namespace) { + int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, keyContext.getNumberOfKeyGroups()); + return get(key, keyGroup, namespace); + } + + public Stream getKeys(N namespace) { + return Arrays.stream(state) + .filter(Objects::nonNull) + .map(StateMap::iterator) + .flatMap(iter -> StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, 0), false)) Review comment: Yes, we can create a spliterator with known size as you say. But in this case, we will not iterate keys in parallel, so the size will not be used. On the other hand, the size for `NestedStateMap` needs some computation. So I think there is no need to get the size of the spliterator. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
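For reference, the known-size variant discussed above would look roughly like this; `stateMap.size()` is exactly the computation that is cheap for some map implementations but not for `NestedStateMap`:

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;

// sketch only: a sized spliterator pays off when size() is cheap and the
// stream is consumed in parallel, neither of which holds here
StreamSupport.stream(
	Spliterators.spliterator(stateMap.iterator(), stateMap.size(), Spliterator.SIZED),
	false);
```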
[GitHub] [flink] sunjincheng121 commented on issue #8225: [FLINK-10984]Remove flink-shaded-hadoop from flink
sunjincheng121 commented on issue #8225: [FLINK-10984]Remove flink-shaded-hadoop from flink URL: https://github.com/apache/flink/pull/8225#issuecomment-502163792 I would appreciate it if you could have another look :) @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Xpray commented on a change in pull request #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type
Xpray commented on a change in pull request #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type URL: https://github.com/apache/flink/pull/8346#discussion_r293862114 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java ## @@ -49,13 +59,17 @@ /** Does this partition use a limited number of (network) buffers? */ private final boolean isBounded; + /** This partition will not be released after consuming if 'isPersistent' is true. */ + private final boolean isPersistent; Review comment: It seems "external" might be better than "global" since "internal" is better than "local"? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StephanEwen commented on a change in pull request #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type
StephanEwen commented on a change in pull request #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type URL: https://github.com/apache/flink/pull/8346#discussion_r293857231 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java ## @@ -49,13 +59,17 @@ /** Does this partition use a limited number of (network) buffers? */ private final boolean isBounded; + /** This partition will not be released after consuming if 'isPersistent' is true. */ + private final boolean isPersistent; Review comment: I agree that BLOCKING_PERSISTENT is not the best name. The specific thing is that it lives across jobs, but I am not sure what a good name for that would be. "global" and "external" both go in that direction... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunhaibotb commented on issue #8642: [FLINK-1722][datastream] Enable the InitializeOnMaster and FinalizeOnMaster interfaces on datastream
sunhaibotb commented on issue #8642: [FLINK-1722][datastream] Enable the InitializeOnMaster and FinalizeOnMaster interfaces on datastream URL: https://github.com/apache/flink/pull/8642#issuecomment-502151128 Thank you for reviewing, @aljoscha. Because `JobMasterTest.java` conflicts with the latest master, I will rebase on top of the latest master and force-push after addressing the comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable URL: https://github.com/apache/flink/pull/8611#discussion_r293850537 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java ## @@ -156,10 +203,23 @@ public boolean isEmpty() { * @param transformation the transformation function. * @throws Exception if some exception happens in the transformation function. */ - public abstract void transform( + public void transform( N namespace, T value, - StateTransformationFunction transformation) throws Exception; + StateTransformationFunction transformation) throws Exception { + K key = keyContext.getCurrentKey(); + checkKeyNamespacePreconditions(key, namespace); + + int keyGroup = keyContext.getCurrentKeyGroupIndex(); + StateMap stateMap = getMapForKeyGroup(keyGroup); + + if (stateMap == null) { + stateMap = createStateMap(); Review comment: The style of checking `null` is just following the original NestedMapsStateTable. For the follow-up spill implementation, we just replace the on-heap map with an on-disk map, instead of removing the map from the array, so there is no need to set `null` in the array. I agree with you that no map will stay `null` for long if the number of keys is more than the number of key-groups and keys are uniformly distributed, which I think is the most common case. So I will initialize all array positions with maps and remove the check. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
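A minimal sketch of the eager initialization agreed on above, assuming `state` is the per-key-group `StateMap` array:

```java
// fill every key-group slot up front so get/transform never observe null
for (int i = 0; i < state.length; i++) {
	state[i] = createStateMap();
}
```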
[GitHub] [flink] flinkbot commented on issue #8744: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function
flinkbot commented on issue #8744: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function URL: https://github.com/apache/flink/pull/8744#issuecomment-502145656 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] aljoscha opened a new pull request #8744: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function
aljoscha opened a new pull request #8744: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function URL: https://github.com/apache/flink/pull/8744 Copy of #8280 for running CI This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable URL: https://github.com/apache/flink/pull/8611#discussion_r293838949 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java ## @@ -19,34 +19,123 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.StateEntry; import org.apache.flink.runtime.state.StateSnapshot; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; + /** * Abstract base class for snapshots of a {@link StateTable}. Offers a way to serialize the snapshot (by key-group). * All snapshots should be released after usage. */ @Internal -abstract class AbstractStateTableSnapshot> implements StateSnapshot { +abstract class AbstractStateTableSnapshot> + implements StateSnapshot, StateSnapshot.StateKeyGroupWriter { /** * The {@link StateTable} from which this snapshot was created. */ - final T owningStateTable; + protected final T owningStateTable; + + /** +* A local duplicate of the table's key serializer. +*/ + @Nonnull + protected final TypeSerializer localKeySerializer; + + /** +* A local duplicate of the table's namespace serializer. +*/ + @Nonnull + protected final TypeSerializer localNamespaceSerializer; + + /** +* A local duplicate of the table's state serializer. +*/ + @Nonnull + protected final TypeSerializer localStateSerializer; + + @Nullable + protected final StateSnapshotTransformer stateSnapshotTransformer; /** * Creates a new {@link AbstractStateTableSnapshot} for and owned by the given table. * * @param owningStateTable the {@link StateTable} for which this object represents a snapshot. */ - AbstractStateTableSnapshot(T owningStateTable) { + AbstractStateTableSnapshot( + T owningStateTable, + TypeSerializer localKeySerializer, Review comment: > The annotation also generates an assertion check for non-null if the code is executed with -ea I am confused about this. I used a simple snippet to test the assertion, but nothing happened regardless of whether `-ea` was added. The code is as follows; the version of the jsr305 jar is 1.3.9, which is the same as Flink uses, and the version of Java is 1.8.0_162. ```Java import javax.annotation.Nonnull; public class AnnotationTest { public static void main(String[] args) { func(null); } private static void func(@Nonnull Object object) { System.out.println("success"); } } ``` But I found that IntelliJ has a setting `Add runtime assertions for not-null annotated methods and parameters`, and if I run the code in IntelliJ with this setting enabled, an `IllegalArgumentException` will be thrown. Please let me know if I'm wrong. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] asfgit closed pull request #8623: [FLINK-12719][python] Add the Python catalog API
asfgit closed pull request #8623: [FLINK-12719][python] Add the Python catalog API URL: https://github.com/apache/flink/pull/8623 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package URL: https://github.com/apache/flink/pull/8632#discussion_r293830121 ## File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java ## @@ -69,20 +133,11 @@ * evaluated as illegal by the validator */ public Params set(ParamInfo info, V value) { - if (!info.isOptional() && value == null) { - throw new RuntimeException( - "Setting " + info.getName() + " as null while it's not a optional param"); - } - if (value == null) { - remove(info); - return this; - } - if (info.getValidator() != null && !info.getValidator().validate(value)) { throw new RuntimeException( "Setting " + info.getName() + " as a invalid value:" + value); } - paramMap.put(info.getName(), value); + params.put(info.getName(), valueToJson(value)); Review comment: If we change a ParamInfo name, we can add the old name to its aliases; thus, when we load params from an old JSON, we can still get the param value. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
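A sketch of the renaming scenario, assuming a `ParamInfoFactory` builder with a `setAlias` method as in this PR's API; the parameter names are illustrative:

```java
// "treeNum" is the old name kept as an alias, so Params serialized under the
// old name still resolve after the rename to "numTrees"
ParamInfo<Integer> numTrees = ParamInfoFactory
	.createParamInfo("numTrees", Integer.class)
	.setDescription("number of trees")
	.setAlias(new String[]{"treeNum"})
	.setHasDefaultValue(10)
	.build();
```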
[jira] [Closed] (FLINK-12719) Add the Python catalog API
[ https://issues.apache.org/jira/browse/FLINK-12719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-12719. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in master: e087c1e803b1535dbb07873c3ddaa1a214b2c849 > Add the Python catalog API > -- > > Key: FLINK-12719 > URL: https://issues.apache.org/jira/browse/FLINK-12719 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > The new catalog API is almost ready. We should add the corresponding Python > catalog API. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint
[ https://issues.apache.org/jira/browse/FLINK-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864111#comment-16864111 ] Congxian Qiu(klion26) commented on FLINK-12619: --- Thanks for the doc [~carp84], and the confirmation [~aljoscha], I'll continue to complete the PR :) > Support TERMINATE/SUSPEND Job with Checkpoint > - > > Key: FLINK-12619 > URL: https://issues.apache.org/jira/browse/FLINK-12619 > Project: Flink > Issue Type: New Feature > Components: Runtime / State Backends >Reporter: Congxian Qiu(klion26) >Assignee: Congxian Qiu(klion26) >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Inspired by the idea of FLINK-11458, we propose to support terminating/suspending > a job with a checkpoint. This improvement cooperates with the incremental and > external checkpoint features: if checkpoints are retained and this feature > is configured, we will trigger a checkpoint before the job stops. It could > accelerate job recovery a lot since: > 1. No source rewinding is required any more. > 2. It's much faster than taking a savepoint since incremental checkpointing is > enabled. > Please note that conceptually savepoints are different from checkpoints in a > similar way that backups are different from recovery logs in traditional > database systems. So we suggest using this feature only for job recovery, > while sticking with FLINK-11458 for the > upgrading/cross-cluster-job-migration/state-backend-switch cases. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package URL: https://github.com/apache/flink/pull/8632#discussion_r293826982 ## File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java ## @@ -114,38 +171,104 @@ public Params clone() { * @return a json containing all parameters in this Params */ public String toJson() { - ObjectMapper mapper = new ObjectMapper(); - Map stringMap = new HashMap<>(); try { - for (Map.Entry e : paramMap.entrySet()) { - stringMap.put(e.getKey(), mapper.writeValueAsString(e.getValue())); - } - return mapper.writeValueAsString(stringMap); + return mapper.writeValueAsString(params); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize params to json", e); } } /** * Restores the parameters from the given json. The parameters should be exactly the same with -* the one who was serialized to the input json after the restoration. The class mapping of the -* parameters in the json is required because it is hard to directly restore a param of a user -* defined type. Params will be treated as String if it doesn't exist in the {@code classMap}. +* the one who was serialized to the input json after the restoration. * -* @param json the json String to restore from -* @param classMap the classes of the parameters contained in the json +* @param json the json String to restore from */ @SuppressWarnings("unchecked") - public void loadJson(String json, Map> classMap) { + public void loadJson(String json) { ObjectMapper mapper = new ObjectMapper(); + Map params; try { - Map m = mapper.readValue(json, Map.class); - for (Map.Entry e : m.entrySet()) { - Class valueClass = classMap.getOrDefault(e.getKey(), String.class); - paramMap.put(e.getKey(), mapper.readValue(e.getValue(), valueClass)); + params = mapper.readValue(json, Map.class); + } catch (IOException e) { + throw new RuntimeException("Failed to deserialize json:" + json, e); + } + this.params.clear(); + this.params.putAll(params); + } + + /** +* Factory method for constructing params. +* +* @param json the json string to load +* @return the {@code Params} loaded from the json string. +*/ + public static Params fromJson(String json) { Review comment: We have redefined loadJson(), and the static method fromJson() makes it easy to construct a new Params. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
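A usage sketch of the round trip these methods enable, based only on the signatures shown in the diff:

```java
Params params = new Params();
String json = params.toJson();           // serialize all params to a JSON string
Params restored = Params.fromJson(json); // fresh instance with the same content
```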
[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package URL: https://github.com/apache/flink/pull/8632#discussion_r293825335 ## File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java ## @@ -114,38 +171,104 @@ public Params clone() { * @return a json containing all parameters in this Params */ public String toJson() { - ObjectMapper mapper = new ObjectMapper(); - Map stringMap = new HashMap<>(); try { - for (Map.Entry e : paramMap.entrySet()) { - stringMap.put(e.getKey(), mapper.writeValueAsString(e.getValue())); - } - return mapper.writeValueAsString(stringMap); + return mapper.writeValueAsString(params); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize params to json", e); } } /** * Restores the parameters from the given json. The parameters should be exactly the same with -* the one who was serialized to the input json after the restoration. The class mapping of the -* parameters in the json is required because it is hard to directly restore a param of a user -* defined type. Params will be treated as String if it doesn't exist in the {@code classMap}. +* the one who was serialized to the input json after the restoration. * -* @param json the json String to restore from -* @param classMap the classes of the parameters contained in the json +* @param json the json String to restore from */ @SuppressWarnings("unchecked") - public void loadJson(String json, Map> classMap) { + public void loadJson(String json) { ObjectMapper mapper = new ObjectMapper(); + Map params; try { - Map m = mapper.readValue(json, Map.class); - for (Map.Entry e : m.entrySet()) { - Class valueClass = classMap.getOrDefault(e.getKey(), String.class); - paramMap.put(e.getKey(), mapper.readValue(e.getValue(), valueClass)); + params = mapper.readValue(json, Map.class); + } catch (IOException e) { + throw new RuntimeException("Failed to deserialize json:" + json, e); + } + this.params.clear(); Review comment: Yes, I have changed it to merge the new params into the existing params. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package URL: https://github.com/apache/flink/pull/8632#discussion_r293823761 ## File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java ## @@ -114,38 +171,104 @@ public Params clone() { * @return a json containing all parameters in this Params */ public String toJson() { - ObjectMapper mapper = new ObjectMapper(); - Map stringMap = new HashMap<>(); try { - for (Map.Entry e : paramMap.entrySet()) { - stringMap.put(e.getKey(), mapper.writeValueAsString(e.getValue())); - } - return mapper.writeValueAsString(stringMap); + return mapper.writeValueAsString(params); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize params to json", e); } } /** * Restores the parameters from the given json. The parameters should be exactly the same with -* the one who was serialized to the input json after the restoration. The class mapping of the -* parameters in the json is required because it is hard to directly restore a param of a user -* defined type. Params will be treated as String if it doesn't exist in the {@code classMap}. +* the one who was serialized to the input json after the restoration. * -* @param json the json String to restore from -* @param classMap the classes of the parameters contained in the json +* @param json the json String to restore from */ @SuppressWarnings("unchecked") - public void loadJson(String json, Map> classMap) { + public void loadJson(String json) { ObjectMapper mapper = new ObjectMapper(); + Map params; try { - Map m = mapper.readValue(json, Map.class); - for (Map.Entry e : m.entrySet()) { - Class valueClass = classMap.getOrDefault(e.getKey(), String.class); - paramMap.put(e.getKey(), mapper.readValue(e.getValue(), valueClass)); + params = mapper.readValue(json, Map.class); + } catch (IOException e) { + throw new RuntimeException("Failed to deserialize json:" + json, e); + } + this.params.clear(); + this.params.putAll(params); + } + + /** +* Factory method for constructing params. +* +* @param json the json string to load +* @return the {@code Params} loaded from the json string. +*/ + public static Params fromJson(String json) { + Params params = new Params(); + params.loadJson(json); + return params; + } + + /** +* Merge other params into this. +* +* @param otherParams other params +* @return this +*/ + public Params merge(Params otherParams) { + if (otherParams != null) { + this.params.putAll(otherParams.params); + } + return this; + } + + /** +* Creates and returns a deep clone of this Params. +* +* @return a deep clone of this Params +*/ + @Override + public Params clone() { + Params newParams = new Params(); + newParams.params.putAll(this.params); + return newParams; + } + + private void assertMapperInited() { + if (mapper == null) { + mapper = new ObjectMapper(); + } + } + + private String valueToJson(Object value) { + assertMapperInited(); + try { + if (value == null) { + return null; } + return mapper.writeValueAsString(value); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize to json:" + value, e); + } + } + + private T valueFromJson(String json, Class clazz) { + assertMapperInited(); + try { + if (json == null) { + return null; + } + return mapper.readValue(json, clazz); } catch (IOException e) { throw new RuntimeException("Failed to deserialize json:" + json, e); } } + + private List getParamNameAndAlias( + ParamInfo paramInfo) { +
[GitHub] [flink] shaomengwang commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
shaomengwang commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293789454

## File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java

## @@ -44,17 +86,39 @@
 	 * @param <V> the type of the specific parameter
 	 * @return the value of the specific parameter, or default value defined in the {@code info} if
 	 *         this Params doesn't contain the parameter
-	 * @throws RuntimeException if the Params doesn't contains the specific parameter, while the
-	 *         param is not optional but has no default value in the {@code info}
+	 * @throws IllegalArgumentException if the Params doesn't contains the specific parameter, while the
+	 *         param is not optional but has no default value in the {@code info} or
+	 *         if the Params contains the specific parameter and alias, but has more
+	 *         than one value or
+	 *         if the Params doesn't contains the specific parameter, while the ParamInfo
+	 *         is optional but has no default value
 	 */
-	@SuppressWarnings("unchecked")
 	public <V> V get(ParamInfo<V> info) {
-		V value = (V) paramMap.getOrDefault(info.getName(), info.getDefaultValue());
-		if (value == null && !info.isOptional() && !info.hasDefaultValue()) {
-			throw new RuntimeException(info.getName() +
-				" not exist which is not optional and don't have a default value");
+		String value = null;
+		String usedParamName = null;
+		for (String nameOrAlias : getParamNameAndAlias(info)) {
+			String v = params.get(nameOrAlias);
+			if (value != null && v != null) {
+				throw new IllegalArgumentException(String.format("Duplicate parameters of %s and %s",

Review comment:
   `params.put` can use `ParamInfo.getName` to get the key, so it will not be able to generate duplicates, and there is no information about paramName and alias at `putAll` from json. Probably `params.get` here is the proper place to check duplicates.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
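For illustration, here is a self-contained model (not the PR's code) of the duplicate check being debated: if a value is present under both the canonical name and an alias, `get` cannot decide which one wins and fails fast.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DuplicateAliasCheck {
	// Resolve a value given the canonical name plus all aliases,
	// rejecting ambiguous input where more than one key is set.
	static String resolve(Map<String, String> params, List<String> namesAndAliases) {
		String value = null;
		String usedName = null;
		for (String nameOrAlias : namesAndAliases) {
			String v = params.get(nameOrAlias);
			if (v == null) {
				continue;
			}
			if (value != null) {
				throw new IllegalArgumentException(
					String.format("Duplicate parameters of %s and %s", usedName, nameOrAlias));
			}
			value = v;
			usedName = nameOrAlias;
		}
		return value;
	}

	public static void main(String[] args) {
		Map<String, String> params = new HashMap<>();
		params.put("numIterations", "10");
		params.put("maxIter", "20"); // alias also set -> ambiguous
		// Throws IllegalArgumentException: Duplicate parameters of numIterations and maxIter
		resolve(params, Arrays.asList("numIterations", "maxIter"));
	}
}
```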
[jira] [Commented] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint
[ https://issues.apache.org/jira/browse/FLINK-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864098#comment-16864098 ]

Aljoscha Krettek commented on FLINK-12619:
------------------------------------------

We had a short (but productive) call on this topic, so don't be surprised by us agreeing suddenly. ;-) After our discussion, and having read the document, I now agree with this view and think the analogy to database snapshots is very good. We should go forward with this stop-with-checkpoint feature.

Regarding FLIP-41, we agreed to call it the unified format even though it's for now only used for savepoints.

> Support TERMINATE/SUSPEND Job with Checkpoint
> ---------------------------------------------
>
>                 Key: FLINK-12619
>                 URL: https://issues.apache.org/jira/browse/FLINK-12619
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / State Backends
>            Reporter: Congxian Qiu(klion26)
>            Assignee: Congxian Qiu(klion26)
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Inspired by the idea of FLINK-11458, we propose to support terminating/suspending a job with a checkpoint. This improvement cooperates with the incremental and external checkpoint features: if the checkpoint is retained and this feature is configured, we will trigger a checkpoint before the job stops. It could accelerate job recovery a lot since:
> 1. No source rewinding is required any more.
> 2. It's much faster than taking a savepoint since incremental checkpointing is enabled.
> Please note that conceptually savepoints are different from checkpoints in a similar way that backups are different from recovery logs in traditional database systems. So we suggest using this feature only for job recovery, while sticking with FLINK-11458 for the upgrading/cross-cluster-job-migration/state-backend-switch cases.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint
[ https://issues.apache.org/jira/browse/FLINK-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864068#comment-16864068 ]

Aljoscha Krettek edited comment on FLINK-12619 at 6/14/19 1:59 PM:
-------------------------------------------------------------------

Wrote a document to arrange and better share my thoughts; below is a brief summary:
# Conceptually, it's worth a second thought whether to introduce an optimized snapshot format for now (i.e. use the checkpoint format in savepoints), just like it's not recommended to use snapshots for backup in a database (although practically it could be implemented).
# The stop-with-checkpoint mechanism is like stopping a database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) distinction.
# In the long run we may improve the checkpoint to allow a short enough interval that it becomes a form of transactional log; then we could enable checkpoint-based savepoints (like transactional-log-based backups). So I agree to still call the new format in FLIP-41 a "Unified Format", although in the short term it only unifies savepoints.

Please check it and let me know your thoughts, everyone. Thanks!

The document: https://docs.google.com/document/d/1uE4R3wNal6e67FkDe0UvcnsIMMDpr35j

was (Author: carp84):
Wrote a document to arrange and better share my thoughts, and below is a brief summary:
# Conceptually it worth a second thought about introducing an optimized snapshot format for now (i.e. use checkpoint format in savepoint), just like it's not recommended to use snapshot for backup in database (although practically it could be implemented).
# Stop-with-checkpoint mechanism is like stopping database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) diversity.
# In the long run we may improve the checkpoint to allow a short enough interval thus it may become some format of transactional log, then we could enable checkpoint-based savepoint (like transactional log based backup), so I agree to still call the new format in FLIP-41 a "Unified Format" although in the short term it only unifies savepoint.

Please check it and let me know your thoughts everyone. Thanks!

> Support TERMINATE/SUSPEND Job with Checkpoint
> ---------------------------------------------
>
>                 Key: FLINK-12619
>                 URL: https://issues.apache.org/jira/browse/FLINK-12619
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / State Backends
>            Reporter: Congxian Qiu(klion26)
>            Assignee: Congxian Qiu(klion26)
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Inspired by the idea of FLINK-11458, we propose to support terminating/suspending a job with a checkpoint. This improvement cooperates with the incremental and external checkpoint features: if the checkpoint is retained and this feature is configured, we will trigger a checkpoint before the job stops. It could accelerate job recovery a lot since:
> 1. No source rewinding is required any more.
> 2. It's much faster than taking a savepoint since incremental checkpointing is enabled.
> Please note that conceptually savepoints are different from checkpoints in a similar way that backups are different from recovery logs in traditional database systems. So we suggest using this feature only for job recovery, while sticking with FLINK-11458 for the upgrading/cross-cluster-job-migration/state-backend-switch cases.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-12773) Unstable kafka e2e test
[ https://issues.apache.org/jira/browse/FLINK-12773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864095#comment-16864095 ]

Alex commented on FLINK-12773:
------------------------------

Is it possible to download/prepare the external dependencies once and provision them in the CI environment? Alternatively, upload them into S3 and use S3 storage instead. Both options would add maintenance overhead, but at least such an approach would be more reliable.

> Unstable kafka e2e test
> -----------------------
>
>                 Key: FLINK-12773
>                 URL: https://issues.apache.org/jira/browse/FLINK-12773
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Tests
>    Affects Versions: 1.9.0
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>
> The 'Kafka 0.10 end-to-end test' fails on Travis occasionally because of a corrupted downloaded Kafka archive.
> https://api.travis-ci.org/v3/job/542507472/log.txt

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-12143) Mechanism to ship plugin jars in the cluster
[ https://issues.apache.org/jira/browse/FLINK-12143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864085#comment-16864085 ]

Alex commented on FLINK-12143:
------------------------------

The problem occurs because:
* {{parentFirstLoaderPatterns}} contains {{org.apache.flink.}} as a matching prefix;
* the hadoop file system component loads some classes dynamically ({{java.lang.Class.forName}}) and one of them, {{org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.PropertiesConfiguration}}, matches the pattern. This triggers the use of the application-global class loader instead of searching in the plugin jar.

The test failure can be reproduced by slightly modifying {{PluginLoaderTest}}: setting the matching {{loaderExcludePatterns}} on [line 42|https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java#L42]. For example to:
{code:java}
// Use explicit prefix of org.apache.flink.test.plugin.jar.plugina.DynamicClassA
PluginDescriptor pluginDescriptorA = new PluginDescriptor("A", new URL[]{classpathA}, new String[]{"org.apache.flink."});
{code}
Note that running this test in the IDE would not throw a {{ClassNotFoundException}}, but the actual class lookup would use the global class loader.

> Mechanism to ship plugin jars in the cluster
> --------------------------------------------
>
>                 Key: FLINK-12143
>                 URL: https://issues.apache.org/jira/browse/FLINK-12143
>             Project: Flink
>          Issue Type: Sub-task
>          Components: FileSystems, Runtime / Coordination
>            Reporter: Stefan Richter
>            Assignee: Alex
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
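To make the delegation order concrete, below is a simplified, self-contained sketch of a child-first class loader with parent-first prefix exceptions. It is illustrative only — Flink's actual plugin class loader differs in detail — but it shows why any class whose name starts with {{org.apache.flink.}} never gets looked up in the plugin jar:
{code:java}
import java.net.URL;
import java.net.URLClassLoader;

public class ChildFirstClassLoader extends URLClassLoader {
	private final String[] parentFirstPrefixes;

	public ChildFirstClassLoader(URL[] urls, ClassLoader parent, String[] parentFirstPrefixes) {
		super(urls, parent);
		this.parentFirstPrefixes = parentFirstPrefixes;
	}

	@Override
	protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
		synchronized (getClassLoadingLock(name)) {
			Class<?> loaded = findLoadedClass(name);
			if (loaded != null) {
				return loaded;
			}
			for (String prefix : parentFirstPrefixes) {
				if (name.startsWith(prefix)) {
					// e.g. "org.apache.flink.fs.shaded.hadoop3..." matches "org.apache.flink."
					// and is delegated to the parent, never searched in the plugin jar.
					return super.loadClass(name, resolve);
				}
			}
			try {
				// Child (plugin jar) first for everything else.
				Class<?> c = findClass(name);
				if (resolve) {
					resolveClass(c);
				}
				return c;
			} catch (ClassNotFoundException e) {
				return super.loadClass(name, resolve);
			}
		}
	}
}
{code}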
[GitHub] [flink] EugeneYushin commented on issue #8187: [FLINK-12197] [Formats] Avro row deser for Confluent binary format
EugeneYushin commented on issue #8187: [FLINK-12197] [Formats] Avro row deser for Confluent binary format
URL: https://github.com/apache/flink/pull/8187#issuecomment-502113331

@dawidwys can you please ping another person who can help review this PR in case you have no capacity?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Comment Edited] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint
[ https://issues.apache.org/jira/browse/FLINK-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864068#comment-16864068 ]

Yu Li edited comment on FLINK-12619 at 6/14/19 1:32 PM:
--------------------------------------------------------

Wrote a document to arrange and better share my thoughts; below is a brief summary:
# Conceptually, it's worth a second thought whether to introduce an optimized snapshot format for now (i.e. use the checkpoint format in savepoints), just like it's not recommended to use snapshots for backup in a database (although practically it could be implemented).
# The stop-with-checkpoint mechanism is like stopping a database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) distinction.
# In the long run we may improve the checkpoint to allow a short enough interval that it becomes a form of transactional log; then we could enable checkpoint-based savepoints (like transactional-log-based backups). So I agree to still call the new format in FLIP-41 a "Unified Format", although in the short term it only unifies savepoints.

Please check it and let me know your thoughts, everyone. Thanks!

was (Author: carp84):
Wrote a document to arrange and better share my thoughts, and below is a brief summary:
1. Conceptually it worth a second thought about introducing an optimized snapshot format for now (i.e. use checkpoint format in savepoint), just like it's not recommended to use snapshot for backup in database (although practically it could be implemented).
2. Stop-with-checkpoint mechanism is like stopping database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) diversity.
3. In the long run we may improve the checkpoint to allow a short enough interval thus it may become some format of transactional log, then we could enable checkpoint-based savepoint (like transactional log based backup), so I agree to still call the new format in FLIP-41 a "Unified Format" although in the short term it only unifies savepoint.

Please check it and let me know your thoughts everyone. Thanks!

> Support TERMINATE/SUSPEND Job with Checkpoint
> ---------------------------------------------
>
>                 Key: FLINK-12619
>                 URL: https://issues.apache.org/jira/browse/FLINK-12619
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / State Backends
>            Reporter: Congxian Qiu(klion26)
>            Assignee: Congxian Qiu(klion26)
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Inspired by the idea of FLINK-11458, we propose to support terminating/suspending a job with a checkpoint. This improvement cooperates with the incremental and external checkpoint features: if the checkpoint is retained and this feature is configured, we will trigger a checkpoint before the job stops. It could accelerate job recovery a lot since:
> 1. No source rewinding is required any more.
> 2. It's much faster than taking a savepoint since incremental checkpointing is enabled.
> Please note that conceptually savepoints are different from checkpoints in a similar way that backups are different from recovery logs in traditional database systems. So we suggest using this feature only for job recovery, while sticking with FLINK-11458 for the upgrading/cross-cluster-job-migration/state-backend-switch cases.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint
[ https://issues.apache.org/jira/browse/FLINK-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864068#comment-16864068 ]

Yu Li commented on FLINK-12619:
-------------------------------

Wrote a document to arrange and better share my thoughts; below is a brief summary:
1. Conceptually, it's worth a second thought whether to introduce an optimized snapshot format for now (i.e. use the checkpoint format in savepoints), just like it's not recommended to use snapshots for backup in a database (although practically it could be implemented).
2. The stop-with-checkpoint mechanism is like stopping a database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) distinction.
3. In the long run we may improve the checkpoint to allow a short enough interval that it becomes a form of transactional log; then we could enable checkpoint-based savepoints (like transactional-log-based backups). So I agree to still call the new format in FLIP-41 a "Unified Format", although in the short term it only unifies savepoints.

Please check it and let me know your thoughts, everyone. Thanks!

> Support TERMINATE/SUSPEND Job with Checkpoint
> ---------------------------------------------
>
>                 Key: FLINK-12619
>                 URL: https://issues.apache.org/jira/browse/FLINK-12619
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / State Backends
>            Reporter: Congxian Qiu(klion26)
>            Assignee: Congxian Qiu(klion26)
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Inspired by the idea of FLINK-11458, we propose to support terminating/suspending a job with a checkpoint. This improvement cooperates with the incremental and external checkpoint features: if the checkpoint is retained and this feature is configured, we will trigger a checkpoint before the job stops. It could accelerate job recovery a lot since:
> 1. No source rewinding is required any more.
> 2. It's much faster than taking a savepoint since incremental checkpointing is enabled.
> Please note that conceptually savepoints are different from checkpoints in a similar way that backups are different from recovery logs in traditional database systems. So we suggest using this feature only for job recovery, while sticking with FLINK-11458 for the upgrading/cross-cluster-job-migration/state-backend-switch cases.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint
[ https://issues.apache.org/jira/browse/FLINK-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864068#comment-16864068 ]

Yu Li edited comment on FLINK-12619 at 6/14/19 1:31 PM:
--------------------------------------------------------

Wrote a document to arrange and better share my thoughts; below is a brief summary:
1. Conceptually, it's worth a second thought whether to introduce an optimized snapshot format for now (i.e. use the checkpoint format in savepoints), just like it's not recommended to use snapshots for backup in a database (although practically it could be implemented).
2. The stop-with-checkpoint mechanism is like stopping a database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) distinction.
3. In the long run we may improve the checkpoint to allow a short enough interval that it becomes a form of transactional log; then we could enable checkpoint-based savepoints (like transactional-log-based backups). So I agree to still call the new format in FLIP-41 a "Unified Format", although in the short term it only unifies savepoints.

Please check it and let me know your thoughts, everyone. Thanks!

was (Author: carp84):
Wrote a document to arrange and better share my thoughts, and below is a brief summary:
1. Conceptually it worth a second thought about introducing an optimized snapshot format for now (i.e. use checkpoint format in savepoint), just like it's not recommended to use snapshot for backup in database (although practically it could be implemented).
2. Stop-with-checkpoint mechanism is like stopping database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) diversity.
3. In the long run we may improve the checkpoint to allow a short enough interval thus it may become some format of transactional log, then we could enable checkpoint-based savepoint (like transactional log based backup), so I agree to still call the new format in FLIP-41 a "Unified Format" although in the short term it only unifies savepoint.

Please check it and let me know your thoughts everyone. Thanks!

> Support TERMINATE/SUSPEND Job with Checkpoint
> ---------------------------------------------
>
>                 Key: FLINK-12619
>                 URL: https://issues.apache.org/jira/browse/FLINK-12619
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / State Backends
>            Reporter: Congxian Qiu(klion26)
>            Assignee: Congxian Qiu(klion26)
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Inspired by the idea of FLINK-11458, we propose to support terminating/suspending a job with a checkpoint. This improvement cooperates with the incremental and external checkpoint features: if the checkpoint is retained and this feature is configured, we will trigger a checkpoint before the job stops. It could accelerate job recovery a lot since:
> 1. No source rewinding is required any more.
> 2. It's much faster than taking a savepoint since incremental checkpointing is enabled.
> Please note that conceptually savepoints are different from checkpoints in a similar way that backups are different from recovery logs in traditional database systems. So we suggest using this feature only for job recovery, while sticking with FLINK-11458 for the upgrading/cross-cluster-job-migration/state-backend-switch cases.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8731#discussion_r293805008

## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java

## @@ -671,5 +737,100 @@ public InputSelection nextSelection() {
 			return InputSelection.ALL;
 		}
 	}
+
+	/**
+	 * Uses to test handling the EndOfInput notification.
+	 */
+	private static class TestBoundedTwoInputOperator extends AbstractStreamOperator<String>
+		implements TwoInputStreamOperator<String, String, String>, BoundedMultiInput {
+
+		private static final long serialVersionUID = 1L;
+
+		private final String name;
+
+		private volatile int outputRecords1;
+		private volatile int outputRecords2;
+
+		private static final CompletableFuture<Void> success = CompletableFuture.completedFuture(null);
+		private static final CompletableFuture<Void> failure = FutureUtils.completedExceptionally(
+			new Exception("Not in line with expectations."));
+
+		public TestBoundedTwoInputOperator(String name) {
+			this.name = name;
+		}
+
+		@Override
+		public void processElement1(StreamRecord<String> element) {
+			output.collect(element.replace("[" + name + "-1]: " + element.getValue()));
+			outputRecords1++;
+
+			synchronized (this) {
+				this.notifyAll();
+			}
+		}
+
+		@Override
+		public void processElement2(StreamRecord<String> element) {
+			output.collect(element.replace("[" + name + "-2]: " + element.getValue()));
+			outputRecords2++;
+
+			synchronized (this) {
+				this.notifyAll();
+			}
+		}
+
+		@Override
+		public void endInput(int inputId) {
+			output.collect(new StreamRecord<>("[" + name + "-" + inputId + "]: Bye"));
+
+			if (inputId == 1) {
+				outputRecords1++;
+			} else {
+				outputRecords2++;
+			}
+
+			synchronized (this) {
+				this.notifyAll();
+			}
+		}
+
+		public void waitOutputRecords(int inputId, int expectedRecords) throws Exception {

Review comment:
   This ensures that the output is strictly ordered, to verify whether `endInput()` is called immediately when `input1` reaches its end.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Updated] (FLINK-12806) Remove beta feature remark from the Universal Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-12806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-12806: - Priority: Major (was: Blocker) > Remove beta feature remark from the Universal Kafka connector > - > > Key: FLINK-12806 > URL: https://issues.apache.org/jira/browse/FLINK-12806 > Project: Flink > Issue Type: Task > Components: Connectors / Kafka >Reporter: Piotr Nowojski >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > I think we can remove this remark from the docs as in the last half year > there were no issues reported that would say otherwise. > The remark about universal connector being a beta feature was introduced in: > https://issues.apache.org/jira/browse/FLINK-10900 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12776) Ambiguous content in flink-dist NOTICE file
[ https://issues.apache.org/jira/browse/FLINK-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863997#comment-16863997 ]

sunjincheng edited comment on FLINK-12776 at 6/14/19 1:05 PM:
--------------------------------------------------------------

Thanks for the reply [~Zentol]! I agree with you that we can remove the old `flink-python`, and I'd like to share the good news here that we then do not need to make any change. :)

Please see the DISCUSS thread: [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Deprecate-previous-Python-APIs-td29483.html]

was (Author: sunjincheng121):
Thanks for the reply [~Zentol]! I want to share the good news here that we can remove the old `flink-pytyon`, then we do not need to do any change. :) please see the DISCUSS thread: [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Deprecate-previous-Python-APIs-td29483.html]

> Ambiguous content in flink-dist NOTICE file
> -------------------------------------------
>
>                 Key: FLINK-12776
>                 URL: https://issues.apache.org/jira/browse/FLINK-12776
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python, Release System
>    Affects Versions: 1.9.0
>            Reporter: Chesnay Schepler
>            Priority: Blocker
>             Fix For: 1.9.0
>
>         Attachments: image-2019-06-10-09-39-06-637.png
>
> With FLINK-12409 we include the new flink-python module in flink-dist. As a result we now have 2 {{flink-python}} entries in the flink-dist NOTICE file, one for the old batch API and one for the newly added one, which is ambiguous. We should rectify this by either excluding the old batch API from flink-dist, or renaming the new module to something like {{flink-api-python}}.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-12071) HadoopRecoverableWriterTest fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-12071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864037#comment-16864037 ] Till Rohrmann commented on FLINK-12071: --- How would this harden the test case? Maybe you could try to run this single test in a loop on Travis to make it reproducible. > HadoopRecoverableWriterTest fails on Travis > --- > > Key: FLINK-12071 > URL: https://issues.apache.org/jira/browse/FLINK-12071 > Project: Flink > Issue Type: Bug > Components: FileSystems, Tests >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: leesf >Priority: Critical > Labels: test-stability > > https://travis-ci.org/apache/flink/jobs/513373067 > {code} > testExceptionWritingAfterCloseForCommit(org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriterTest) > Time elapsed: 0.031 s <<< ERROR! > java.lang.Exception: Unexpected exception, expected but > was > Caused by: java.lang.IllegalArgumentException: Self-suppression not permitted > Caused by: java.io.IOException: The stream is closed > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
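A brute-force way to follow this suggestion — assuming a temporary looped test is added inside {{HadoopRecoverableWriterTest}}; the loop count is arbitrary — is a sketch like:
{code:java}
@Test
public void testExceptionWritingAfterCloseForCommitLooped() throws Exception {
	// Re-run the flaky test body many times in a single Travis build
	// to raise the odds of reproducing the failure.
	for (int i = 0; i < 100; i++) {
		testExceptionWritingAfterCloseForCommit();
	}
}
{code}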
[GitHub] [flink] flinkbot commented on issue #8743: [FLINK-12851][travis] Move gelly/kafka to separate profile
flinkbot commented on issue #8743: [FLINK-12851][travis] Move gelly/kafka to separate profile
URL: https://github.com/apache/flink/pull/8743#issuecomment-502097767

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8731#discussion_r293744349

## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java

## @@ -597,6 +603,54 @@ public void testQuiesceTimerServiceAfterOpClose() throws Exception {
 		timeService.shutdownService();
 	}
 
+	@Test
+	public void testHandlingEndOfInput() throws Exception {
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+			OneInputStreamTask::new,
+			2, 2,
+			BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness
+			.setupOperatorChain(new OperatorID(), new TestBoundedOneInputOperator("Operator0"))
+			.chain(
+				new OperatorID(),
+				new TestBoundedOneInputOperator("Operator1"),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+			.finish();
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.invoke();
+		testHarness.waitForTaskRunning();
+
+		TestBoundedOneInputOperator headOperator = (TestBoundedOneInputOperator) testHarness.getTask().headOperator;
+
+		testHarness.processElement(new StreamRecord<>("Hello-0-0"), 0, 0);

Review comment:
   This test was written a long time ago, when they were needed. Now the `inputGate` and `channel` parameters are not needed, and I will delete them.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8731#discussion_r293793699

## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java

## @@ -1011,5 +1065,70 @@ protected void handleWatermark(Watermark mark) {
 			output.emitWatermark(mark);
 		}
 	}
+
+	/**
+	 * Uses to test handling the EndOfInput notification.
+	 */
+	static class TestBoundedOneInputOperator extends AbstractStreamOperator<String>
+		implements OneInputStreamOperator<String, String>, BoundedOneInput {
+
+		private static final long serialVersionUID = 1L;
+
+		private final String name;
+
+		private volatile int outputRecords;
+
+		private static final CompletableFuture<Void> success = CompletableFuture.completedFuture(null);
+		private static final CompletableFuture<Void> failure = FutureUtils.completedExceptionally(
+			new Exception("Not in line with expectation."));
+
+		public TestBoundedOneInputOperator(String name) {
+			this.name = name;
+		}
+
+		@Override
+		public void processElement(StreamRecord<String> element) {
+			output.collect(element);
+			outputRecords++;
+
+			synchronized (this) {
+				this.notifyAll();
+			}
+		}
+
+		@Override
+		public void endInput() {
+			output.collect(new StreamRecord<>("[" + name + "]: Bye"));
+			outputRecords++;
+
+			synchronized (this) {
+				this.notifyAll();
+			}
+		}
+
+		public void waitOutputRecords(int expectedRecords) throws Exception {

Review comment:
   This test was written a long time ago, when it was needed. Now it is not needed, and I will remove it.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Updated] (FLINK-12851) Add travis profile for gelly/kafka
[ https://issues.apache.org/jira/browse/FLINK-12851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12851: --- Labels: pull-request-available (was: ) > Add travis profile for gelly/kafka > -- > > Key: FLINK-12851 > URL: https://issues.apache.org/jira/browse/FLINK-12851 > Project: Flink > Issue Type: Improvement > Components: Travis >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > The misc/tests profiles frequently hit timeouts; we can resolve this by > moving gelly and the universal kafka connector into a separate profile. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zentol opened a new pull request #8743: [FLINK-12851][travis] Move gelly/kafka to separate profile
zentol opened a new pull request #8743: [FLINK-12851][travis] Move gelly/kafka to separate profile
URL: https://github.com/apache/flink/pull/8743

Introduces another test profile for running the gelly and kafka (universal) tests. Additionally, now that kafka is out of the misc profile, the pre-commit tests are moved back into misc.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] shaomengwang commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
shaomengwang commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293789454

## File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java

## @@ -44,17 +86,39 @@
 	 * @param <V> the type of the specific parameter
 	 * @return the value of the specific parameter, or default value defined in the {@code info} if
 	 *         this Params doesn't contain the parameter
-	 * @throws RuntimeException if the Params doesn't contains the specific parameter, while the
-	 *         param is not optional but has no default value in the {@code info}
+	 * @throws IllegalArgumentException if the Params doesn't contains the specific parameter, while the
+	 *         param is not optional but has no default value in the {@code info} or
+	 *         if the Params contains the specific parameter and alias, but has more
+	 *         than one value or
+	 *         if the Params doesn't contains the specific parameter, while the ParamInfo
+	 *         is optional but has no default value
 	 */
-	@SuppressWarnings("unchecked")
 	public <V> V get(ParamInfo<V> info) {
-		V value = (V) paramMap.getOrDefault(info.getName(), info.getDefaultValue());
-		if (value == null && !info.isOptional() && !info.hasDefaultValue()) {
-			throw new RuntimeException(info.getName() +
-				" not exist which is not optional and don't have a default value");
+		String value = null;
+		String usedParamName = null;
+		for (String nameOrAlias : getParamNameAndAlias(info)) {
+			String v = params.get(nameOrAlias);
+			if (value != null && v != null) {
+				throw new IllegalArgumentException(String.format("Duplicate parameters of %s and %s",

Review comment:
   `params.put` can use `ParamInfo.getName` to get the key, so it will not be able to generate duplicates, and there is no information about paramName and alias at `putAll` from json. Probably `params.get` here is the proper place to check duplicates. I agree with you that it should check duplicates in `ParamInfoFactory.setAlias`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Comment Edited] (FLINK-12541) Add deploy a Python Flink job and session cluster on Kubernetes support.
[ https://issues.apache.org/jira/browse/FLINK-12541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862841#comment-16862841 ] Till Rohrmann edited comment on FLINK-12541 at 6/14/19 12:46 PM: - -Implemented via 00d90a4fe3ff93d19ff6be3ed7b9cf9d0f5b0dfd- was (Author: till.rohrmann): Implemented via 00d90a4fe3ff93d19ff6be3ed7b9cf9d0f5b0dfd > Add deploy a Python Flink job and session cluster on Kubernetes support. > > > Key: FLINK-12541 > URL: https://issues.apache.org/jira/browse/FLINK-12541 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Runtime / REST >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > Add deploy a Python Flink job and session cluster on Kubernetes support. > We need to have the same deployment step as the Java job. Please see: > [https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] shaomengwang commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
shaomengwang commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293789454

## File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java

## @@ -44,17 +86,39 @@
 	 * @param <V> the type of the specific parameter
 	 * @return the value of the specific parameter, or default value defined in the {@code info} if
 	 *         this Params doesn't contain the parameter
-	 * @throws RuntimeException if the Params doesn't contains the specific parameter, while the
-	 *         param is not optional but has no default value in the {@code info}
+	 * @throws IllegalArgumentException if the Params doesn't contains the specific parameter, while the
+	 *         param is not optional but has no default value in the {@code info} or
+	 *         if the Params contains the specific parameter and alias, but has more
+	 *         than one value or
+	 *         if the Params doesn't contains the specific parameter, while the ParamInfo
+	 *         is optional but has no default value
 	 */
-	@SuppressWarnings("unchecked")
 	public <V> V get(ParamInfo<V> info) {
-		V value = (V) paramMap.getOrDefault(info.getName(), info.getDefaultValue());
-		if (value == null && !info.isOptional() && !info.hasDefaultValue()) {
-			throw new RuntimeException(info.getName() +
-				" not exist which is not optional and don't have a default value");
+		String value = null;
+		String usedParamName = null;
+		for (String nameOrAlias : getParamNameAndAlias(info)) {
+			String v = params.get(nameOrAlias);
+			if (value != null && v != null) {
+				throw new IllegalArgumentException(String.format("Duplicate parameters of %s and %s",

Review comment:
   `params.put` can use `ParamInfo.getName` to get the key, so it will not be able to generate duplicates. There is no information about paramName and alias at `putAll` from json. Probably `params.get` is the proper place to check duplicates. I agree with you that it should check duplicates in `ParamInfoFactory.setAlias`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
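A sketch of the alternative raised here — validating aliases once at construction time. `ParamInfoBuilder` below is a hypothetical stand-in for `ParamInfoFactory`'s builder, not the actual Flink API:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical excerpt of a ParamInfo builder that rejects bad aliases eagerly,
// so get() never has to disambiguate at lookup time.
class ParamInfoBuilder {
	private final String name;
	private String[] alias = new String[0];

	ParamInfoBuilder(String name) {
		this.name = name;
	}

	ParamInfoBuilder setAlias(String[] alias) {
		Set<String> seen = new HashSet<>(Arrays.asList(alias));
		if (seen.size() != alias.length || seen.contains(name)) {
			throw new IllegalArgumentException(
				"Aliases must be distinct from each other and from the parameter name: " + name);
		}
		this.alias = alias.clone();
		return this;
	}

	public static void main(String[] args) {
		new ParamInfoBuilder("numIterations").setAlias(new String[]{"maxIter"});       // ok
		new ParamInfoBuilder("numIterations").setAlias(new String[]{"numIterations"}); // throws
	}
}
```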
[GitHub] [flink] tillrohrmann closed pull request #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots
tillrohrmann closed pull request #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots URL: https://github.com/apache/flink/pull/7227 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Resolved] (FLINK-11059) JobMaster may continue using an invalid slot if releasing idle slot meet a timeout
[ https://issues.apache.org/jira/browse/FLINK-11059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann resolved FLINK-11059.
-----------------------------------
    Resolution: Fixed

Fixed via
1.9.0: a4865cd67239c938e0075bcc2b112b944cc3c249
1.8.1: 5ef79de88726ddfccee30cb34a22711ca2b1116f
1.7.3: 65b6f99a69dbe65fc2dfaa97d0662f76909ef371

> JobMaster may continue using an invalid slot if releasing idle slot meet a timeout
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-11059
>                 URL: https://issues.apache.org/jira/browse/FLINK-11059
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.7.0
>            Reporter: shuai.xu
>            Assignee: shuai.xu
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.3, 1.9.0, 1.8.1
>
> When the job master releases an idle slot to the task executor, it may meet a timeout exception. In that case the task executor may have already released the slot, but the job master will add the slot back to the available slots, and the slot may be used again. The job master will then continue deploying tasks to the slot, but the task executor does not recognize them.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] [flink] sunjincheng121 commented on issue #8623: [FLINK-12719][python] Add the Python catalog API
sunjincheng121 commented on issue #8623: [FLINK-12719][python] Add the Python catalog API
URL: https://github.com/apache/flink/pull/8623#issuecomment-502091680

+1 to merge 💯

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r293765451

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala

## @@ -88,11 +89,17 @@ class StreamExecTableSourceScan(
     replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
   }
 
+  def getSourceTransformation(
+      streamEnv: StreamExecutionEnvironment): StreamTransformation[_] = {
+    tableSource.asInstanceOf[StreamTableSource[_]].getDataStream(streamEnv).getTransformation

Review comment:
   We should cache the source transformation in this node to avoid calling `getDataStream` multiple times, which would lead to multiple source transformations in the JobGraph. And make sure to call this `getSourceTransformation` instead of `getDataStream` in this class.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
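The caching suggestion boils down to memoizing an expensive factory call so it runs at most once. A generic, self-contained illustration in Java (the Flink class under review is Scala, and `Memoized` is not part of Flink):

```java
import java.util.function.Supplier;

// Memoizing supplier: the factory (standing in for getDataStream) runs at most once,
// so repeated lookups cannot create duplicate source transformations.
public final class Memoized<T> implements Supplier<T> {
	private final Supplier<T> factory;
	private T value;

	public Memoized(Supplier<T> factory) {
		this.factory = factory;
	}

	@Override
	public synchronized T get() {
		if (value == null) {
			value = factory.get();
		}
		return value;
	}

	public static void main(String[] args) {
		Memoized<String> source = new Memoized<>(() -> {
			System.out.println("creating transformation (should print exactly once)");
			return "transformation";
		});
		source.get();
		source.get();
	}
}
```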
[GitHub] [flink] wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r293751698

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala

## @@ -121,6 +122,19 @@ class StreamExecSink[T](
         "implemented and return the sink transformation DataStreamSink. " +
           s"However, ${sink.getClass.getCanonicalName} doesn't implement this method.")
     }
+    val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
+      tableEnv.getConfig.getConf)
+
+    val maxSinkParallelism = dsSink.getTransformation.getMaxParallelism
+
+    if (maxSinkParallelism > 0 && configSinkParallelism <= maxSinkParallelism) {

Review comment:
   The same problem as `BatchExecSink`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r293773998

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCalc.scala

## @@ -116,11 +116,16 @@ class StreamExecCalc(
       retainHeader = true,
       "StreamExecCalc"
     )
-    new OneInputTransformation(
+    val ret = new OneInputTransformation(
       inputTransform,
       RelExplainUtil.calcToString(calcProgram, getExpressionString),
       substituteStreamOperator,
       BaseRowTypeInfo.of(outputType),
-      inputTransform.getParallelism)
+      getResource.getParallelism)
+
+    if (getResource.getMaxParallelism > 0) {
+      ret.setMaxParallelism(getResource.getMaxParallelism)

Review comment:
   Do we need to set the max parallelism for calc? For example, with a non-parallel source followed by a calc, I think the calc can still scale out.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r293752274

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala

## @@ -121,6 +122,19 @@ class StreamExecSink[T](
         "implemented and return the sink transformation DataStreamSink. " +
           s"However, ${sink.getClass.getCanonicalName} doesn't implement this method.")
     }
+    val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
+      tableEnv.getConfig.getConf)
+
+    val maxSinkParallelism = dsSink.getTransformation.getMaxParallelism
+
+    if (maxSinkParallelism > 0 && configSinkParallelism <= maxSinkParallelism) {
+      dsSink.getTransformation.setParallelism(configSinkParallelism)
+    }
+    if (!UpdatingPlanChecker.isAppendOnly(this) &&
+        dsSink.getTransformation.getParallelism != transformation.getParallelism) {
+      throw new TableException("Sink parallelism should be equal to input node when it is not" +

Review comment:
   Improve the exception message: "The configured sink parallelism should be equal to the input node when the input is an update stream. The input parallelism is ${...}, however the configured sink parallelism is ${...}"

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r293786650

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/parallelism/FinalParallelismSetter.java

## @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.parallelism;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTableSourceScan;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Values;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Set final parallelism if needed at the beginning time, if parallelism of a node is set to be final,
+ * it will not be changed by other parallelism calculator.
+ */
+public class FinalParallelismSetter {
+
+	private final StreamExecutionEnvironment env;
+	private Set<ExecNode<?, ?>> calculatedNodeSet = new HashSet<>();
+	private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = new HashMap<>();
+
+	private FinalParallelismSetter(StreamExecutionEnvironment env) {
+		this.env = env;
+	}
+
+	/**
+	 * Finding nodes that need to set final parallelism.
+	 */
+	public static Map<ExecNode<?, ?>, Integer> calculate(StreamExecutionEnvironment env, List<ExecNode<?, ?>> sinkNodes) {
+		FinalParallelismSetter setter = new FinalParallelismSetter(env);
+		sinkNodes.forEach(setter::calculate);
+		return setter.finalParallelismNodeMap;
+	}
+
+	private void calculate(ExecNode<?, ?> execNode) {
+		if (!calculatedNodeSet.add(execNode)) {
+			return;
+		}
+		if (execNode instanceof BatchExecTableSourceScan) {
+			calculateTableSource((BatchExecTableSourceScan) execNode);
+		} else if (execNode instanceof StreamExecTableSourceScan) {
+			calculateTableSource((StreamExecTableSourceScan) execNode);
+		} else if (execNode instanceof BatchExecBoundedStreamScan) {
+			calculateBoundedStreamScan((BatchExecBoundedStreamScan) execNode);
+		} else if (execNode instanceof StreamExecDataStreamScan) {
+			calculateDataStreamScan((StreamExecDataStreamScan) execNode);
+		} else if (execNode instanceof Values) {
+			calculateValues(execNode);
+		} else {
+			calculateIfSingleton(execNode);
+		}
+	}
+
+	private void calculateTableSource(BatchExecTableSourceScan tableSourceScan) {
+		StreamTransformation<?> transformation = tableSourceScan.getSourceTransformation(env);
+		if (transformation.getMaxParallelism() > 0) {
+			tableSourceScan.getResource().setMaxParallelism(transformation.getMaxParallelism());

Review comment:
   It seems that the max parallelism of the source transformation is not used to avoid setting an invalid parallelism, like what we do for the sink?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r293750904

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala

## @@ -92,17 +93,15 @@ class BatchExecSink[T](
     }
     val sinkTransformation = dsSink.getTransformation
-    if (sinkTransformation.getMaxParallelism > 0) {
-      sinkTransformation.setParallelism(sinkTransformation.getMaxParallelism)
-    } else {
-      val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
-        tableEnv.getConfig.getConf)
-      if (configSinkParallelism > 0) {
-        sinkTransformation.setParallelism(configSinkParallelism)
-      } else if (boundedStream.getParallelism > 0) {
-        sinkTransformation.setParallelism(boundedStream.getParallelism)
-      }
+    val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
+      tableEnv.getConfig.getConf)
+
+    val maxSinkParallelism = sinkTransformation.getMaxParallelism
+
+    if (maxSinkParallelism > 0 && configSinkParallelism <= maxSinkParallelism) {

Review comment:
   The sink parallelism only works when the max parallelism is set? And if the `configSinkParallelism` is -1, the sink parallelism will be set to -1?
   
   Maybe the logic should be changed to this:
   ```java
   // only set user's parallelism when user defines a sink parallelism
   if (configSinkParallelism > 0) {
     // set the parallelism when user's parallelism is not larger than max parallelism or max parallelism is not set
     if (maxSinkParallelism < 0 || configSinkParallelism <= maxSinkParallelism) {
       sinkTransformation.setParallelism(configSinkParallelism)
     }
   }
   ```
   And we should add cases to cover this.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] HuangXingBo commented on issue #8732: [FLINK-12720][python] Add the Python Table API Sphinx docs
HuangXingBo commented on issue #8732: [FLINK-12720][python] Add the Python Table API Sphinx docs
URL: https://github.com/apache/flink/pull/8732#issuecomment-502090131

Thanks a lot, @dianfu @sunjincheng121.
1. I added logic to lint-python.sh that installs Sphinx and checks that it is available.
2. I added a generate-docs.sh that is responsible for calling the script lint-python.sh to generate the Python API docs and copying them to the specified target path.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8731#discussion_r293741881

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

## @@ -225,6 +226,21 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
 		}
 	}
 
+	/**
+	 * Starting from the second operator, go forward through the operator chain to notify
+	 * each operator that its input has ended.
+	 *
+	 * @throws Exception if some exception happens in the endInput function of an operator.
+	 */
+	public void endOperatorInputs() throws Exception {
+		for (int i = allOperators.length - 2; i >= 0; i--) {

Review comment:
   > I think you could simplify the code if you iterated through all operators in OperatorChain (not starting from the second one), check each one of them if there are instanceof BoundedOneInput and end them if they are.
   >
   > This would work for:
   >
   > - OneInputStreamTask - because all of them are/can be BoundedOneInput
   > - SourceStreamTask - because head will not be BoundedOneInput
   > - TwoInputStreamTask - because head will not be BoundedOneInput
   
   You are right. However, if chaining a non-head two-input operator is allowed in the future, this will have problems. What do you think?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
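A self-contained model of the reviewer's simplification: iterate over all chained operators and end every one that implements `BoundedOneInput`. The interface is redeclared locally here only to make the example runnable; it stands in for Flink's own, and the heads of source and two-input tasks are skipped simply because they don't implement it.

```java
import java.util.Arrays;
import java.util.List;

interface BoundedOneInput {
	void endInput() throws Exception;
}

public class ChainModel {
	public static void main(String[] args) throws Exception {
		// Mirrors OperatorChain#allOperators, ordered from the chain's tail to its head.
		List<Object> allOperators = Arrays.asList(
			(BoundedOneInput) () -> System.out.println("tail operator: endInput"),
			(BoundedOneInput) () -> System.out.println("middle operator: endInput"),
			new Object() /* head of a TwoInputStreamTask: not BoundedOneInput, skipped */);

		for (Object op : allOperators) {
			if (op instanceof BoundedOneInput) {
				((BoundedOneInput) op).endInput();
			}
		}
	}
}
```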
[jira] [Commented] (FLINK-12776) Ambiguous content in flink-dist NOTICE file
[ https://issues.apache.org/jira/browse/FLINK-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863997#comment-16863997 ]

sunjincheng commented on FLINK-12776:
-------------------------------------

Thanks for the reply [~Zentol]! I want to share the good news here that we can remove the old `flink-python`, then we do not need to do any change. :)

Please see the DISCUSS thread: [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Deprecate-previous-Python-APIs-td29483.html]

> Ambiguous content in flink-dist NOTICE file
> -------------------------------------------
>
>                 Key: FLINK-12776
>                 URL: https://issues.apache.org/jira/browse/FLINK-12776
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python, Release System
>    Affects Versions: 1.9.0
>            Reporter: Chesnay Schepler
>            Priority: Blocker
>             Fix For: 1.9.0
>
>         Attachments: image-2019-06-10-09-39-06-637.png
>
> With FLINK-12409 we include the new flink-python module in flink-dist. As a result we now have 2 {{flink-python}} entries in the flink-dist NOTICE file, one for the old batch API and one for the newly added one, which is ambiguous. We should rectify this by either excluding the old batch API from flink-dist, or renaming the new module to something like {{flink-api-python}}.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] [flink] aljoscha commented on a change in pull request #8642: [FLINK-1722][datastream] Enable the InitializeOnMaster and FinalizeOnMaster interfaces on datastream
aljoscha commented on a change in pull request #8642: [FLINK-1722][datastream] Enable the InitializeOnMaster and FinalizeOnMaster interfaces on datastream
URL: https://github.com/apache/flink/pull/8642#discussion_r293771876

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java

## @@ -47,6 +48,10 @@
 		} else if (operator instanceof StreamSource &&
 				((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) {
 			return new SimpleInputFormatOperatorFactory((StreamSource) operator);
+		} else if (operator instanceof StreamSink &&
+				((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {
+			//noinspection unchecked

Review comment:
   I think `noinspection` is only for IntelliJ; for properly suppressing warnings here we have to use `@SuppressWarnings("unchecked")`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
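A standalone illustration of the difference (plain Java, unrelated to Flink's classes): `@SuppressWarnings` is honored by javac, while `//noinspection` is an IntelliJ-only hint.

```java
import java.util.ArrayList;
import java.util.List;

public class SuppressionDemo {
	public static void main(String[] args) {
		List rawList = new ArrayList();
		rawList.add("element");

		// Honored by javac: no unchecked warning is emitted for this cast.
		@SuppressWarnings("unchecked")
		List<String> typed = (List<String>) rawList;

		// IntelliJ-only hint; javac still emits an unchecked warning below.
		//noinspection unchecked
		List<String> typed2 = (List<String>) rawList;

		System.out.println(typed.get(0) + " " + typed2.get(0));
	}
}
```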