[GitHub] [flink] HuangZhenQiu commented on issue #8303: [FLINK-12343]add file replication config for yarn configuration
HuangZhenQiu commented on issue #8303: [FLINK-12343]add file replication config for yarn configuration URL: https://github.com/apache/flink/pull/8303#issuecomment-496370553 @rmetzger @tillrohrmann I was blocked by setting up the secured MiniDFSCluster in integration tests, so it took a little more time to finish the PR. Please review it at your convenience. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
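For context, FLINK-12343 is about making the replication factor of files Flink uploads for a YARN application configurable. Below is a rough sketch of what such an option could look like with Flink's ConfigOptions builder; the key name, default value, and description are assumptions for illustration, not necessarily what the PR adds.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical option: key, default value, and wording are assumptions for illustration only.
public final class YarnReplicationOptionSketch {

    public static final ConfigOption<Integer> FILE_REPLICATION =
            ConfigOptions.key("yarn.file-replication")
                    .defaultValue(-1)
                    .withDescription(
                            "Replication factor for files uploaded by Flink to HDFS for a YARN application; "
                                    + "-1 means the file system's default replication is used.");
}
```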
[GitHub] [flink] flinkbot commented on issue #8556: [FLINK-12171][Network] Do not limit the network buffer memory by heap size on the TM side
flinkbot commented on issue #8556: [FLINK-12171][Network] Do not limit the network buffer memory by heap size on the TM side URL: https://github.com/apache/flink/pull/8556#issuecomment-496368873 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12171) The network buffer memory size should not be checked against the heap size on the TM side
[ https://issues.apache.org/jira/browse/FLINK-12171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12171:
Labels: pull-request-available (was: )

> The network buffer memory size should not be checked against the heap size on the TM side
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-12171
> URL: https://issues.apache.org/jira/browse/FLINK-12171
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.7.2, 1.8.0
> Environment: Flink 1.7.2; Flink 1.8 does not seem to have modified the logic here.
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Major
> Labels: pull-request-available
>
> Currently, when computing the network buffer memory size on the TM side in _TaskManagerService#calculateNetworkBufferMemory_ (version 1.8 or 1.7) or _NetworkEnvironmentConfiguration#calculateNewNetworkBufferMemory_ (master), the computed network buffer memory size is checked to be less than `maxJvmHeapMemory`. However, on the TM side, _maxJvmHeapMemory_ stores the maximum heap memory (namely -Xmx).
>
> With the above process, when the TM starts, -Xmx is computed in the RM or in _taskmanager.sh_ as (container memory - network buffer memory - managed memory), so the above check implies that the heap memory of the TM must be larger than the network memory, which is not necessary.
>
> Therefore, I think the network buffer memory size should instead be checked against the total memory on the TM side:
> # Check that networkBufFraction < 1.0.
> # Compute the total memory as (jvmHeapNoNet / (1 - networkBufFraction)).
> # Compare the network buffer memory with the total memory.
> This check is also consistent with the similar one done on the RM side.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
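For illustration, here is a minimal Java sketch of the three-step check proposed in the issue description above. The method and variable names (checkNetworkBufferSize, jvmHeapNoNet, networkBufFraction, networkBufMin, networkBufMax) are assumptions for this example and do not correspond to Flink's actual API.

{code}
// Illustrative sketch only: names are assumptions, not Flink's actual code.
public final class NetworkBufferCheckSketch {

    static long checkNetworkBufferSize(
            long jvmHeapNoNet, float networkBufFraction, long networkBufMin, long networkBufMax) {
        // 1. The configured fraction must leave room for non-network memory.
        if (networkBufFraction >= 1.0f) {
            throw new IllegalArgumentException("Network buffer fraction must be < 1.0");
        }
        // 2. Derive the total memory from the heap size that excludes network memory.
        long totalMemory = (long) (jvmHeapNoNet / (1.0 - networkBufFraction));
        // 3. Clamp the network buffer size to [min, max] and compare it with the total memory,
        //    not with the heap size.
        long networkBufSize =
                Math.min(networkBufMax, Math.max(networkBufMin, (long) (totalMemory * networkBufFraction)));
        if (networkBufSize >= totalMemory) {
            throw new IllegalArgumentException(
                    "The configured network buffer memory exceeds the derived total memory");
        }
        return networkBufSize;
    }
}
{code}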
[GitHub] [flink] gaoyunhaii opened a new pull request #8556: [FLINK-12171][Network] Do not limit the network buffer memory by heap size on the TM side
gaoyunhaii opened a new pull request #8556: [FLINK-12171][Network] Do not limit the network buffer memory by heap size on the TM side URL: https://github.com/apache/flink/pull/8556

## What is the purpose of the change

This pull request fixes the bug that limits the network buffer size by the heap size on the TM side. In fact, the network buffer memory occupies a part of direct memory and is independent of the heap. To fix this problem, the limitation on the TM side is removed. Although we may want to compare the network memory size with the total memory size on the TM side, currently we can only compute the total memory as heap + computed network memory, and this computed total memory is always larger than the computed network memory. To remove the limitation, the maximum allowed memory used to check the network memory size on the TM side is changed to Long.MAX_VALUE. Another option is to move the check to the caller function on the RM side; however, that is not easy to achieve since the check relies on the configured values of MIN and MAX, which are not accessible outside of the current function.

## Brief change log

- *Change the maximum allowed memory on the TM side to Long.MAX_VALUE.*

## Verifying this change

This change added tests and can be verified as follows:

- *Manually verified the change by running a cluster with two task managers in both standalone and YARN mode, and testing the configurations heap = 3G/network = 2G and heap = 5G/network = 2G.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leesf commented on a change in pull request #8543: [FLINK-12101] Race condition when concurrently running uploaded jars via REST
leesf commented on a change in pull request #8543: [FLINK-12101] Race condition when concurrently running uploaded jars via REST URL: https://github.com/apache/flink/pull/8543#discussion_r287931889 ## File path: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ## @@ -1061,8 +1064,12 @@ private static String getDefaultName() { * @return The execution environment of the context in which the program is executed. */ public static ExecutionEnvironment getExecutionEnvironment() { + return contextEnvironmentFactory == null ? - createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment(); + (contextEnvironmentFactoryThreadLocal.get() == null ? Review comment: Thanks for your review @klion26. Good catch and updated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
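The diff above switches the environment lookup from a single static factory field to a per-thread one. Below is a minimal sketch of that pattern; the holder class is hypothetical (in Flink the fields live in ExecutionEnvironment itself), but it shows why a ThreadLocal keeps concurrently submitted jobs from overwriting each other's context factory.

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;

// Hypothetical holder illustrating the per-thread factory lookup from the diff above.
public final class ContextEnvironmentHolder {

    // Shared fallback factory (the original static field).
    private static ExecutionEnvironmentFactory contextEnvironmentFactory;

    // Per-thread override so jobs submitted concurrently through REST do not see each other's context.
    private static final ThreadLocal<ExecutionEnvironmentFactory> contextEnvironmentFactoryThreadLocal =
            new ThreadLocal<>();

    public static ExecutionEnvironment getExecutionEnvironment() {
        // Prefer the factory registered for the current thread, then the shared one,
        // and fall back to a local environment if neither is set.
        ExecutionEnvironmentFactory factory = contextEnvironmentFactoryThreadLocal.get();
        if (factory == null) {
            factory = contextEnvironmentFactory;
        }
        return factory == null
                ? ExecutionEnvironment.createLocalEnvironment()
                : factory.createExecutionEnvironment();
    }
}
```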
[GitHub] [flink] liyafan82 commented on issue #8511: [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive
liyafan82 commented on issue #8511: [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive URL: https://github.com/apache/flink/pull/8511#issuecomment-496361993 Hi @tillrohrmann , would you please take a look at this issue? I think it deserves to be fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file
xuefuz commented on a change in pull request #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file URL: https://github.com/apache/flink/pull/8541#discussion_r287926431 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/CatalogFactory.java ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.Catalog; + +import java.util.Map; + +/** + * A factory to create configured catalog instances based on string-based properties. See + * also {@link TableFactory} for more information. + */ +@PublicEvolving +public interface CatalogFactory extends TableFactory { Review comment: It's a little weird that CatalogFactory extends from TableFactory. Any thoughts? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12171) The network buffer memory size should not be checked against the heap size on the TM side
[ https://issues.apache.org/jira/browse/FLINK-12171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849304#comment-16849304 ] Yun Gao commented on FLINK-12171:
After analyzing this problem further, I now think we do not need to check the maximum allowed memory on the TM side. On the RM side, we compute the network memory size from the total memory size, and there may be cases where the configured MIN and MAX are so large that the resulting network memory is larger than the total memory size, so we need to check against that. However, on the TM side, we do not know the total memory size; we only know the heap size. We can only deduce the total memory size as heap size + computed network memory, which is always larger than the computed network memory. Therefore, unless we ensure the total memory size is available on the TM side and also compute the network memory size from the total memory size there, we cannot check the network memory size. Based on the above analysis, I think we can simply remove the comparison between the network memory size and the heap memory size. This comparison is not correct, since the network memory is not part of the heap memory, and it may raise an error even when the configuration is in fact reasonable.

> The network buffer memory size should not be checked against the heap size on the TM side
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-12171
> URL: https://issues.apache.org/jira/browse/FLINK-12171
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.7.2, 1.8.0
> Environment: Flink 1.7.2; Flink 1.8 does not seem to have modified the logic here.
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Major
>
> Currently, when computing the network buffer memory size on the TM side in _TaskManagerService#calculateNetworkBufferMemory_ (version 1.8 or 1.7) or _NetworkEnvironmentConfiguration#calculateNewNetworkBufferMemory_ (master), the computed network buffer memory size is checked to be less than `maxJvmHeapMemory`. However, on the TM side, _maxJvmHeapMemory_ stores the maximum heap memory (namely -Xmx).
>
> With the above process, when the TM starts, -Xmx is computed in the RM or in _taskmanager.sh_ as (container memory - network buffer memory - managed memory), so the above check implies that the heap memory of the TM must be larger than the network memory, which is not necessary.
>
> Therefore, I think the network buffer memory size should instead be checked against the total memory on the TM side:
> # Check that networkBufFraction < 1.0.
> # Compute the total memory as (jvmHeapNoNet / (1 - networkBufFraction)).
> # Compare the network buffer memory with the total memory.
> This check is also consistent with the similar one done on the RM side.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] lirui-apache commented on a change in pull request #8522: [FLINK-12572][hive]Implement TableSource and InputFormat to read Hive tables
lirui-apache commented on a change in pull request #8522: [FLINK-12572][hive]Implement TableSource and InputFormat to read Hive tables URL: https://github.com/apache/flink/pull/8522#discussion_r287919214 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveRecordSerDe.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.table.dataformat.DataFormatConverters; +import org.apache.flink.table.type.DecimalType; + +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; + +import java.math.BigDecimal; +import java.sql.Timestamp; + +/** + * Class used to serialize to and from raw hdfs file type. + * Highly inspired by HCatRecordSerDe (almost copied from this class)in hive-catalog-core. + */ +public class HiveRecordSerDe { + + /** +* Return underlying Java Object from an object-representation +* that is readable by a provided ObjectInspector. +*/ + public static Object serializeField(Object field, ObjectInspector fieldObjectInspector) Review comment: I think some methods need better naming. Usually *serialize* is to convert an object to a byte[]. It seems what the method does here is to convert a hive object to flink object? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file
xuefuz commented on a change in pull request #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file URL: https://github.com/apache/flink/pull/8541#discussion_r287918350 ## File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java ## @@ -70,4 +79,24 @@ public void testMerging() throws Exception { assertTrue(merged.getExecution().isStreamingExecution()); assertEquals(16, merged.getExecution().getMaxParallelism()); } + + @Test + public void testDuplicateCatalog() { + exception.expect(SqlClientException.class); + exception.expectMessage("Cannot create catalog 'catalog2' because a catalog with this name is already registered."); + Environment env = new Environment(); + env.setCatalogs(Arrays.asList( + getCatalog("catalog1", "test"), + getCatalog("catalog2", "test"), + getCatalog("catalog2", "test"))); + } + + private static Map getCatalog(String name, String type) { Review comment: rename to createCatalog or instantiateCatalog? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] godfreyhe commented on a change in pull request #8520: [FLINK-12600] [table-planner-blink] Introduce planner rules to do deterministic rewriting on RelNode
godfreyhe commented on a change in pull request #8520: [FLINK-12600] [table-planner-blink] Introduce planner rules to do deterministic rewriting on RelNode URL: https://github.com/apache/flink/pull/8520#discussion_r287917681 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkRewriteSubQueryRule.scala ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{any, operandJ} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.{Aggregate, Filter, RelFactories} +import org.apache.calcite.rex.{RexShuttle, _} +import org.apache.calcite.sql.SqlKind +import org.apache.calcite.sql.`type`.SqlTypeFamily +import org.apache.calcite.sql.fun.SqlCountAggFunction +import org.apache.calcite.tools.RelBuilderFactory + +import scala.collection.JavaConversions._ + +/** + * Planner rule that rewrites filter condition like: + * `(select count(*) from T) > 0` to `exists(select * from T)`, Review comment: The estimation for SEMI/ANTI join is very inaccurate,so we intend to do deterministic rewriting on SEMI/ANTI join. And we can put this rule to CBO after we improve estimation of SEMI/ANTI join. yes, we can do similarly rewriting for `exists(select * from T limit 1)` later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client URL: https://github.com/apache/flink/pull/8484#discussion_r287917746 ## File path: flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java ## @@ -102,4 +102,20 @@ public static final ConfigOption OFFLOAD_MINSIZE = key("blob.offload.minsize") .defaultValue(1_024 * 1_024) // 1MiB by default .withDescription("The minimum size for messages to be offloaded to the BlobServer."); + + /** +* The socket timeout in milliseconds for the blob client. +*/ + public static final ConfigOption SO_TIMEOUT = + key("blob.client.socket.timeout") + .defaultValue(120_000) + .withDescription("The socket timeout in milliseconds for the blob client."); + + /** +* The connection timeout in milliseconds for the blob client. +*/ + public static final ConfigOption CONNECT_TIMEOUT = + key("blob.client.connect.timeout") + .defaultValue(120_000) Review comment: Please look at my above comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunhaibotb commented on issue #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
sunhaibotb commented on issue #8484: [FLINK-12547] Add connection and socket timeouts for the blob client URL: https://github.com/apache/flink/pull/8484#issuecomment-496350510 Thanks for the reviews, @tillrohrmann. I have updated the code; please review it again. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client URL: https://github.com/apache/flink/pull/8484#discussion_r287915194 ## File path: flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java ## @@ -102,4 +102,20 @@ public static final ConfigOption OFFLOAD_MINSIZE = key("blob.offload.minsize") .defaultValue(1_024 * 1_024) // 1MiB by default .withDescription("The minimum size for messages to be offloaded to the BlobServer."); + + /** +* The socket timeout in milliseconds for the blob client. +*/ + public static final ConfigOption SO_TIMEOUT = + key("blob.client.socket.timeout") + .defaultValue(120_000) Review comment: Check the option `taskmanager.network.netty.client.connectTimeoutSec`, whose default value is also `120s`. Shall we keep the connection/socket timeouts consistent with it? @tillrohrmann This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
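For reference, the two new options would typically be applied when the blob client opens its socket. The sketch below uses plain java.net.Socket calls and is not the actual BlobClient code; only the option keys mirror the diff above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Sketch of how connect/socket timeouts are applied on a client socket; the values would come
// from the new BlobServerOptions entries (blob.client.connect.timeout / blob.client.socket.timeout).
public final class BlobSocketSketch {

    public static Socket openSocket(
            InetSocketAddress serverAddress, int connectTimeoutMillis, int soTimeoutMillis) throws IOException {
        Socket socket = new Socket();
        // Fail the connection attempt if the server cannot be reached within the connect timeout.
        socket.connect(serverAddress, connectTimeoutMillis);
        // Fail blocking reads that stall longer than the socket (SO_TIMEOUT) timeout.
        socket.setSoTimeout(soTimeoutMillis);
        return socket;
    }
}
```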
[jira] [Created] (FLINK-12638) Expose max parallelism via the REST API and via the Web UI
Sean Bollin created FLINK-12638: --- Summary: Expose max parallelism via the REST API and via the Web UI Key: FLINK-12638 URL: https://issues.apache.org/jira/browse/FLINK-12638 Project: Flink Issue Type: Improvement Reporter: Sean Bollin Currently there is no way to view what max parallelism is set to. Let's add max parallelism to the REST API and the Web UI so you can see what it is set to. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] godfreyhe commented on a change in pull request #8520: [FLINK-12600] [table-planner-blink] Introduce planner rules to do deterministic rewriting on RelNode
godfreyhe commented on a change in pull request #8520: [FLINK-12600] [table-planner-blink] Introduce planner rules to do deterministic rewriting on RelNode URL: https://github.com/apache/flink/pull/8520#discussion_r287914800 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRules.scala ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{any, none, operand, some} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.{Join, JoinRelType, Values} + +object FlinkPruneEmptyRules { Review comment: `PruneEmptyRules` in Calcite contains more than one rules, I intend to keep calcite style for this rule. Maybe later, we need to copy other rules from `PruneEmptyRules` to this file. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] godfreyhe commented on a change in pull request #8527: [FLINK-12610] [table-planner-blink] Introduce planner rules about aggregate
godfreyhe commented on a change in pull request #8527: [FLINK-12610] [table-planner-blink] Introduce planner rules about aggregate URL: https://github.com/apache/flink/pull/8527#discussion_r287913290 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkAggregateJoinTransposeRule.java ## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical; + +import org.apache.flink.table.plan.util.AggregateUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.logical.LogicalSnapshot; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlSplittableAggFunction; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.Bug; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; +import org.apache.calcite.util.mapping.Mapping; +import org.apache.calcite.util.mapping.Mappings; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +/** + * This rule is copied from Calcite's {@link org.apache.calcite.rel.rules.AggregateJoinTransposeRule}. + * Modification: + * - Do not match TemporalTableScan since it means that it is a dimension table scan currently. 
Review comment: This rule is already in CBO, and CBO does not handle the unsupported case (a lookup table source doesn't support aggregates). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] godfreyhe commented on a change in pull request #8527: [FLINK-12610] [table-planner-blink] Introduce planner rules about aggregate
godfreyhe commented on a change in pull request #8527: [FLINK-12610] [table-planner-blink] Introduce planner rules about aggregate URL: https://github.com/apache/flink/pull/8527#discussion_r287912472 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/AggregateCalcMergeRule.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Calc; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.rules.AggregateProjectMergeRule; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.tools.RelBuilderFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Planner rule that recognizes a {@link org.apache.calcite.rel.core.Aggregate} + * on top of a {@link org.apache.calcite.rel.core.Calc} and if possible + * aggregate through the calc or removes the calc. + * + * This is only possible when no condition in calc and the grouping expressions and arguments to Review comment: `AggregateProjectMergeRule` already exists and is in our rule set. The original intention of introducing this rule is to solve the hack in `RelDecorrelator` for tpch query20, however current cost model can not find best plan, so this rule has not been add to our rule set. we can move this rule from this pr, and introduce it when needed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file
xuefuz commented on a change in pull request #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file URL: https://github.com/apache/flink/pull/8541#discussion_r287912450 ## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ## @@ -227,6 +241,12 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine } } + private Catalog createCatalog(String name, Map catalogProperties, ClassLoader classLoader) { + final CatalogFactory factory = + TableFactoryService.find(CatalogFactory.class, catalogProperties, classLoader); Review comment: Shall we rename the factory service, since here we are creating catalogs rather than tables? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces
sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces URL: https://github.com/apache/flink/pull/8476#discussion_r287910131 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Input.java ## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.AsyncDataInput; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; + +import java.io.Closeable; + +/** + * Basic interface for inputs of stream operators. + */ +@Internal +public interface Input extends AsyncDataInput, Closeable { Review comment: > @sunhaibotb, are you fine with renaming those renames? (and fyi, since renaming will affect a wip `StreamSelectableTwoInputStreamProcessor` PR. It's fine to me. @pnowojski @StefanRRichter This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler
zhuzhurk commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler URL: https://github.com/apache/flink/pull/8498#discussion_r287909365 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.Collections; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Result containing the tasks to restart upon a task failure. + * Also contains the reason if the failure is not recoverable(non-recoverable + * failure type or restarting suppressed by restart strategy). + */ +public class FailureHandlingResult { + + /** Task vertices to restart to recover from the failure. */ + private final Set verticesToRestart; + + /** Delay before the restarting can be conducted. */ + private final long restartDelayMS; + + /** Reason why the failure is not recoverable. */ + private final Throwable error; + + /** +* Creates a result of a set of tasks to restart to recover from the failure. +* +* @param verticesToRestart containing task vertices to restart to recover from the failure +* @param restartDelayMS indicate a delay before conducting the restart +*/ + private FailureHandlingResult(Set verticesToRestart, long restartDelayMS) { + checkState(restartDelayMS >= 0); + + this.verticesToRestart = Collections.unmodifiableSet(checkNotNull(verticesToRestart)); + this.restartDelayMS = restartDelayMS; + this.error = null; + } + + /** +* Creates a result that the failure is not recoverable and no restarting should be conducted. +* +* @param error reason why the failure is not recoverable +*/ + private FailureHandlingResult(Throwable error) { + this.verticesToRestart = null; + this.restartDelayMS = -1; + this.error = checkNotNull(error); + } + + /** +* Returns the tasks to restart. +* +* @return the tasks to restart +*/ + public Set getVerticesToRestart() { + if (canRestart()) { + return verticesToRestart; + } else { + throw new IllegalStateException("Cannot get vertices to restart when the restarting is suppressed."); + } + } + + /** +* Returns the delay before the restarting. +* +* @return the delay before the restarting +*/ + public long getRestartDelayMS() { + if (canRestart()) { + return restartDelayMS; + } else { + throw new IllegalStateException("Cannot get restart delay when the restarting is suppressed."); + } + } + + /** +* Returns whether the restarting can be conducted. 
+* +* @return whether the restarting can be conducted +*/ + public boolean canRestart() { + return error == null; + } + + /** +* Returns reason why the restarting cannot be conducted. +* +* @return reason why the restarting cannot be conducted +*/ + public Throwable getError() { + return error; Review comment: You are right. I will keep them aligned. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on issue #8553: [FLINK-12418][hive] Add input/output format and SerDeLib information when creating Hive table in HiveCatalog
xuefuz commented on issue #8553: [FLINK-12418][hive] Add input/output format and SerDeLib information when creating Hive table in HiveCatalog URL: https://github.com/apache/flink/pull/8553#issuecomment-496340490 LGTM. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tweise commented on issue #8535: [FLINK-11693] Add KafkaSerializationSchema that uses ProducerRecord
tweise commented on issue #8535: [FLINK-11693] Add KafkaSerializationSchema that uses ProducerRecord URL: https://github.com/apache/flink/pull/8535#issuecomment-496340207 @aljoscha Glad to see this change; we may be interested in using the headers with 0.11. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
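A rough sketch of how a serialization schema producing a ProducerRecord with headers might look is below. The interface shape (generic parameters, serialize signature) is assumed from the PR title and description rather than taken from the final merged API, so treat it as illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

// Hypothetical schema: emits each String element to a fixed topic and attaches a custom header.
public class StringWithHeaderSchema implements KafkaSerializationSchema<String> {

    private static final String TOPIC = "example-topic"; // assumed topic name

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        byte[] value = element.getBytes(StandardCharsets.UTF_8);
        // ProducerRecord with explicit headers (supported by the Kafka 0.11+ record format).
        return new ProducerRecord<>(
                TOPIC,
                null,  // let the producer pick the partition
                null,  // no key
                value,
                Collections.singletonList(
                        new RecordHeader("source", "flink".getBytes(StandardCharsets.UTF_8))));
    }
}
```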
[GitHub] [flink] zhuzhurk commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler
zhuzhurk commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler URL: https://github.com/apache/flink/pull/8498#discussion_r287909399 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link FailureHandlingResult}. + */ +public class FailureHandlingResultTest extends TestLogger { + + /** +* Tests normal FailureHandlingResult. +*/ + @Test + public void testNormalFailureHandlingResult() throws Exception { + // create a normal FailureHandlingResult + Set tasks = new HashSet<>(); Review comment: My fault. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler
zhuzhurk commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler URL: https://github.com/apache/flink/pull/8498#discussion_r287908868 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.throwable.ThrowableClassifier; +import org.apache.flink.runtime.throwable.ThrowableType; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This handler deals with task failures to return a {@link FailureHandlingResult} which contains tasks + * to restart to recover from failures. + */ +public class ExecutionFailureHandler { + + /** Strategy to judge which tasks should be restarted. */ + private final FailoverStrategy failoverStrategy; + + /** Strategy to judge whether and when a restarting should be done. */ + private final RestartBackoffTimeStrategy restartBackoffTimeStrategy; + + /** +* Creates the handler to deal with task failures. +* +* @param failoverStrategy helps to decide tasks to restart on task failures +* @param restartBackoffTimeStrategy helps to decide whether to restart failed tasks and the restarting delay +*/ + public ExecutionFailureHandler( + FailoverStrategy failoverStrategy, + RestartBackoffTimeStrategy restartBackoffTimeStrategy) { + + this.failoverStrategy = checkNotNull(failoverStrategy); + this.restartBackoffTimeStrategy = checkNotNull(restartBackoffTimeStrategy); + } + + /** +* Return result of failure handling. Can be a set of task vertices to restart +* and a delay of the restarting. Or that the failure is not recoverable and the reason for it. +* +* @param failedTask is the ID of the failed task vertex +* @param cause of the task failure +* @return result of the failure handling +*/ + public FailureHandlingResult getFailureHandlingResult(ExecutionVertexID failedTask, Throwable cause) { + if (ThrowableClassifier.getThrowableType(cause) == ThrowableType.NonRecoverableError) { Review comment: Yes the design doc proposed to check the error type in the restart strategy. But that requires every strategy implementation to do the same check. Therefore I think it's better we let the restart strategy do it's dedicated check only. Besides, I'm also thinking that whether we can have a composed restart strategy, which can do multiple checks for failing max limit, failing rate, etc. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
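As a rough illustration of the "composed restart strategy" idea floated in the comment above, here is a minimal, hypothetical Java sketch. The interface shape (canRestart/getBackoffTime/notifyFailure) is assumed from this PR's RestartBackoffTimeStrategy and the composed class name is invented; this is not the implementation that was merged.
{code:java}
import java.util.Arrays;
import java.util.List;

// Assumed shape of the strategy interface in this PR; the real signatures may differ.
interface RestartBackoffTimeStrategy {
    boolean canRestart();
    long getBackoffTime();
    void notifyFailure(Throwable cause);
}

// Hypothetical composed strategy: restart only if every delegate check
// (max failure count, failure rate, ...) agrees, and back off as long as
// the most conservative delegate requests.
class ComposedRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {

    private final List<RestartBackoffTimeStrategy> delegates;

    ComposedRestartBackoffTimeStrategy(RestartBackoffTimeStrategy... delegates) {
        this.delegates = Arrays.asList(delegates);
    }

    @Override
    public boolean canRestart() {
        // restart only if all composed checks allow it
        return delegates.stream().allMatch(RestartBackoffTimeStrategy::canRestart);
    }

    @Override
    public long getBackoffTime() {
        // wait for the longest delay requested by any delegate
        return delegates.stream()
            .mapToLong(RestartBackoffTimeStrategy::getBackoffTime)
            .max()
            .orElse(0L);
    }

    @Override
    public void notifyFailure(Throwable cause) {
        delegates.forEach(d -> d.notifyFailure(cause));
    }
}
{code}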
[GitHub] [flink] Xpray commented on issue #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type
Xpray commented on issue #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type URL: https://github.com/apache/flink/pull/8346#issuecomment-496338752 @GJL please have a look at this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Comment Edited] (FLINK-12620) Deadlock in task deserialization
[ https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849259#comment-16849259 ] Mike Kaplinskiy edited comment on FLINK-12620 at 5/28/19 12:38 AM: --- Sure, here's an example deadlock that I see, attached. [^jstack_snippet.txt] Somewhere between those 2 threads is a class initialization deadlock. My hacky fix that I tried locally looks like this: {code} diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 644289133b..dc722c1db4 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -492,7 +492,7 @@ public final class InstantiationUtil { } } - public static T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException { + public static synchronized T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException { byte[] bytes = config.getBytes(key, null); if (bytes == null) { return null; {code} That said, I'm not sure that it's the proper fix. was (Author: mikekap): Sure, here's an example deadlock that I see, attached. [^jstack_snippet.txt] Somewhere between those 3 threads is a class initialization deadlock. My hacky fix that I tried locally looks like this: {code} diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 644289133b..dc722c1db4 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -492,7 +492,7 @@ public final class InstantiationUtil { } } - public static T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException { + public static synchronized T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException { byte[] bytes = config.getBytes(key, null); if (bytes == null) { return null; {code} That said, I'm not sure that it's the proper fix. > Deadlock in task deserialization > > > Key: FLINK-12620 > URL: https://issues.apache.org/jira/browse/FLINK-12620 > Project: Flink > Issue Type: Bug >Affects Versions: 1.8.0 >Reporter: Mike Kaplinskiy >Priority: Major > Attachments: jstack_snippet.txt > > > When running a batch job, I ran into an issue where task deserialization > caused a deadlock. Specifically, if you have a static initialization > dependency graph that looks like this (these are all classes): > {code:java} > Task1 depends on A > A depends on B > B depends on C > C depends on B [cycle] > Task2 depends on C{code} > What seems to happen is a deadlock. Specifically, threads are started on the > task managers that simultaneously call BatchTask.instantiateUserCode on both > Task1 and Task2. This starts deserializing the classes and initializing them. > Here's the deadlock scenario, as a stack: > {code:java} > Time> > T1: [deserialize] -> Task1 -> A -> B -> (wait for > C) > T2: [deserialize] -> Task2 -> C -> (wait for > B){code} > > A similar scenario from the web: > [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] . 
> > For my specific problem, I'm running into this within Clojure - > {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with > {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. > Deserializing different clojure functions calls one or the other first which > deadlocks task managers. > > I built a version of flink-core that had > {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} > synchronized, but I'm not sure that it's the proper fix. I'm happy to submit > that as a patch, but I'm not familiar enough with the codebase to say that > it's the correct solution - ideally all Java class loading is synchronized, > but I'm not sure how to do that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
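For readers who want to see the failure mode described in this ticket in isolation, the following is a minimal, self-contained Java sketch of a class-initialization deadlock. The class names are illustrative only (not Flink or Clojure classes), and whether it actually hangs depends on thread timing.
{code:java}
// Minimal sketch of a class-initialization deadlock (illustrative names only).
// A's static initializer needs B and B's static initializer needs A; if two
// threads trigger the two initializations concurrently, each thread can end up
// holding one class's initialization lock while waiting for the other's.
public class ClassInitDeadlockDemo {

    static class A {
        static {
            pause();      // widen the race window
            B.touch();    // A's <clinit> depends on B
        }
        static void touch() {}
    }

    static class B {
        static {
            pause();
            A.touch();    // B's <clinit> depends on A
        }
        static void touch() {}
    }

    private static void pause() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException ignored) {
        }
    }

    public static void main(String[] args) {
        // Analogous to two task threads deserializing Task1 and Task2 at the same time.
        new Thread(A::touch).start();
        new Thread(B::touch).start();
    }
}
{code}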
[jira] [Commented] (FLINK-12620) Deadlock in task deserialization
[ https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849259#comment-16849259 ] Mike Kaplinskiy commented on FLINK-12620: - Sure, here's an example deadlock that I see, attached. [^jstack_snippet.txt] Somewhere between those 3 threads is a class initialization deadlock. My hacky fix that I tried locally looks like this: {code} diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 644289133b..dc722c1db4 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -492,7 +492,7 @@ public final class InstantiationUtil { } } - public static T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException { + public static synchronized T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException { byte[] bytes = config.getBytes(key, null); if (bytes == null) { return null; {code} That said, I'm not sure that it's the proper fix. > Deadlock in task deserialization > > > Key: FLINK-12620 > URL: https://issues.apache.org/jira/browse/FLINK-12620 > Project: Flink > Issue Type: Bug >Affects Versions: 1.8.0 >Reporter: Mike Kaplinskiy >Priority: Major > Attachments: jstack_snippet.txt > > > When running a batch job, I ran into an issue where task deserialization > caused a deadlock. Specifically, if you have a static initialization > dependency graph that looks like this (these are all classes): > {code:java} > Task1 depends on A > A depends on B > B depends on C > C depends on B [cycle] > Task2 depends on C{code} > What seems to happen is a deadlock. Specifically, threads are started on the > task managers that simultaneously call BatchTask.instantiateUserCode on both > Task1 and Task2. This starts deserializing the classes and initializing them. > Here's the deadlock scenario, as a stack: > {code:java} > Time> > T1: [deserialize] -> Task1 -> A -> B -> (wait for > C) > T2: [deserialize] -> Task2 -> C -> (wait for > B){code} > > A similar scenario from the web: > [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] . > > For my specific problem, I'm running into this within Clojure - > {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with > {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. > Deserializing different clojure functions calls one or the other first which > deadlocks task managers. > > I built a version of flink-core that had > {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} > synchronized, but I'm not sure that it's the proper fix. I'm happy to submit > that as a patch, but I'm not familiar enough with the codebase to say that > it's the correct solution - ideally all Java class loading is synchronized, > but I'm not sure how to do that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12620) Deadlock in task deserialization
[ https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mike Kaplinskiy updated FLINK-12620: Attachment: jstack_snippet.txt > Deadlock in task deserialization > > > Key: FLINK-12620 > URL: https://issues.apache.org/jira/browse/FLINK-12620 > Project: Flink > Issue Type: Bug >Affects Versions: 1.8.0 >Reporter: Mike Kaplinskiy >Priority: Major > Attachments: jstack_snippet.txt > > > When running a batch job, I ran into an issue where task deserialization > caused a deadlock. Specifically, if you have a static initialization > dependency graph that looks like this (these are all classes): > {code:java} > Task1 depends on A > A depends on B > B depends on C > C depends on B [cycle] > Task2 depends on C{code} > What seems to happen is a deadlock. Specifically, threads are started on the > task managers that simultaneously call BatchTask.instantiateUserCode on both > Task1 and Task2. This starts deserializing the classes and initializing them. > Here's the deadlock scenario, as a stack: > {code:java} > Time> > T1: [deserialize] -> Task1 -> A -> B -> (wait for > C) > T2: [deserialize] -> Task2 -> C -> (wait for > B){code} > > A similar scenario from the web: > [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] . > > For my specific problem, I'm running into this within Clojure - > {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with > {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. > Deserializing different clojure functions calls one or the other first which > deadlocks task managers. > > I built a version of flink-core that had > {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} > synchronized, but I'm not sure that it's the proper fix. I'm happy to submit > that as a patch, but I'm not familiar enough with the codebase to say that > it's the correct solution - ideally all Java class loading is synchronized, > but I'm not sure how to do that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig URL: https://github.com/apache/flink/pull/8459#discussion_r287807783 ## File path: docs/dev/stream/state/state.md ## @@ -483,11 +513,12 @@ val ttlConfig = StateTtlConfig RocksDB compaction filter will query current timestamp, used to check expiration, from Flink every time -after processing certain number of state entries. This number is 1000 by default. +after processing certain number of state entries. You can optionally change it and pass a custom value to `StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)` method. Updating the timestamp more often can improve cleanup speed but it decreases compaction performance because it uses JNI call from native code. +If you enable the default background cleanup then this strategy will be activated for RocksDB backend with 1000 number of state entries every time after processing. Review comment: `this strategy will be activated for RocksDB backend and the current timestamp will be queried each time 1000 entries have been processed.` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
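For reference, the configuration being documented above would look roughly like this in user code (the one-day TTL is just a placeholder value):
{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// TTL config that asks the RocksDB compaction filter to re-query the current
// timestamp from Flink after every 1000 processed state entries.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();
{code}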
[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig URL: https://github.com/apache/flink/pull/8459#discussion_r287804877 ## File path: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java ## @@ -376,39 +398,42 @@ public StateTtlConfig build() { private static final long serialVersionUID = 1373998465131443873L; } - final EnumMap strategies = new EnumMap<>(Strategies.class); - - public void activate(Strategies strategy) { - activate(strategy, EMPTY_STRATEGY); - } - - public void activate(Strategies strategy, CleanupStrategy config) { - strategies.put(strategy, config); + public CleanupStrategies(EnumMap strategies, boolean isCleanupInBackground) { Review comment: can be private This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig URL: https://github.com/apache/flink/pull/8459#discussion_r287878198 ## File path: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java ## @@ -322,11 +329,22 @@ public Builder cleanupInRocksdbCompactFilter() { */ @Nonnull public Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) { - cleanupStrategies.activate(CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER, + strategies.put(CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER, new RocksdbCompactFilterCleanupStrategy(queryTimeAfterNumEntries)); return this; } + /** +* Enable cleanup of expired state in background. +* +* Depending on actually used backend, the corresponding cleanup will kick in if supported. +*/ + @Nonnull + public Builder cleanupInBackground() { Review comment: I suggest we add a simple unit test for this method where `StateTtlConfig` is built like in the doc example with `cleanupInBackground`, and the test checks that `StateTtlConfig.getCleanupStrategies().isCleanupInBackground()` is `true`, `getIncrementalCleanupStrategy()` is not null, `getIncrementalCleanupStrategy().getCleanupSize()` is 5, etc. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
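A sketch of the unit test suggested above could look as follows; it assumes the accessor names mentioned in the review (`isCleanupInBackground`, `getIncrementalCleanupStrategy`, `getCleanupSize`) and the default incremental cleanup size of 5, all of which are still under discussion in this PR.
{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class StateTtlConfigBackgroundCleanupTest {

    @Test
    public void testCleanupInBackgroundEnablesDefaultStrategies() {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(1))
            .cleanupInBackground()
            .build();

        StateTtlConfig.CleanupStrategies strategies = ttlConfig.getCleanupStrategies();

        // background cleanup flag is set ...
        assertTrue(strategies.isCleanupInBackground());
        // ... and the default incremental strategy is activated with 5 entries per step
        assertNotNull(strategies.getIncrementalCleanupStrategy());
        assertEquals(5, strategies.getIncrementalCleanupStrategy().getCleanupSize());
    }
}
{code}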
[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig URL: https://github.com/apache/flink/pull/8459#discussion_r287809610 ## File path: docs/dev/stream/state/state.md ## @@ -483,11 +513,12 @@ val ttlConfig = StateTtlConfig RocksDB compaction filter will query current timestamp, used to check expiration, from Flink every time -after processing certain number of state entries. This number is 1000 by default. +after processing certain number of state entries. You can optionally change it and pass a custom value to Review comment: let's remove `optionally` and put `1000` into `cleanupInRocksdbCompactFilter(1000)` in java and scala examples as we are deprecating the parameterless version of `cleanupInRocksdbCompactFilter`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on issue #8485: [FLINK-12555] Introduce an encapsulated metric group layout for shuffle API
azagrebin commented on issue #8485: [FLINK-12555] Introduce an encapsulated metric group layout for shuffle API URL: https://github.com/apache/flink/pull/8485#issuecomment-496287266 True, thanks for noticing it @zhijiangW, it was not intentional. I will change it to create the group only once. It could be called e.g. `NetworkInput`/`NetworkOutput`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10121) Introduce methods to remove registered operator states
[ https://issues.apache.org/jira/browse/FLINK-10121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849087#comment-16849087 ] aitozi commented on FLINK-10121: Hi [~srichter], is this still ongoing work? I think it's a valuable feature. If it has not been started, I would like to look into it. > Introduce methods to remove registered operator states > -- > > Key: FLINK-10121 > URL: https://issues.apache.org/jira/browse/FLINK-10121 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > > User can register new operator states but never remove a registered state. > This is particularly problematic with expensive states or states that we > register only to provide backwards compatibility. We can also consider the > same for keyed state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10098) Programmatically select timer storage backend
[ https://issues.apache.org/jira/browse/FLINK-10098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849082#comment-16849082 ] aitozi commented on FLINK-10098: I will work on this issue to solve the problem > Programmatically select timer storage backend > - > > Key: FLINK-10098 > URL: https://issues.apache.org/jira/browse/FLINK-10098 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination, Runtime / State Backends >Affects Versions: 1.6.0, 1.7.0 >Reporter: Elias Levy >Assignee: aitozi >Priority: Major > > FLINK-9486 introduced timer storage on the RocksDB storage backend. Right > now it is only possible to configure RocksDB as the storage for timers by > settings the {{state.backend.rocksdb.timer-service.factory}} value in the > configuration file for Flink. > As the state storage backend can be programmatically selected by by jobs via > {{env.setStateBackend(...)}}, the timer backend should also be configurable > programmatically. > Different jobs should be able to store their timers in different storage > backends. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
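For context, the gap described in the issue looks roughly like this today (the checkpoint path is a placeholder):
{code:java}
import java.io.IOException;

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimerBackendExample {
    public static void main(String[] args) throws IOException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The keyed-state backend can already be chosen per job in code ...
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // ... but where timers are stored can currently only be chosen cluster-wide
        // in flink-conf.yaml:
        //
        //   state.backend.rocksdb.timer-service.factory: ROCKSDB
        //
        // This issue asks for an equivalent per-job, programmatic setting.
    }
}
{code}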
[jira] [Assigned] (FLINK-10098) Programmatically select timer storage backend
[ https://issues.apache.org/jira/browse/FLINK-10098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] aitozi reassigned FLINK-10098: -- Assignee: aitozi > Programmatically select timer storage backend > - > > Key: FLINK-10098 > URL: https://issues.apache.org/jira/browse/FLINK-10098 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination, Runtime / State Backends >Affects Versions: 1.6.0, 1.7.0 >Reporter: Elias Levy >Assignee: aitozi >Priority: Major > > FLINK-9486 introduced timer storage on the RocksDB storage backend. Right > now it is only possible to configure RocksDB as the storage for timers by > settings the {{state.backend.rocksdb.timer-service.factory}} value in the > configuration file for Flink. > As the state storage backend can be programmatically selected by by jobs via > {{env.setStateBackend(...)}}, the timer backend should also be configurable > programmatically. > Different jobs should be able to store their timers in different storage > backends. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849071#comment-16849071 ] lamber-ken edited comment on FLINK-12302 at 5/27/19 4:53 PM: - [~gjy], from another side, we can analysis this issue only from the code logic. When some scene happends and the call the +MiniDispatcher#jobNotFinished+ method, it means the flink job terminate unexpectedly, so it will notify the RM to kill the yarn application with +ApplicationStatus.UNKNOWN+ state, then the +UNKNOWN+ state will transfer to +{{UNDEFINED}}+ by +YarnResourceManager#getYarnStatus.+ But, in hadoop system, the +{{UNDEFINED}}+ means the application has not yet finished. *MiniDispatcher#jobNotFinished* {code:java} @Override protected void jobNotFinished(JobID jobId) { super.jobNotFinished(jobId); // shut down since we have done our job jobTerminationFuture.complete(ApplicationStatus.UNKNOWN); } {code} *YarnResourceManager#getYarnStatus* {code:java} private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { if (status == null) { return FinalApplicationStatus.UNDEFINED; } else { switch (status) { case SUCCEEDED: return FinalApplicationStatus.SUCCEEDED; case FAILED: return FinalApplicationStatus.FAILED; case CANCELED: return FinalApplicationStatus.KILLED; default: return FinalApplicationStatus.UNDEFINED; } } } {code} *Hadoop Application Status* [FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32] {code:java} /** * Enumeration of various final states of an Application. */ @Public @Stable public enum FinalApplicationStatus { /** Undefined state when either the application has not yet finished */ UNDEFINED, /** Application which finished successfully. */ SUCCEEDED, /** Application which failed. */ FAILED, /** Application which was terminated by a user or admin. */ KILLED } {code} *Longrunning Applications's FinalStatus* *!image-2019-05-28-00-46-49-740.png!* was (Author: lamber-ken): [~gjy], from another side, we can analysis this issue only from the code. When some scene happends and the call the +MiniDispatcher#jobNotFinished+ method, it means the flink job terminate unexpectedly, so it will notify the RM to kill the yarn application with +ApplicationStatus.UNKNOWN+ state, then the +UNKNOWN+ state will transfer to +{{UNDEFINED}}+ by +YarnResourceManager#getYarnStatus.+ But, in hadoop system, the +{{UNDEFINED}}+ means the application has not yet finished. 
*MiniDispatcher#jobNotFinished* {code:java} @Override protected void jobNotFinished(JobID jobId) { super.jobNotFinished(jobId); // shut down since we have done our job jobTerminationFuture.complete(ApplicationStatus.UNKNOWN); } {code} *YarnResourceManager#getYarnStatus* {code:java} private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { if (status == null) { return FinalApplicationStatus.UNDEFINED; } else { switch (status) { case SUCCEEDED: return FinalApplicationStatus.SUCCEEDED; case FAILED: return FinalApplicationStatus.FAILED; case CANCELED: return FinalApplicationStatus.KILLED; default: return FinalApplicationStatus.UNDEFINED; } } } {code} *Hadoop Application Status* [FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32] {code:java} /** * Enumeration of various final states of an Application. */ @Public @Stable public enum FinalApplicationStatus { /** Undefined state when either the application has not yet finished */ UNDEFINED, /** Application which finished successfully. */ SUCCEEDED, /** Application which failed. */ FAILED, /** Application which was terminated by a user or admin. */ KILLED } {code} *Longrunning Applications's FinalStatus* *!image-2019-05-28-00-46-49-740.png!* > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix
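To make the argument concrete, one way to realize the behaviour lamber-ken is asking for would be to map the UNKNOWN application status to FAILED instead of letting it fall through to UNDEFINED. The following is only a sketch of that idea, not necessarily the fix that was merged.
{code:java}
// Sketch: treat UNKNOWN (job terminated unexpectedly) as FAILED instead of
// falling through to UNDEFINED, which YARN documents as "not yet finished".
private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
    if (status == null) {
        return FinalApplicationStatus.UNDEFINED;
    }
    switch (status) {
        case SUCCEEDED:
            return FinalApplicationStatus.SUCCEEDED;
        case FAILED:
            return FinalApplicationStatus.FAILED;
        case CANCELED:
            return FinalApplicationStatus.KILLED;
        case UNKNOWN:
            return FinalApplicationStatus.FAILED;   // job did not finish normally
        default:
            return FinalApplicationStatus.UNDEFINED;
    }
}
{code}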
[jira] [Commented] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849080#comment-16849080 ] lamber-ken commented on FLINK-12302: So, when this unexpected scenario happens, the wrong +finalStatus+ is displayed, because the job is no longer running. The finalStatus should be FAILED, not UNDEFINED. !image-2019-05-28-00-50-13-500.png! > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, image-2019-05-28-00-46-49-740.png, > image-2019-05-28-00-50-13-500.png, jobmanager-05-27.log, jobmanager-1.log, > jobmanager-2.log, screenshot-1.png, screenshot-2.png, > spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12302: --- Attachment: image-2019-05-28-00-50-13-500.png > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, image-2019-05-28-00-46-49-740.png, > image-2019-05-28-00-50-13-500.png, jobmanager-05-27.log, jobmanager-1.log, > jobmanager-2.log, screenshot-1.png, screenshot-2.png, > spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849071#comment-16849071 ] lamber-ken edited comment on FLINK-12302 at 5/27/19 4:47 PM: - [~gjy], from another side, we can analysis this issue only from the code. When some scene happends and the call the +MiniDispatcher#jobNotFinished+ method, it means the flink job terminate unexpectedly, so it will notify the RM to kill the yarn application with +ApplicationStatus.UNKNOWN+ state, then the +UNKNOWN+ state will transfer to +{{UNDEFINED}}+ by +YarnResourceManager#getYarnStatus.+ But, in hadoop system, the +{{UNDEFINED}}+ means the application has not yet finished. *MiniDispatcher#jobNotFinished* {code:java} @Override protected void jobNotFinished(JobID jobId) { super.jobNotFinished(jobId); // shut down since we have done our job jobTerminationFuture.complete(ApplicationStatus.UNKNOWN); } {code} *YarnResourceManager#getYarnStatus* {code:java} private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { if (status == null) { return FinalApplicationStatus.UNDEFINED; } else { switch (status) { case SUCCEEDED: return FinalApplicationStatus.SUCCEEDED; case FAILED: return FinalApplicationStatus.FAILED; case CANCELED: return FinalApplicationStatus.KILLED; default: return FinalApplicationStatus.UNDEFINED; } } } {code} *Hadoop Application Status* [FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32] {code:java} /** * Enumeration of various final states of an Application. */ @Public @Stable public enum FinalApplicationStatus { /** Undefined state when either the application has not yet finished */ UNDEFINED, /** Application which finished successfully. */ SUCCEEDED, /** Application which failed. */ FAILED, /** Application which was terminated by a user or admin. */ KILLED } {code} *Longrunning Applications's FinalStatus* *!image-2019-05-28-00-46-49-740.png!* was (Author: lamber-ken): [~gjy], from another side, we can analysis this issue only from the code. When some scene happends and the call the +MiniDispatcher#jobNotFinished+ method, it means the flink job terminate unexpectedly, so it will notify the RM to kill the yarn application with +ApplicationStatus.UNKNOWN+ state, then the +UNKNOWN+ state will transfer to +{{UNDEFINED}}+ by +YarnResourceManager#getYarnStatus.+ But, in hadoop system, the +{{UNDEFINED}}+ means the application has not yet finished. 
*MiniDispatcher#jobNotFinished* {code:java} @Override protected void jobNotFinished(JobID jobId) { super.jobNotFinished(jobId); // shut down since we have done our job jobTerminationFuture.complete(ApplicationStatus.UNKNOWN); } {code} *YarnResourceManager#getYarnStatus* {code:java} private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { if (status == null) { return FinalApplicationStatus.UNDEFINED; } else { switch (status) { case SUCCEEDED: return FinalApplicationStatus.SUCCEEDED; case FAILED: return FinalApplicationStatus.FAILED; case CANCELED: return FinalApplicationStatus.KILLED; default: return FinalApplicationStatus.UNDEFINED; } } } {code} *Hadoop Application Status* [FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32] {code:java} /** * Enumeration of various final states of an Application. */ @Public @Stable public enum FinalApplicationStatus { /** Undefined state when either the application has not yet finished */ UNDEFINED, /** Application which finished successfully. */ SUCCEEDED, /** Application which failed. */ FAILED, /** Application which was terminated by a user or admin. */ KILLED } {code} > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, >
[jira] [Assigned] (FLINK-11634) Translate "State Backends" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jeremy huang reassigned FLINK-11634: Assignee: jeremy huang > Translate "State Backends" page into Chinese > > > Key: FLINK-11634 > URL: https://issues.apache.org/jira/browse/FLINK-11634 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Congxian Qiu(klion26) >Assignee: jeremy huang >Priority: Major > > doc locates in flink/docs/dev/stream/state/state_backens.md -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849071#comment-16849071 ] lamber-ken edited comment on FLINK-12302 at 5/27/19 4:43 PM: - [~gjy], from another side, we can analysis this issue only from the code. When some scene happends and the call the +MiniDispatcher#jobNotFinished+ method, it means the flink job terminate unexpectedly, so it will notify the RM to kill the yarn application with +ApplicationStatus.UNKNOWN+ state, then the +UNKNOWN+ state will transfer to +{{UNDEFINED}}+ by +YarnResourceManager#getYarnStatus.+ But, in hadoop system, the +{{UNDEFINED}}+ means the application has not yet finished. *MiniDispatcher#jobNotFinished* {code:java} @Override protected void jobNotFinished(JobID jobId) { super.jobNotFinished(jobId); // shut down since we have done our job jobTerminationFuture.complete(ApplicationStatus.UNKNOWN); } {code} *YarnResourceManager#getYarnStatus* {code:java} private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { if (status == null) { return FinalApplicationStatus.UNDEFINED; } else { switch (status) { case SUCCEEDED: return FinalApplicationStatus.SUCCEEDED; case FAILED: return FinalApplicationStatus.FAILED; case CANCELED: return FinalApplicationStatus.KILLED; default: return FinalApplicationStatus.UNDEFINED; } } } {code} *Hadoop Application Status* [FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32] {code:java} /** * Enumeration of various final states of an Application. */ @Public @Stable public enum FinalApplicationStatus { /** Undefined state when either the application has not yet finished */ UNDEFINED, /** Application which finished successfully. */ SUCCEEDED, /** Application which failed. */ FAILED, /** Application which was terminated by a user or admin. */ KILLED } {code} was (Author: lamber-ken): [~gjy], from another side, we can analysis this issue only from the code. When some scene happends and the call the +MiniDispatcher#jobNotFinished+ method, it means the flink job terminate unexpectedly, so it will notify the RM to kill the yarn application with +ApplicationStatus.UNKNOWN+ state, then the +UNKNOWN+ state will transfer to +{{UNDEFINED}}+ by +YarnResourceManager#getYarnStatus.+ But, in hadoop system, the +{{UNDEFINED}}+ means the application has not yet finished. *MiniDispatcher#jobNotFinished* {code:java} @Override protected void jobNotFinished(JobID jobId) { super.jobNotFinished(jobId); // shut down since we have done our job jobTerminationFuture.complete(ApplicationStatus.UNKNOWN); } {code} *YarnResourceManager#getYarnStatus* {code:java} private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { if (status == null) { return FinalApplicationStatus.UNDEFINED; } else { switch (status) { case SUCCEEDED: return FinalApplicationStatus.SUCCEEDED; case FAILED: return FinalApplicationStatus.FAILED; case CANCELED: return FinalApplicationStatus.KILLED; default: return FinalApplicationStatus.UNDEFINED; } } } {code} ** *Hadoop Application Status* [FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32] {code:java} /** * Enumeration of various final states of an Application. 
*/ @Public @Stable public enum FinalApplicationStatus { /** Undefined state when either the application has not yet finished */ UNDEFINED, /** Application which finished successfully. */ SUCCEEDED, /** Application which failed. */ FAILED, /** Application which was terminated by a user or admin. */ KILLED } {code} > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, >
[jira] [Comment Edited] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849071#comment-16849071 ] lamber-ken edited comment on FLINK-12302 at 5/27/19 4:42 PM: - [~gjy], from another side, we can analysis this issue only from the code. When some scene happends and the call the +MiniDispatcher#jobNotFinished+ method, it means the flink job terminate unexpectedly, so it will notify the RM to kill the yarn application with +ApplicationStatus.UNKNOWN+ state, then the +UNKNOWN+ state will transfer to +{{UNDEFINED}}+ by +YarnResourceManager#getYarnStatus.+ But, in hadoop system, the +{{UNDEFINED}}+ means the application has not yet finished. *MiniDispatcher#jobNotFinished* {code:java} @Override protected void jobNotFinished(JobID jobId) { super.jobNotFinished(jobId); // shut down since we have done our job jobTerminationFuture.complete(ApplicationStatus.UNKNOWN); } {code} *YarnResourceManager#getYarnStatus* {code:java} private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { if (status == null) { return FinalApplicationStatus.UNDEFINED; } else { switch (status) { case SUCCEEDED: return FinalApplicationStatus.SUCCEEDED; case FAILED: return FinalApplicationStatus.FAILED; case CANCELED: return FinalApplicationStatus.KILLED; default: return FinalApplicationStatus.UNDEFINED; } } } {code} ** *Hadoop Application Status* [FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32] {code:java} /** * Enumeration of various final states of an Application. */ @Public @Stable public enum FinalApplicationStatus { /** Undefined state when either the application has not yet finished */ UNDEFINED, /** Application which finished successfully. */ SUCCEEDED, /** Application which failed. */ FAILED, /** Application which was terminated by a user or admin. */ KILLED } {code} was (Author: lamber-ken): [~gjy], from another side, we can analysis this issue only from the code. When some scene happends and the call the +MiniDispatcher#jobNotFinished+ method, it means the flink job terminate unexpectedly, so it will notify the RM to kill the yarn application with +ApplicationStatus.UNKNOWN+ state. But, in hadoop system, the +{{UNDEFINED}}+ means the application has not yet finished. *MiniDispatcher#jobNotFinished* {code:java} @Override protected void jobNotFinished(JobID jobId) { super.jobNotFinished(jobId); // shut down since we have done our job jobTerminationFuture.complete(ApplicationStatus.UNKNOWN); } {code} *Hadoop Application Status* [FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32] {code:java} /** * Enumeration of various final states of an Application. */ @Public @Stable public enum FinalApplicationStatus { /** Undefined state when either the application has not yet finished */ UNDEFINED, /** Application which finished successfully. */ SUCCEEDED, /** Application which failed. */ FAILED, /** Application which was terminated by a user or admin. 
*/ KILLED } {code} > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, > jobmanager-2.log, screenshot-1.png, screenshot-2.png, > spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849071#comment-16849071 ] lamber-ken edited comment on FLINK-12302 at 5/27/19 4:34 PM: - [~gjy], from another side, we can analysis this issue only from the code. When some scene happends and the call the +MiniDispatcher#jobNotFinished+ method, it means the flink job terminate unexpectedly, so it will notify the RM to kill the yarn application with +ApplicationStatus.UNKNOWN+ state. But, in hadoop system, the +{{UNDEFINED}}+ means the application has not yet finished. *MiniDispatcher#jobNotFinished* {code:java} @Override protected void jobNotFinished(JobID jobId) { super.jobNotFinished(jobId); // shut down since we have done our job jobTerminationFuture.complete(ApplicationStatus.UNKNOWN); } {code} *Hadoop Application Status* [FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32] {code:java} /** * Enumeration of various final states of an Application. */ @Public @Stable public enum FinalApplicationStatus { /** Undefined state when either the application has not yet finished */ UNDEFINED, /** Application which finished successfully. */ SUCCEEDED, /** Application which failed. */ FAILED, /** Application which was terminated by a user or admin. */ KILLED } {code} was (Author: lamber-ken): [~gjy], from another side, we can analysis this issue only from the code. When some scene happends and the call the +MiniDispatcher#jobNotFinished+ method, it means the flink job terminate unexpectedly, so it will notify the RM to kill the yarn application with +ApplicationStatus.UNKNOWN+ state. But, in hadoop system, the +{{UNDEFINED}}+ means the application has not yet finished. *MiniDispatcher#jobNotFinished* {code:java} @Override protected void jobNotFinished(JobID jobId) { super.jobNotFinished(jobId); // shut down since we have done our job jobTerminationFuture.complete(ApplicationStatus.UNKNOWN); } {code} *Hadoop Application Status https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32* {code:java} /** * Enumeration of various final states of an Application. */ @Public @Stable public enum FinalApplicationStatus { /** Undefined state when either the application has not yet finished */ UNDEFINED, /** Application which finished successfully. */ SUCCEEDED, /** Application which failed. */ FAILED, /** Application which was terminated by a user or admin. */ KILLED } {code} > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, > jobmanager-2.log, screenshot-1.png, screenshot-2.png, > spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. 
> when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849071#comment-16849071 ] lamber-ken commented on FLINK-12302: [~gjy], from another side, we can analysis this issue only from the code. When some scene happends and the call the +MiniDispatcher#jobNotFinished+ method, it means the flink job terminate unexpectedly, so it will notify the RM to kill the yarn application with +ApplicationStatus.UNKNOWN+ state. But, in hadoop system, the +{{UNDEFINED}}+ means the application has not yet finished. *MiniDispatcher#jobNotFinished* {code:java} @Override protected void jobNotFinished(JobID jobId) { super.jobNotFinished(jobId); // shut down since we have done our job jobTerminationFuture.complete(ApplicationStatus.UNKNOWN); } {code} *Hadoop Application Status https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32* {code:java} /** * Enumeration of various final states of an Application. */ @Public @Stable public enum FinalApplicationStatus { /** Undefined state when either the application has not yet finished */ UNDEFINED, /** Application which finished successfully. */ SUCCEEDED, /** Application which failed. */ FAILED, /** Application which was terminated by a user or admin. */ KILLED } {code} > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, > jobmanager-2.log, screenshot-1.png, screenshot-2.png, > spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] Aitozi commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
Aitozi commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#issuecomment-496260983 I have added the three metrics for the credit-based mode; please take a look when you are free @zhijiangW @pnowojski This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
flinkbot edited a comment on issue #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer URL: https://github.com/apache/flink/pull/8517#issuecomment-495066880 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @jgrier * ✅ 2. There is [consensus] that the contribution should go into to Flink. - Approved by @jgrier * ❓ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @jgrier * ✅ 5. Overall code [quality] is good. - Approved by @jgrier Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] jgrier commented on issue #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
jgrier commented on issue #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer URL: https://github.com/apache/flink/pull/8517#issuecomment-496260766 @flinkbot approve all This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer URL: https://github.com/apache/flink/pull/8517#discussion_r287840233 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java ## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Emitter that handles event time synchronization between producer threads. + * + * Records are organized into per producer queues that will block when capacity is exhausted. + * + * Records are emitted by selecting the oldest available element of all producer queues, + * as long as the timestamp does not exceed the current shared watermark plus allowed lookahead interval. + * + * @param + */ +public abstract class RecordEmitter implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(RecordEmitter.class); + + /** +* The default capacity of a single queue. +* +* Larger queue size can lead to higher throughput, but also to +* very high heap space consumption, depending on the size of elements. +* +* Note that this is difficult to tune, because it does not take into account +* the size of individual objects. +*/ + public static final int DEFAULT_QUEUE_CAPACITY = 100; + + private final int queueCapacity; + private final ConcurrentHashMap> queues = new ConcurrentHashMap<>(); + private final ConcurrentHashMap, Boolean> emptyQueues = new ConcurrentHashMap<>(); + private final PriorityQueue> heads = new PriorityQueue<>(this::compareHeadElement); + private volatile boolean running = true; + private volatile long maxEmitTimestamp = Long.MAX_VALUE; + private long maxLookaheadMillis = 60 * 1000; // one minute + private long idleSleepMillis = 100; + private final Object condition = new Object(); + + public RecordEmitter(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + + private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue right) { + return Long.compare(left.headTimestamp, right.headTimestamp); + } + + /** +* Accepts records from readers. 
+* +* @param +*/ + public interface RecordQueue { + void put(T record) throws InterruptedException; + + int getQueueId(); + + int getSize(); + + T peek(); + } + + private class AsyncRecordQueue implements RecordQueue { + private final ArrayBlockingQueue queue; + private final int queueId; + long headTimestamp; + + private AsyncRecordQueue(int queueId) { + super(); + this.queue = new ArrayBlockingQueue<>(queueCapacity); + this.queueId = queueId; + this.headTimestamp = Long.MAX_VALUE; + } + + public void put(T record) throws InterruptedException { + queue.put(record); + // TODO: not pretty having this here + synchronized (condition) { + condition.notify(); + } + } + + public int getQueueId() { + return queueId; + } + + public int getSize() { + return queue.size(); + } + + public T peek() { + return queue.peek(); + } + + } + + /** +* The queue for the given producer (i.e. Kinesis shard consumer thread). +* +
[GitHub] [flink] jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer URL: https://github.com/apache/flink/pull/8517#discussion_r287840412 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java ## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Emitter that handles event time synchronization between producer threads. + * + * Records are organized into per producer queues that will block when capacity is exhausted. + * + * Records are emitted by selecting the oldest available element of all producer queues, + * as long as the timestamp does not exceed the current shared watermark plus allowed lookahead interval. + * + * @param + */ +public abstract class RecordEmitter implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(RecordEmitter.class); + + /** +* The default capacity of a single queue. +* +* Larger queue size can lead to higher throughput, but also to +* very high heap space consumption, depending on the size of elements. +* +* Note that this is difficult to tune, because it does not take into account +* the size of individual objects. +*/ + public static final int DEFAULT_QUEUE_CAPACITY = 100; + + private final int queueCapacity; + private final ConcurrentHashMap> queues = new ConcurrentHashMap<>(); + private final ConcurrentHashMap, Boolean> emptyQueues = new ConcurrentHashMap<>(); + private final PriorityQueue> heads = new PriorityQueue<>(this::compareHeadElement); + private volatile boolean running = true; + private volatile long maxEmitTimestamp = Long.MAX_VALUE; + private long maxLookaheadMillis = 60 * 1000; // one minute + private long idleSleepMillis = 100; + private final Object condition = new Object(); + + public RecordEmitter(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + + private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue right) { + return Long.compare(left.headTimestamp, right.headTimestamp); + } + + /** +* Accepts records from readers. 
+* +* @param +*/ + public interface RecordQueue { + void put(T record) throws InterruptedException; + + int getQueueId(); + + int getSize(); + + T peek(); + } + + private class AsyncRecordQueue implements RecordQueue { + private final ArrayBlockingQueue queue; + private final int queueId; + long headTimestamp; + + private AsyncRecordQueue(int queueId) { + super(); + this.queue = new ArrayBlockingQueue<>(queueCapacity); + this.queueId = queueId; + this.headTimestamp = Long.MAX_VALUE; + } + + public void put(T record) throws InterruptedException { + queue.put(record); + // TODO: not pretty having this here + synchronized (condition) { + condition.notify(); + } + } + + public int getQueueId() { + return queueId; + } + + public int getSize() { + return queue.size(); + } + + public T peek() { + return queue.peek(); + } + + } + + /** +* The queue for the given producer (i.e. Kinesis shard consumer thread). +* +
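For readers following the review above: a minimal, self-contained sketch of the emission policy the comment describes (illustrative class and field names only, not the RecordEmitter code under review). Producer threads write into bounded per-producer queues, and the emitter repeatedly drains the queue whose head carries the smallest timestamp, as long as that timestamp does not exceed the shared watermark plus the allowed lookahead interval.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ArrayBlockingQueue;

// Illustrative sketch only: per-producer bounded queues plus emission of the
// oldest available queue head, bounded by watermark + lookahead.
public class EmitPolicySketch {

    static final class Record {
        final long timestamp;
        final String payload;

        Record(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    private final List<ArrayBlockingQueue<Record>> queues = new ArrayList<>();
    private final long maxLookaheadMillis = 60_000L;   // mirrors the one-minute default above
    private volatile long watermark = Long.MIN_VALUE;  // shared watermark, advanced elsewhere

    /** Each producer (e.g. a shard consumer thread) gets its own bounded queue. */
    public ArrayBlockingQueue<Record> registerProducer(int capacity) {
        ArrayBlockingQueue<Record> queue = new ArrayBlockingQueue<>(capacity);
        queues.add(queue);
        return queue;   // producers call queue.put(record), which blocks when the queue is full
    }

    public void setWatermark(long watermark) {
        this.watermark = watermark;
    }

    /** Emits queue heads in timestamp order, up to watermark + lookahead. */
    public void emitAvailable() {
        PriorityQueue<ArrayBlockingQueue<Record>> heads = new PriorityQueue<>(
                Comparator.comparingLong((ArrayBlockingQueue<Record> q) -> q.peek().timestamp));
        for (ArrayBlockingQueue<Record> queue : queues) {
            if (queue.peek() != null) {
                heads.add(queue);
            }
        }
        final long maxEmitTimestamp = watermark + maxLookaheadMillis;
        while (!heads.isEmpty()) {
            ArrayBlockingQueue<Record> queue = heads.poll();
            Record head = queue.peek();
            if (head == null || head.timestamp > maxEmitTimestamp) {
                continue;   // this queue has to wait for the shared watermark to advance
            }
            emit(queue.poll());
            if (queue.peek() != null) {
                heads.add(queue);   // re-insert so the new head is ordered against the others
            }
        }
    }

    private void emit(Record record) {
        System.out.println(record.timestamp + " -> " + record.payload);
    }
}
```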
[jira] [Updated] (FLINK-12637) Add floatingBufferUsage and exclusiveBufferUsage for credit based mode
[ https://issues.apache.org/jira/browse/FLINK-12637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] aitozi updated FLINK-12637: --- Description: Described [here|https://github.com/apache/flink/pull/8455#issuecomment-496077999] (was: Described [here](https://github.com/apache/flink/pull/8455#issuecomment-496077999)) > Add floatingBufferUsage and exclusiveBufferUsage for credit based mode > -- > > Key: FLINK-12637 > URL: https://issues.apache.org/jira/browse/FLINK-12637 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics, Runtime / Network >Affects Versions: 1.9.0 >Reporter: aitozi >Assignee: aitozi >Priority: Minor > > Described > [here|https://github.com/apache/flink/pull/8455#issuecomment-496077999] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12637) Add floatingBufferUsage and exclusiveBufferUsage for credit based mode
aitozi created FLINK-12637: -- Summary: Add floatingBufferUsage and exclusiveBufferUsage for credit based mode Key: FLINK-12637 URL: https://issues.apache.org/jira/browse/FLINK-12637 Project: Flink Issue Type: Improvement Components: Runtime / Metrics, Runtime / Network Affects Versions: 1.9.0 Reporter: aitozi Assignee: aitozi Described [here](https://github.com/apache/flink/pull/8455#issuecomment-496077999) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12302: --- Comment: was deleted (was: [~gjy], hi, can you show me your +flink-conf.yaml+ file? thanks) > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, > jobmanager-2.log, screenshot-1.png, screenshot-2.png, > spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849038#comment-16849038 ] lamber-ken commented on FLINK-12302: [~gjy], here are my env files. 1, test jar --> [^test.jar] 2, flink-1.8.0 3, flink-conf.yaml --> [^flink-conf.yaml] 4, the first jobmanager.log --> [^jobmanager-1.log] 5, the second jobmanager.log --> [^jobmanager-2.log] You must wait until the job reaches the max attempt times, then you can kill the AM. From the second jobmanager.log, we will see {code:java} Job 0cac7407733eb34396cd5e919631d4ff was not finished by JobManager. {code} > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, > jobmanager-2.log, screenshot-1.png, screenshot-2.png, > spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > The flink job (flink-1.6.3) failed in per-job yarn cluster mode, and the > resourcemanager of yarn reruns the job. > When the job fails again, the application will finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12302: --- Attachment: jobmanager-1.log > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, > jobmanager-2.log, screenshot-1.png, screenshot-2.png, > spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12302: --- Attachment: jobmanager-2.log > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, > jobmanager-2.log, screenshot-1.png, screenshot-2.png, > spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12302: --- Attachment: flink-conf.yaml > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, > screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, > test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12302: --- Attachment: (was: jobmanager-1.log) > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, > screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, > test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12302: --- Attachment: test.jar > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, > screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, > test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12302: --- Attachment: (was: flink-conf.yaml) > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, > screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, > test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12302: --- Attachment: (was: jobmanager-2.log) > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, > screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, > test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12302: --- Attachment: (was: test.jar) > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-2.log, > screenshot-1.png, screenshot-2.png, spslave4.bigdata.ly_23951, > spslave5.bigdata.ly_20271 > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12302: --- Attachment: test.jar jobmanager-2.log jobmanager-1.log flink-conf.yaml > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-2.log, > screenshot-1.png, screenshot-2.png, spslave4.bigdata.ly_23951, > spslave5.bigdata.ly_20271 > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12302: --- Attachment: (was: test.jar) > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-2.log, > screenshot-1.png, screenshot-2.png, spslave4.bigdata.ly_23951, > spslave5.bigdata.ly_20271 > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lamber-ken updated FLINK-12302: --- Attachment: test.jar > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, > screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, > test.jar > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849010#comment-16849010 ] lamber-ken commented on FLINK-12302: [~gjy], hi, can you show me your +flink-conf.yaml+ file? thanks > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, > screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271 > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zhijiangW commented on issue #8485: [FLINK-12555] Introduce an encapsulated metric group layout for shuffle API
zhijiangW commented on issue #8485: [FLINK-12555] Introduce an encapsulated metric group layout for shuffle API URL: https://github.com/apache/flink/pull/8485#issuecomment-496231763 Thanks for the replies, @zentol. But in this PR `parentGroup.addGroup("Network")` is called twice, in `NetworkEnvironment#createResultPartitionWriters/InputGates`. So should we change this to still create the group in the Task class and then pass it into `createResultPartitionWriters/InputGates` separately? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8555: [FLINK-12254][table-common] More preparation for using the new type system
flinkbot commented on issue #8555: [FLINK-12254][table-common] More preparation for using the new type system URL: https://github.com/apache/flink/pull/8555#issuecomment-496231212 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr opened a new pull request #8555: [FLINK-12254][table-common] More preparation for using the new type system
twalthr opened a new pull request #8555: [FLINK-12254][table-common] More preparation for using the new type system URL: https://github.com/apache/flink/pull/8555 ## What is the purpose of the change This PR contains another set of utility classes for enabling the new type system. The most important part is the ValueDataTypeConverter. It basically allows converting literals to DataType. By looking into the actual value instead of just the class, it enabled supporting all sorts of time classes as well as Java's BigDecimal with variable precision and scale. It also contains one important change regarding string literals that improve SQL standard compliance. ## Brief change log See commit messages. ## Verifying this change See `ValueDataTypeConverterTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
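To illustrate the "look at the actual value instead of just the class" idea from the PR description above, here is a small hedged sketch; `deriveDecimalType` is a made-up helper for illustration, not the ValueDataTypeConverter API introduced by the PR.

```java
import java.math.BigDecimal;

// Hypothetical helper, not the PR's API: derive a DECIMAL(p, s) type string
// from a literal's concrete value rather than from its class alone.
class DecimalTypeDerivation {

    static String deriveDecimalType(BigDecimal value) {
        // precision/scale come from the concrete value, so "12.345" and "1200"
        // map to different DECIMAL types even though both are BigDecimals
        return String.format("DECIMAL(%d, %d)", value.precision(), Math.max(value.scale(), 0));
    }

    public static void main(String[] args) {
        System.out.println(deriveDecimalType(new BigDecimal("12.345"))); // DECIMAL(5, 3)
        System.out.println(deriveDecimalType(new BigDecimal("1200")));   // DECIMAL(4, 0)
    }
}
```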
[jira] [Commented] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848987#comment-16848987 ] Gary Yao commented on FLINK-12302: -- [~lamber-ken] Thanks for your example. However, I am still not able to reproduce this (emr-5.23.0, Amazon Hadoop 2.8.5). I manually killed the AM after submission, and triggered another application attempt. Find attached the logs: [^jobmanager-05-27.log] Maybe you can attach logs as well? Submission command: {code} HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster -d -c comTestDemo ../testjob-1.0-SNAPSHOT.jar {code} > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, > screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271 > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] 1u0 commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking
1u0 commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking URL: https://github.com/apache/flink/pull/8467#discussion_r287809858 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/AvailabilityListener.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io; + +import org.apache.flink.annotation.Internal; + +import java.util.concurrent.CompletableFuture; + +/** + * Interface defining couple of essential methods for listening on data availability using + * {@link CompletableFuture}. For usage check out for example {@link AsyncDataInput}. + */ +@Internal +public interface AvailabilityListener { + /** +* Constant that allows to avoid volatile checks {@link CompletableFuture#isDone()}. Check +* {@link #isAvailable()} for more explanation. +*/ + CompletableFuture AVAILABLE = CompletableFuture.completedFuture(null); + + /** +* @return true if is finished and for example end of input was reached, false otherwise. +*/ + boolean isFinished(); + + /** +* Check if this instance is available for further processing. +* +* When hot looping to avoid volatile access in {@link CompletableFuture#isDone()} user of +* this method should do the following check: +* +* {@code +* AvailabilityListener input = ...; +* if (input.isAvailable() == AvailabilityListener.AVAILABLE || input.isAvailable().isDone()) { +* // do something; +* } +* } +* +* +* @return a future that is completed if there are more records available. If there are more +* records available immediately, {@link #AVAILABLE} should be returned. Previously returned +* not completed futures should become completed once there is more input available or if +* the input {@link #isFinished()}. +*/ + CompletableFuture isAvailable(); Review comment: Maybe you can simplify the contract of this interface, by having only one method that returns 3 possible states of an underlying data stream? In nutshell, implementations of this interface have three situations: * stream has finished; * stream has not finished, and there is an immediate data ready to be processed; * stream has not finished, but the caller needs to wait when data is ready. One way, you can apply the same trick as with `CompletableFuture AVAILABLE = CompletableFuture.completedFuture(null);`. Basically, having pre-defined different constant dummy `CompletableFuture` that would be marker of `isFinished`: ``` public interface AvailabilityListener { CompletableFuture FINISHED = CompletableFuture.completedFuture(null); CompletableFuture AVAILABLE = CompletableFuture.completedFuture(null); CompletableFuture getStatusFuture(); } ``` This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
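A compact sketch of the single-method contract proposed in the review above. The names are hypothetical and not Flink's actual AvailabilityListener API; the two constants are distinct completed futures compared by reference, exactly as in the reviewer's suggestion.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical shape of the proposed contract; not Flink's actual API.
interface InputStatus {
    CompletableFuture<?> FINISHED = CompletableFuture.completedFuture(null);
    CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    CompletableFuture<?> getStatusFuture();
}

class StatusDrivenLoop {

    static void drive(InputStatus input) throws Exception {
        while (true) {
            CompletableFuture<?> status = input.getStatusFuture();
            if (status == InputStatus.FINISHED) {
                return;                          // end of input reached
            } else if (status == InputStatus.AVAILABLE || status.isDone()) {
                // data is ready right now; the reference check against the shared
                // constant avoids the volatile read inside isDone() when hot looping
                processNextElement();
            } else {
                status.get();                    // wait until more data arrives
            }
        }
    }

    private static void processNextElement() {
        // placeholder for polling and processing one element
    }
}
```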
[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished
[ https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-12302: - Attachment: jobmanager-05-27.log > Fixed the wrong finalStatus of yarn application when application finished > - > > Key: FLINK-12302 > URL: https://issues.apache.org/jira/browse/FLINK-12302 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: fix-bad-finalStatus.patch, > image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, > screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271 > > Time Spent: 10m > Remaining Estimate: 0h > > flink job(flink-1.6.3) failed in per-job yarn cluste mode, the > resourcemanager of yarn rerun the job. > when the job failed again, the application while finish, but the finalStatus > is +UNDEFINED,+ It's better to show state +FAILED+ > !image-2019-04-23-19-56-49-933.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zentol commented on a change in pull request #8545: [FLINK-12520] Support to provide fully-qualified domain host name in TaskManagerMetricGroup
zentol commented on a change in pull request #8545: [FLINK-12520] Support to provide fully-qualified domain host name in TaskManagerMetricGroup URL: https://github.com/apache/flink/pull/8545#discussion_r287806538 ## File path: flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java ## @@ -185,6 +185,14 @@ "faster updating metrics. Increase this value if the metric fetcher causes too much load. Setting this value to 0 " + "disables the metric fetching completely."); + /** +* Whether the host name in task manager metrics should be the fully qualified domain name. +*/ + public static final ConfigOption<Boolean> METRIC_FULL_HOST_NAME = + key("metrics.tm.full-hostname") Review comment: I'd rather introduce a new variable `<tm_fqdn>` than resorting to config options. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #8545: [FLINK-12520] Support to provide fully-qualified domain host name in TaskManagerMetricGroup
zentol commented on a change in pull request #8545: [FLINK-12520] Support to provide fully-qualified domain host name in TaskManagerMetricGroup URL: https://github.com/apache/flink/pull/8545#discussion_r287806224 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java ## @@ -202,7 +214,11 @@ public static String getHostName(InetAddress inetAddress) { LOG.warn("No hostname could be resolved for the IP address {}, using IP address as host name. " + "Local input split assignment (such as for HDFS files) may be impacted.", inetAddress.getHostAddress()); } else { - hostName = NetUtils.getHostnameFromFQDN(fqdnHostName); + if (useFullHostName) { + hostName = fqdnHostName; Review comment: this case should most certainly not be handled in this method. Have the TaskManagerRunner call `getFqdnHostName` instead (after making it public). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11283) Add keyed CoProcessFunction that allows accessing key
[ https://issues.apache.org/jira/browse/FLINK-11283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11283: - Issue Type: New Feature (was: Improvement) > Add keyed CoProcessFunction that allows accessing key > - > > Key: FLINK-11283 > URL: https://issues.apache.org/jira/browse/FLINK-11283 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Reporter: Truong Duc Kien >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 40m > Remaining Estimate: 0h > > Currently, we can access the key when using {{KeyedProcessFunction}} . > Simillar functionality would be very useful when processing connected keyed > stream. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] pnowojski commented on issue #8495: [FLINK-12556][e2e] Extend some end-to-end tests to run with custom (input) File System implementation
pnowojski commented on issue #8495: [FLINK-12556][e2e] Extend some end-to-end tests to run with custom (input) File System implementation URL: https://github.com/apache/flink/pull/8495#issuecomment-496224208 LGTM % renaming, one open question, and whether Chesnay is fine with extending the testing time (I would keep the original count of tests) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #8548: [FLINK-6962] [table] Add a create table SQL DDL
twalthr commented on a change in pull request #8548: [FLINK-6962] [table] Add a create table SQL DDL URL: https://github.com/apache/flink/pull/8548#discussion_r287803308 ## File path: flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl ## @@ -0,0 +1,347 @@ +<#-- +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to you under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +--> + +void TableColumn(TableCreationContext context) : +{ +} +{ +( +TableColumn2(context.columnList) +| +context.watermark = Watermark() +| +context.primaryKeyList = PrimaryKey() +| +UniqueKey(context.uniqueKeysList) +| +ComputedColumn(context) +) +} + +void ComputedColumn(TableCreationContext context) : +{ +SqlNode identifier; +SqlNode expr; +boolean hidden = false; +SqlParserPos pos; +} +{ +identifier = SimpleIdentifier() {pos = getPos();} + +expr = Expression(ExprContext.ACCEPT_SUB_QUERY) { +expr = SqlStdOperatorTable.AS.createCall(Span.of(identifier, expr).pos(), expr, identifier); +context.columnList.add(expr); +} +} + +void TableColumn2(List list) : +{ +SqlParserPos pos; +SqlIdentifier name; +SqlDataTypeSpec type; +SqlCharStringLiteral comment = null; +boolean isHeader = false; +} +{ +name = SimpleIdentifier() +type = DataType() +( + { type = type.withNullable(true); } +| + { type = type.withNullable(false); } +| +{ type = type.withNullable(true); } +) +[ { isHeader = true; } ] +[ { +String p = SqlParserUtil.parseString(token.image); +comment = SqlLiteral.createCharString(p, getPos()); +}] +{ +SqlTableColumn tableColumn = new SqlTableColumn(name, type, comment, getPos()); +tableColumn.setHeader(isHeader); +list.add(tableColumn); +} +} + +SqlNodeList PrimaryKey() : +{ +List pkList = new ArrayList(); + +SqlParserPos pos; +SqlIdentifier columnName; +} +{ + { pos = getPos(); } +columnName = SimpleIdentifier() { pkList.add(columnName); } +( columnName = SimpleIdentifier() { pkList.add(columnName); })* + +{ +return new SqlNodeList(pkList, pos.plus(getPos())); +} +} + +void UniqueKey(List list) : +{ +List ukList = new ArrayList(); +SqlParserPos pos; +SqlIdentifier columnName; +} +{ + { pos = getPos(); } +columnName = SimpleIdentifier() { ukList.add(columnName); } +( columnName = SimpleIdentifier() { ukList.add(columnName); })* + +{ +SqlNodeList uk = new SqlNodeList(ukList, pos.plus(getPos())); +list.add(uk); +} +} + +SqlWatermark Watermark() : +{ +SqlIdentifier watermarkName = null; +SqlIdentifier columnName; +SqlWatermarkStrategy strategy; +int delayDef = -1; +SqlTimeUnit timeUnit = null; +} +{ + +[watermarkName = SimpleIdentifier()] + +columnName = SimpleIdentifier() + +( + +{ +strategy = SqlWatermarkStrategy.BOUNDED_WITH_DELAY; +delayDef = UnsignedIntLiteral(); +} +( + { timeUnit = SqlTimeUnit.DAY; } +|{ timeUnit = SqlTimeUnit.HOUR; } +|{ timeUnit = SqlTimeUnit.MINUTE; } +|{ timeUnit = SqlTimeUnit.SECOND; } +|{ timeUnit = 
SqlTimeUnit.MILLISECOND; } +) +|{ strategy = SqlWatermarkStrategy.ASCENDING; } +|{ strategy = SqlWatermarkStrategy.FROM_SOURCE; } +) +{ +return new SqlWatermark( +watermarkName, +columnName, +strategy, +delayDef, +timeUnit, +getPos()); +} +} + +SqlNode PropertyValue() : +{ +SqlIdentifier key; +SqlNode value; +SqlParserPos pos; +} +{ +key = CompoundIdentifier() +{ pos = getPos(); } + value = StringLiteral() +{ +return new SqlProperty(key, value, getPos()); +} +} + +SqlNode SqlCreateTable() : +{ +final SqlParserPos startPos; +SqlIdentifier tableName; +String tableType = null; +SqlNodeList primaryKeyList = null; +List uniqueKeysList = null; +
[GitHub] [flink] pnowojski commented on a change in pull request #8495: [FLINK-12556][e2e] Extend some end-to-end tests to run with custom (input) File System implementation
pnowojski commented on a change in pull request #8495: [FLINK-12556][e2e] Extend some end-to-end tests to run with custom (input) File System implementation URL: https://github.com/apache/flink/pull/8495#discussion_r287803156 ## File path: flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh ## @@ -92,34 +88,28 @@ do sleep 2 done -CLUSTER_STARTED=1 -for (( i = 0; i < $CLUSTER_SETUP_RETRIES; i++ )) -do -if start_hadoop_cluster; then - echo "Cluster started successfully." - CLUSTER_STARTED=0 - break #continue test, cluster set up succeeded -fi - -echo "ERROR: Could not start hadoop cluster. Retrying..." -docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml down -done - -if [[ ${CLUSTER_STARTED} -ne 0 ]]; then +if ! retry_times $CLUSTER_SETUP_RETRIES 0 start_hadoop_cluster; then echo "ERROR: Could not start hadoop cluster. Aborting..." exit 1 fi +mkdir -p $FLINK_TARBALL_DIR +tar czf $FLINK_TARBALL_DIR/$FLINK_TARBALL -C $(dirname $FLINK_DIR) . Review comment: > I wasn't (and I'm still not) sure if all of those refactorings in the first commit are related to one another or if they are 3 independent things Bumping the question :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] pnowojski commented on a change in pull request #8495: [FLINK-12556][e2e] Extend some end-to-end tests to run with custom (input) File System implementation
pnowojski commented on a change in pull request #8495: [FLINK-12556][e2e] Extend some end-to-end tests to run with custom (input) File System implementation URL: https://github.com/apache/flink/pull/8495#discussion_r287802609 ## File path: flink-end-to-end-tests/flink-plugins-test/src/main/java/org/apache/flink/fs/dummy/DummyFileSystemFileStatus.java ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.dummy; + +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.Path; + +class DummyFileSystemFileStatus implements FileStatus { Review comment: Ok I get your point. In that case I would vote for one of the following: 1. `DummyFileSystem`, `DumyFileStatus`, `DummyDataInputStream` 2. `DummyFSFileSystem` `DumyFSFileStatus`, `DummyFSDataInputStream` (note `FS` which would also feet the code base better compared to `Fs` and note that all files should have the same prefix) 3. Rename it altogether, for example to `StaticContent` or `PredefinedContent` (`StaticContentFileSystem`, `StaticContentFileStatus`, `StaticContentDataInputStream`, ...) I think the 3rd would be the best, as it also describes in what way the file system is "dummy", not just the fact that it is "dummy". This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig URL: https://github.com/apache/flink/pull/8459#discussion_r287802261 ## File path: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java ## @@ -376,39 +391,57 @@ public StateTtlConfig build() { private static final long serialVersionUID = 1373998465131443873L; } - final EnumMap strategies = new EnumMap<>(Strategies.class); + private final EnumMap strategies = new EnumMap<>(Strategies.class); - public void activate(Strategies strategy) { + private void activate(Strategies strategy) { activate(strategy, EMPTY_STRATEGY); } - public void activate(Strategies strategy, CleanupStrategy config) { + private void activate(Strategies strategy, CleanupStrategy config) { strategies.put(strategy, config); } public boolean inFullSnapshot() { return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT); Review comment: true, I thought about another method, sorry for confusion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
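For readers following this thread, a hedged sketch of how a cleanup strategy is switched on from user code; the builder methods shown reflect the public StateTtlConfig API as I understand it and may differ slightly between versions.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Sketch: enable TTL with the full-snapshot cleanup strategy discussed above.
class TtlCleanupExample {

    static ValueStateDescriptor<Long> describeState() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(1))      // state expires one hour after the last write
                .cleanupFullSnapshot()          // corresponds to the FULL_STATE_SCAN_SNAPSHOT strategy
                .build();

        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Long.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```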
[jira] [Commented] (FLINK-12611) Make time indicator nullable in blink
[ https://issues.apache.org/jira/browse/FLINK-12611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848956#comment-16848956 ] Timo Walther commented on FLINK-12611: -- Running aggregates like {{SELECT MAX(X)}} should not propagate time attributes. Currently, we strictly separate time-based operations and materializing operations. This is also discussed [here|https://github.com/ververica/sql-training/blob/master/slides/sql-training-02-querying-dynamic-tables.pdf] (slide 24 and 32). So the concept of time attributes and retract/upsert tables should be orthogonal. However, until the time attribute materialization happens, a time attribute should behave like a regular TIMESTAMP and thus can be nullable. > Make time indicator nullable in blink > - > > Key: FLINK-12611 > URL: https://issues.apache.org/jira/browse/FLINK-12611 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > SQL: select max(rowtime), count(a) from T > There will be a AssertionError: type mismatch: > aggCall type: > TIMESTAMP(3) NOT NULL > inferred type: > TIMESTAMP(3) > Agg type checking is done before TimeIndicator materializes. So there is a > exception. > And before introducing nullable of LogicalType, we should modify this to > avoid more potential TypeCheck problems. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] aljoscha closed pull request #7470: [FLINK-11283] Accessing the key when processing connected keyed stream
aljoscha closed pull request #7470: [FLINK-11283] Accessing the key when processing connected keyed stream URL: https://github.com/apache/flink/pull/7470 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] aljoscha commented on issue #7470: [FLINK-11283] Accessing the key when processing connected keyed stream
aljoscha commented on issue #7470: [FLINK-11283] Accessing the key when processing connected keyed stream URL: https://github.com/apache/flink/pull/7470#issuecomment-496221694 Thanks a lot for staying on this issue for long, @yanghua. I merged it finally. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] aljoscha commented on issue #8538: [FLINK-11283] Accessing the key when processing connected keyed stream
aljoscha commented on issue #8538: [FLINK-11283] Accessing the key when processing connected keyed stream URL: https://github.com/apache/flink/pull/8538#issuecomment-496221456 Merged This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-11283) Add keyed CoProcessFunction that allows accessing key
[ https://issues.apache.org/jira/browse/FLINK-11283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-11283. Resolution: Implemented Fix Version/s: 1.9.0 Implemented on master in 3a5bf89384ed07431d15285ef40e751daf9d0c83 > Add keyed CoProcessFunction that allows accessing key > - > > Key: FLINK-11283 > URL: https://issues.apache.org/jira/browse/FLINK-11283 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Truong Duc Kien >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > Currently, we can access the key when using {{KeyedProcessFunction}}. > Similar functionality would be very useful when processing connected keyed > stream. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
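A minimal sketch of what the implemented feature enables, assuming, per the linked pull requests, a keyed co-process function whose Context exposes the current key; the class name follows the issue title and may differ in detail from the merged code.

```java
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: a co-process function on a connected keyed stream that reads the
// current key from the Context in both processElement methods.
public class KeyTaggingFunction
        extends KeyedCoProcessFunction<String, Long, Long, String> {

    @Override
    public void processElement1(Long value, Context ctx, Collector<String> out) {
        out.collect(ctx.getCurrentKey() + " <- first input: " + value);
    }

    @Override
    public void processElement2(Long value, Context ctx, Collector<String> out) {
        out.collect(ctx.getCurrentKey() + " <- second input: " + value);
    }
}
```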
[GitHub] [flink] 1u0 commented on issue #8495: [FLINK-12556][e2e] Extend some end-to-end tests to run with custom (input) File System implementation
1u0 commented on issue #8495: [FLINK-12556][e2e] Extend some end-to-end tests to run with custom (input) File System implementation URL: https://github.com/apache/flink/pull/8495#issuecomment-496221499 Timings of the tests, from the CI logs: ``` [PASS] 'Running Kerberized YARN on Docker test (default input)' passed after 8 minutes and 13 seconds! Test exited with exit code 0. [PASS] 'Running Kerberized YARN on Docker test (custom fs plugin)' passed after 4 minutes and 33 seconds! Test exited with exit code 0. [PASS] 'Wordcount end-to-end test' passed after 0 minutes and 12 seconds! Test exited with exit code 0. [PASS] 'Shaded Hadoop S3A end-to-end test' passed after 0 minutes and 1 seconds! Test exited with exit code 0. [PASS] 'Shaded Presto S3 end-to-end test' passed after 0 minutes and 0 seconds! Test exited with exit code 0. [PASS] 'Custom FS plugin end-to-end test' passed after 0 minutes and 13 seconds! Test exited with exit code 0. ``` So adding the modified YARN Kerberos test in addition extends the test run by about 4.5 minutes. Here it may look faster because it reuses the Docker build created by the previous test run. Otherwise, I expect them to run for about the same amount of time under equal conditions. CC @zentol. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11373) CliFrontend cuts off reason for error messages
[ https://issues.apache.org/jira/browse/FLINK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11373: - Component/s: (was: Deployment / Scripts) Command Line Client > CliFrontend cuts off reason for error messages > -- > > Key: FLINK-11373 > URL: https://issues.apache.org/jira/browse/FLINK-11373 > Project: Flink > Issue Type: Bug > Components: Command Line Client >Affects Versions: 1.5.6, 1.6.3, 1.7.1 >Reporter: Maximilian Michels >Assignee: leesf >Priority: Minor > Labels: pull-request-available, starter > Time Spent: 10m > Remaining Estimate: 0h > > The CliFrontend seems to only print the first message in the stack trace and > not any of its causes. > {noformat} > bin/flink run /non-existing/path > Could not build the program from JAR file. > Use the help option (-h or --help) to get help on the command. > {noformat} > Notice that the underlying cause of this message is FileNotFoundException. > Consider changing > a) the error message for this particular case > b) the way the stack trace messages are trimmed -- This message was sent by Atlassian JIRA (v7.6.3#76005)
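As a sketch of option (b) from the issue description, trimming could keep the cause chain instead of only the first message; this is a generic illustration, not CliFrontend's actual code.

```java
// Generic illustration: walk the cause chain so the root FileNotFoundException
// message is not lost when the stack trace is trimmed for display.
class ErrorSummaries {

    static String summarize(Throwable error) {
        StringBuilder summary = new StringBuilder();
        for (Throwable cause = error; cause != null; cause = cause.getCause()) {
            if (summary.length() > 0) {
                summary.append("\nCaused by: ");
            }
            summary.append(cause.getClass().getSimpleName())
                   .append(": ")
                   .append(cause.getMessage());
        }
        return summary.toString();
    }
}
```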
[GitHub] [flink] aljoscha closed pull request #8538: [FLINK-11283] Accessing the key when processing connected keyed stream
aljoscha closed pull request #8538: [FLINK-11283] Accessing the key when processing connected keyed stream URL: https://github.com/apache/flink/pull/8538 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287799750 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java ## @@ -18,48 +18,64 @@ package org.apache.flink.runtime.executiongraph; -import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.util.Preconditions; +import org.apache.flink.runtime.shuffle.ShuffleDeploymentDescriptor; +import javax.annotation.Nonnull; import java.io.Serializable; /** * Contains information where to find a partition. The partition is defined by the - * {@link IntermediateDataSetID} and the partition location is specified by - * {@link InputChannelDeploymentDescriptor}. + * {@link IntermediateDataSetID} and the partition is specified by + * {@link org.apache.flink.runtime.shuffle.ShuffleDeploymentDescriptor}. */ public class PartitionInfo implements Serializable { private static final long serialVersionUID = 1724490660830968430L; + @Nonnull private final IntermediateDataSetID intermediateDataSetID; Review comment: At the moment, `IntermediateDataSetID` is kept once in `InputGateDeploymentDescriptor` for all channels/partitions as before. I suggest we keep it this way in this PR. Later we can consider one step further refactoring where `IntermediateDataSetID` is part of the full partition id. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] aljoscha commented on issue #8535: [FLINK-11693] Add KafkaSerializationSchema that uses ProducerRecord
aljoscha commented on issue #8535: [FLINK-11693] Add KafkaSerializationSchema that uses ProducerRecord URL: https://github.com/apache/flink/pull/8535#issuecomment-496219121 Also, @tweise or @jgrier is this feature relevant for you? Maybe you also have some comments/input? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] asfgit closed pull request #8510: [FLINK-12254][table] Update cast() and TypeLiteralExpression to new type system
asfgit closed pull request #8510: [FLINK-12254][table] Update cast() and TypeLiteralExpression to new type system URL: https://github.com/apache/flink/pull/8510 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on issue #8485: [FLINK-12555] Introduce an encapsulated metric group layout for shuffle API
zentol commented on issue #8485: [FLINK-12555] Introduce an encapsulated metric group layout for shuffle API URL: https://github.com/apache/flink/pull/8485#issuecomment-496213298 > And if we think there are no concerns for creating one metric group twice There are concerns; it prints a warning to the user as it is not how the API is supposed to be used. If any components need access to a shared group then this group should be created once and passed around as needed. Things were implemented this way on purpose to prevent components from interfering with each other by accident. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
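A minimal sketch of the pattern described above, assuming invented component classes: the owning code creates the shared metric group once and hands the same instance to every component, rather than having each component register the group itself.

```java
// Illustrative sketch only (component classes are invented): the shared group is
// created exactly once by the owner and passed to the components that need it.
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

final class ShuffleMetricsWiring {

	private ShuffleMetricsWiring() {
	}

	static void register(MetricGroup parent) {
		// create the shared group once ...
		MetricGroup shuffleGroup = parent.addGroup("Shuffle");

		// ... and pass it around instead of calling addGroup("Shuffle") again elsewhere
		new InputMetrics(shuffleGroup);
		new OutputMetrics(shuffleGroup);
	}

	static final class InputMetrics {
		InputMetrics(MetricGroup group) {
			// placeholder gauge, just to show the shared group being used
			group.gauge("buffersIn", (Gauge<Long>) () -> 0L);
		}
	}

	static final class OutputMetrics {
		OutputMetrics(MetricGroup group) {
			group.gauge("buffersOut", (Gauge<Long>) () -> 0L);
		}
	}
}
```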
[GitHub] [flink] zentol commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler
zentol commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler URL: https://github.com/apache/flink/pull/8498#discussion_r287788958 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.Collections; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Result containing the tasks to restart upon a task failure. + * Also contains the reason if the failure is not recoverable (non-recoverable + * failure type or restarting suppressed by restart strategy). + */ +public class FailureHandlingResult { + + /** Task vertices to restart to recover from the failure. */ + private final Set<ExecutionVertexID> verticesToRestart; + + /** Delay before the restarting can be conducted. */ + private final long restartDelayMS; + + /** Reason why the failure is not recoverable. */ + private final Throwable error; + + /** + * Creates a result of a set of tasks to restart to recover from the failure. + * + * @param verticesToRestart containing task vertices to restart to recover from the failure + * @param restartDelayMS indicate a delay before conducting the restart + */ + private FailureHandlingResult(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) { + checkState(restartDelayMS >= 0); + + this.verticesToRestart = Collections.unmodifiableSet(checkNotNull(verticesToRestart)); + this.restartDelayMS = restartDelayMS; + this.error = null; + } + + /** + * Creates a result that the failure is not recoverable and no restarting should be conducted. + * + * @param error reason why the failure is not recoverable + */ + private FailureHandlingResult(Throwable error) { + this.verticesToRestart = null; + this.restartDelayMS = -1; + this.error = checkNotNull(error); + } + + /** + * Returns the tasks to restart. + * + * @return the tasks to restart + */ + public Set<ExecutionVertexID> getVerticesToRestart() { + if (canRestart()) { + return verticesToRestart; + } else { + throw new IllegalStateException("Cannot get vertices to restart when the restarting is suppressed."); + } + } + + /** + * Returns the delay before the restarting. + * + * @return the delay before the restarting + */ + public long getRestartDelayMS() { + if (canRestart()) { + return restartDelayMS; + } else { + throw new IllegalStateException("Cannot get restart delay when the restarting is suppressed."); + } + } + + /** + * Returns whether the restarting can be conducted. + * + * @return whether the restarting can be conducted + */ + public boolean canRestart() { + return error == null; + } + + /** + * Returns reason why the restarting cannot be conducted. + * + * @return reason why the restarting cannot be conducted + */ + public Throwable getError() { + return error; Review comment: shouldn't this also throw an `IllegalStateException` if `error == null`, like `getRestartDelayMS` and `getVerticesToRestart`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
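For illustration, a minimal sketch of the guard suggested in the review comment above (the exception message wording is only an assumption, not the merged code):

```java
	// Mirror the guards in getVerticesToRestart()/getRestartDelayMS(): the error can
	// only be read when the failure is actually non-recoverable.
	public Throwable getError() {
		if (canRestart()) {
			throw new IllegalStateException("Cannot get the error when the failure is recoverable.");
		}
		return error;
	}
```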
[GitHub] [flink] zentol commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler
zentol commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler URL: https://github.com/apache/flink/pull/8498#discussion_r287786958 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link FailureHandlingResult}. + */ +public class FailureHandlingResultTest extends TestLogger { + + /** + * Tests normal FailureHandlingResult. + */ + @Test + public void testNormalFailureHandlingResult() throws Exception { + // create a normal FailureHandlingResult + Set<ExecutionVertexID> tasks = new HashSet<>(); Review comment: You missed this one :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services