[GitHub] [flink] HuangZhenQiu commented on issue #8303: [FLINK-12343]add file replication config for yarn configuration

2019-05-27 Thread GitBox
HuangZhenQiu commented on issue #8303: [FLINK-12343]add file replication config 
for yarn configuration
URL: https://github.com/apache/flink/pull/8303#issuecomment-496370553
 
 
   @rmetzger @tillrohrmann 
   I was blocked by setting up the secured MiniDFSCluster in the integration 
tests, so the PR took a little more time. Please review it at your earliest 
convenience.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8556: [FLINK-12171][Network] Do not limit the network buffer memory by heap size on the TM side

2019-05-27 Thread GitBox
flinkbot commented on issue #8556: [FLINK-12171][Network] Do not limit the 
network buffer memory by heap size on the TM side
URL: https://github.com/apache/flink/pull/8556#issuecomment-496368873
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   ## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-12171) The network buffer memory size should not be checked against the heap size on the TM side

2019-05-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12171:
---
Labels: pull-request-available  (was: )

> The network buffer memory size should not be checked against the heap size on 
> the TM side
> -
>
> Key: FLINK-12171
> URL: https://issues.apache.org/jira/browse/FLINK-12171
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.0
> Environment: Flink 1.7.2; Flink 1.8 does not seem to have modified 
> the logic here.
>  
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>  Labels: pull-request-available
>
> Currently, when computing the network buffer memory size on the TM side in 
> _TaskManagerService#calculateNetworkBufferMemory_ (version 1.8 or 1.7) or 
> _NetworkEnvironmentConfiguration#calculateNewNetworkBufferMemory_ (master), 
> the computed network buffer memory size is checked to be less than 
> `maxJvmHeapMemory`. However, on the TM side, _maxJvmHeapMemory_ stores the 
> maximum heap memory (namely -Xmx).
>  
> When the TM starts, -Xmx is computed in the RM or in _taskmanager.sh_ as 
> (container memory - network buffer memory - managed memory), so the above 
> check implies that the heap memory of the TM must be larger than the network 
> memory, which is not necessary.
>  
> Therefore, I think the network buffer memory size should instead be checked 
> against the total memory on the TM side:
>  # Check that networkBufFraction < 1.0.
>  # Compute the total memory as (jvmHeapNoNet / (1 - networkBufFraction)).
>  # Compare the network buffer memory with the total memory.
> This check is also consistent with the similar one done on the RM side.
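The three-step check proposed above can be sketched in plain Java. This is a hypothetical helper for illustration only; `jvmHeapNoNet`, the method names, and the signatures are assumptions based on the description, not actual Flink code.

```java
// Hypothetical sketch of the proposed TM-side check (not actual Flink code).
public final class NetworkMemoryCheck {

    /**
     * Derives the total memory from the heap size excluding network buffers
     * (jvmHeapNoNet) and the configured network buffer fraction, as proposed:
     * total = jvmHeapNoNet / (1 - networkBufFraction).
     */
    public static long deriveTotalMemory(long jvmHeapNoNet, float networkBufFraction) {
        // Step 1: the fraction must be strictly below 1.0, otherwise the division blows up.
        if (networkBufFraction < 0.0f || networkBufFraction >= 1.0f) {
            throw new IllegalArgumentException("networkBufFraction must be in [0, 1)");
        }
        // Step 2: back out the total memory from the non-network heap share.
        return (long) (jvmHeapNoNet / (1.0 - networkBufFraction));
    }

    /** Step 3: compare the computed network buffer size against the derived total. */
    public static boolean fitsIntoTotalMemory(
            long networkBufBytes, long jvmHeapNoNet, float networkBufFraction) {
        return networkBufBytes <= deriveTotalMemory(jvmHeapNoNet, networkBufFraction);
    }
}
```

For example, with 750 bytes of non-network heap and a network fraction of 0.25, the derived total is 1000 bytes, so a 250-byte network buffer passes the check while a 1001-byte one fails.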



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] gaoyunhaii opened a new pull request #8556: [FLINK-12171][Network] Do not limit the network buffer memory by heap size on the TM side

2019-05-27 Thread GitBox
gaoyunhaii opened a new pull request #8556: [FLINK-12171][Network] Do not limit 
the network buffer memory by heap size on the TM side
URL: https://github.com/apache/flink/pull/8556
 
 
   ## What is the purpose of the change
   
   This pull request fixes the bug that limits the network buffer size by the 
heap size on the TM side. In fact, the network buffers occupy a part of direct 
memory and are independent of the heap.
   
   To fix this problem, the limitation on the TM side is removed. Although we 
may want to compare the network memory size with the total memory size on the 
TM side, currently we can only compute the total memory as heap + computed 
network memory, and that total is always larger than the computed network 
memory.
   
   To remove the limitation, the maximum allowed memory used to check the 
network memory size on the TM side is changed to Long.MAX_VALUE. Another 
option is to move the check to the caller function on the RM side. However, 
this is not easy to achieve, since the check relies on the configured values 
of MIN and MAX, which are not accessible outside of the current function.
   
   ## Brief change log
 - *Change the maximum allowed memory on the TM side to Long.MAX_VALUE.*
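The change can be illustrated with a minimal sketch. The method and parameter names here are illustrative, not the actual Flink signatures; it only shows the shape of the fix, where the TM side passes Long.MAX_VALUE so the cap can never fire.

```java
// Illustrative sketch of the fix: on the TM side the network memory is no
// longer capped by the heap size; Long.MAX_VALUE effectively disables the check.
public final class NetworkBufferCalc {

    /** Clamps the fraction-derived size into [min, max], then checks the cap. */
    public static long calculateNetworkBufferMemory(
            long baseMemory, float fraction, long min, long max, long maxAllowedBytes) {
        long size = Math.min(max, Math.max(min, (long) (baseMemory * fraction)));
        if (size > maxAllowedBytes) {
            throw new IllegalArgumentException("Network buffer memory exceeds the allowed maximum");
        }
        return size;
    }

    /** On the TM side the upper bound is disabled rather than tied to the heap. */
    public static long calculateOnTaskManager(long heapBytes, float fraction, long min, long max) {
        return calculateNetworkBufferMemory(heapBytes, fraction, min, max, Long.MAX_VALUE);
    }
}
```

With this shape, a configuration where the network memory (e.g. a MIN of 2000) exceeds the heap (e.g. 1000) no longer raises an error on the TM side.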
   
   ## Verifying this change
   This change can be verified as follows:
   
 - *Manually verified the change by running a cluster with two task 
managers in both standalone and YARN mode, testing the configurations 
heap = 3G / network = 2G and heap = 5G / network = 2G.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   




[GitHub] [flink] leesf commented on a change in pull request #8543: [FLINK-12101] Race condition when concurrently running uploaded jars via REST

2019-05-27 Thread GitBox
leesf commented on a change in pull request #8543: [FLINK-12101] Race condition 
when concurrently running uploaded jars via REST
URL: https://github.com/apache/flink/pull/8543#discussion_r287931889
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
 ##
 @@ -1061,8 +1064,12 @@ private static String getDefaultName() {
	 * @return The execution environment of the context in which the program is executed.
	 */
	public static ExecutionEnvironment getExecutionEnvironment() {
+
		return contextEnvironmentFactory == null ?
-			createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment();
+			(contextEnvironmentFactoryThreadLocal.get() == null ?
 
 Review comment:
   Thanks for your review @klion26. Good catch and updated.  
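The ThreadLocal approach visible in the diff can be sketched as follows. The class and field names are simplified assumptions, not the actual Flink code; the point is that a per-thread factory avoids the race where concurrent jar submissions overwrite a shared static field.

```java
// Simplified sketch of the race-condition fix: a per-thread context factory
// so concurrent submissions do not clobber each other's execution environment.
public final class EnvironmentHolder {

    /** Stand-in for Flink's ExecutionEnvironmentFactory. */
    public interface EnvFactory {
        String createExecutionEnvironment();
    }

    // Per-thread factory replaces the shared static field.
    private static final ThreadLocal<EnvFactory> contextEnvironmentFactoryThreadLocal =
            new ThreadLocal<>();

    public static void setFactory(EnvFactory factory) {
        contextEnvironmentFactoryThreadLocal.set(factory);
    }

    /** Falls back to a local environment when no per-thread factory is set. */
    public static String getExecutionEnvironment() {
        EnvFactory factory = contextEnvironmentFactoryThreadLocal.get();
        return factory == null ? "local" : factory.createExecutionEnvironment();
    }
}
```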




[GitHub] [flink] liyafan82 commented on issue #8511: [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive

2019-05-27 Thread GitBox
liyafan82 commented on issue #8511: [FLINK-12319][Library/CEP]Change the logic 
of releasing node from recursive to non-recursive
URL: https://github.com/apache/flink/pull/8511#issuecomment-496361993
 
 
   Hi @tillrohrmann , would you please take a look at this issue? I think it 
deserves to be fixed.




[GitHub] [flink] xuefuz commented on a change in pull request #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file

2019-05-27 Thread GitBox
xuefuz commented on a change in pull request #8541: [FLINK-9172][sql-client] 
Support catalogs in SQL-Client yaml config file
URL: https://github.com/apache/flink/pull/8541#discussion_r287926431
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/CatalogFactory.java
 ##
 @@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Catalog;
+
+import java.util.Map;
+
+/**
+ * A factory to create configured catalog instances based on string-based 
properties. See
+ * also {@link TableFactory} for more information.
+ */
+@PublicEvolving
+public interface CatalogFactory extends TableFactory {
 
 Review comment:
   It's a little weird that CatalogFactory extends from TableFactory. Any 
thoughts?




[jira] [Commented] (FLINK-12171) The network buffer memory size should not be checked against the heap size on the TM side

2019-05-27 Thread Yun Gao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849304#comment-16849304
 ] 

Yun Gao commented on FLINK-12171:
-

After further analyzing this problem, I now think we do not need to check the 
maximum allowed memory on the TM side.

On the RM side, we compute the network memory size from the total memory size. 
There may be cases where the configured MIN and MAX are so large that the 
resulting network memory is larger than the total memory size, so we need to 
check against that.

However, on the TM side we do not know the total memory size; we only know the 
heap size. We can only deduce the total memory size as heap size + computed 
network memory, which is always larger than the computed network memory.

Therefore, unless we ensure the total memory size is available on the TM side 
and also compute the network memory size from the total memory size there, we 
cannot check the network memory size.

According to the above analysis, I think we can simply remove the comparison 
of the network memory size and the heap memory size. This comparison is not 
right, since the network memory is not part of the heap memory, and it may 
raise an error even when the configuration is in fact reasonable.
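The argument that the TM-side check would be vacuous can be made concrete with a tiny sketch (illustrative names, not Flink code): the only total derivable on the TM side is heap + network, which is always at least the network size for non-negative values.

```java
// Illustrative: on the TM side the only derivable "total" is heap + network,
// so checking network <= total can never fail and carries no information.
public final class TmTotalMemory {

    public static long deduceTotal(long heapBytes, long networkBytes) {
        return heapBytes + networkBytes;
    }

    /** Always true for non-negative sizes, which is exactly the point. */
    public static boolean checkNetworkAgainstTotal(long heapBytes, long networkBytes) {
        return networkBytes <= deduceTotal(heapBytes, networkBytes);
    }
}
```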

 

 

> The network buffer memory size should not be checked against the heap size on 
> the TM side
> -
>
> Key: FLINK-12171
> URL: https://issues.apache.org/jira/browse/FLINK-12171
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.7.2, 1.8.0
> Environment: Flink-1.7.2, and Flink-1.8 seems have not modified the 
> logic here.
>  
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> Currently, when computing the network buffer memory size on the TM side in 
> _TaskManagerService#calculateNetworkBufferMemory_ (version 1.8 or 1.7) or 
> _NetworkEnvironmentConfiguration#calculateNewNetworkBufferMemory_ (master), 
> the computed network buffer memory size is checked to be less than 
> `maxJvmHeapMemory`. However, on the TM side, _maxJvmHeapMemory_ stores the 
> maximum heap memory (namely -Xmx).
>  
> When the TM starts, -Xmx is computed in the RM or in _taskmanager.sh_ as 
> (container memory - network buffer memory - managed memory), so the above 
> check implies that the heap memory of the TM must be larger than the network 
> memory, which is not necessary.
>  
> Therefore, I think the network buffer memory size should instead be checked 
> against the total memory on the TM side:
>  # Check that networkBufFraction < 1.0.
>  # Compute the total memory as (jvmHeapNoNet / (1 - networkBufFraction)).
>  # Compare the network buffer memory with the total memory.
> This check is also consistent with the similar one done on the RM side.





[GitHub] [flink] lirui-apache commented on a change in pull request #8522: [FLINK-12572][hive]Implement TableSource and InputFormat to read Hive tables

2019-05-27 Thread GitBox
lirui-apache commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement TableSource and InputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r287919214
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveRecordSerDe.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.table.type.DecimalType;
+
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+
+/**
+ * Class used to serialize to and from raw hdfs file type.
+ * Highly inspired by HCatRecordSerDe (almost copied from this class) in 
+ * hive-catalog-core.
+ */
+public class HiveRecordSerDe {
+
+   /**
+* Return underlying Java Object from an object-representation
+* that is readable by a provided ObjectInspector.
+*/
+   public static Object serializeField(Object field, ObjectInspector 
fieldObjectInspector)
 
 Review comment:
   I think some methods need better naming. Usually *serialize* means 
converting an object to a byte[]. It seems what the method does here is 
convert a Hive object to a Flink object?




[GitHub] [flink] xuefuz commented on a change in pull request #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file

2019-05-27 Thread GitBox
xuefuz commented on a change in pull request #8541: [FLINK-9172][sql-client] 
Support catalogs in SQL-Client yaml config file
URL: https://github.com/apache/flink/pull/8541#discussion_r287918350
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
 ##
 @@ -70,4 +79,24 @@ public void testMerging() throws Exception {
assertTrue(merged.getExecution().isStreamingExecution());
assertEquals(16, merged.getExecution().getMaxParallelism());
}
+
+   @Test
+   public void testDuplicateCatalog() {
+   exception.expect(SqlClientException.class);
+   exception.expectMessage("Cannot create catalog 'catalog2' 
because a catalog with this name is already registered.");
+   Environment env = new Environment();
+   env.setCatalogs(Arrays.asList(
+   getCatalog("catalog1", "test"),
+   getCatalog("catalog2", "test"),
+   getCatalog("catalog2", "test")));
+   }
+
+   private static Map getCatalog(String name, String type) 
{
 
 Review comment:
   rename to createCatalog or instantiateCatalog?
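The suggested rename could look like the sketch below. The map keys and the `Map<String, Object>` signature are assumptions based on the surrounding test, not the actual SQL Client configuration schema.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the renamed test helper; the "name"/"type" keys are
// assumptions for illustration, not the actual SQL Client YAML schema.
public final class CatalogTestHelpers {

    /** Builds a catalog config entry for the Environment test, as a plain map. */
    public static Map<String, Object> createCatalog(String name, String type) {
        Map<String, Object> catalog = new HashMap<>();
        catalog.put("name", name);
        catalog.put("type", type);
        return catalog;
    }
}
```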




[GitHub] [flink] godfreyhe commented on a change in pull request #8520: [FLINK-12600] [table-planner-blink] Introduce planner rules to do deterministic rewriting on RelNode

2019-05-27 Thread GitBox
godfreyhe commented on a change in pull request #8520: [FLINK-12600] 
[table-planner-blink] Introduce planner rules to do deterministic rewriting on 
RelNode
URL: https://github.com/apache/flink/pull/8520#discussion_r287917681
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkRewriteSubQueryRule.scala
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, operandJ}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Aggregate, Filter, RelFactories}
+import org.apache.calcite.rex.{RexShuttle, _}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.`type`.SqlTypeFamily
+import org.apache.calcite.sql.fun.SqlCountAggFunction
+import org.apache.calcite.tools.RelBuilderFactory
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that rewrites filter condition like:
+  * `(select count(*) from T) > 0` to `exists(select * from T)`,
 
 Review comment:
   The estimation for SEMI/ANTI join is very inaccurate, so we intend to do 
deterministic rewriting on SEMI/ANTI join. We can move this rule into CBO 
after we improve the estimation of SEMI/ANTI join.
   
   Yes, we can do similar rewriting for `exists(select * from T limit 1)` 
later.
   






[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-27 Thread GitBox
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287917746
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
 ##
 @@ -102,4 +102,20 @@
public static final ConfigOption OFFLOAD_MINSIZE = 
key("blob.offload.minsize")
.defaultValue(1_024 * 1_024) // 1MiB by default
.withDescription("The minimum size for messages to be offloaded 
to the BlobServer.");
+
+   /**
+* The socket timeout in milliseconds for the blob client.
+*/
+   public static final ConfigOption SO_TIMEOUT =
+   key("blob.client.socket.timeout")
+   .defaultValue(120_000)
+   .withDescription("The socket timeout in milliseconds 
for the blob client.");
+
+   /**
+* The connection timeout in milliseconds for the blob client.
+*/
+   public static final ConfigOption CONNECT_TIMEOUT =
+   key("blob.client.connect.timeout")
+   .defaultValue(120_000)
 
 Review comment:
   Please see my comments above.




[GitHub] [flink] sunhaibotb commented on issue #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-27 Thread GitBox
sunhaibotb commented on issue #8484: [FLINK-12547] Add connection and socket 
timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#issuecomment-496350510
 
 
   Thanks for the reviews @tillrohrmann. I have updated the code; please 
review it again.




[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-27 Thread GitBox
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287915194
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
 ##
 @@ -102,4 +102,20 @@
public static final ConfigOption OFFLOAD_MINSIZE = 
key("blob.offload.minsize")
.defaultValue(1_024 * 1_024) // 1MiB by default
.withDescription("The minimum size for messages to be offloaded 
to the BlobServer.");
+
+   /**
+* The socket timeout in milliseconds for the blob client.
+*/
+   public static final ConfigOption SO_TIMEOUT =
+   key("blob.client.socket.timeout")
+   .defaultValue(120_000)
 
 Review comment:
   I checked the option `taskmanager.network.netty.client.connectTimeoutSec`, 
whose default value is also `120s`. Shall we keep the connection/socket 
timeouts consistent with it? @tillrohrmann 






[jira] [Created] (FLINK-12638) Expose max parallelism via the REST API and via the Web UI

2019-05-27 Thread Sean Bollin (JIRA)
Sean Bollin created FLINK-12638:
---

 Summary: Expose max parallelism via the REST API and via the Web UI
 Key: FLINK-12638
 URL: https://issues.apache.org/jira/browse/FLINK-12638
 Project: Flink
  Issue Type: Improvement
Reporter: Sean Bollin


Currently there is no way to view the configured max parallelism. Let's add 
max parallelism to the REST API and the Web UI so it can be inspected.







[GitHub] [flink] godfreyhe commented on a change in pull request #8520: [FLINK-12600] [table-planner-blink] Introduce planner rules to do deterministic rewriting on RelNode

2019-05-27 Thread GitBox
godfreyhe commented on a change in pull request #8520: [FLINK-12600] 
[table-planner-blink] Introduce planner rules to do deterministic rewriting on 
RelNode
URL: https://github.com/apache/flink/pull/8520#discussion_r287914800
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRules.scala
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinRelType, Values}
+
+object FlinkPruneEmptyRules {
 
 Review comment:
   `PruneEmptyRules` in Calcite contains more than one rule; I intend to keep
the Calcite style for this rule. Maybe later we will need to copy other rules
from `PruneEmptyRules` into this file.




[GitHub] [flink] godfreyhe commented on a change in pull request #8527: [FLINK-12610] [table-planner-blink] Introduce planner rules about aggregate

2019-05-27 Thread GitBox
godfreyhe commented on a change in pull request #8527: [FLINK-12610] 
[table-planner-blink] Introduce planner rules about aggregate
URL: https://github.com/apache/flink/pull/8527#discussion_r287913290
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkAggregateJoinTransposeRule.java
 ##
 @@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical;
+
+import org.apache.flink.table.plan.util.AggregateUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalSnapshot;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlSplittableAggFunction;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.Bug;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.calcite.util.mapping.Mappings;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+/**
+ * This rule is copied from Calcite's {@link org.apache.calcite.rel.rules.AggregateJoinTransposeRule}.
+ * Modification:
+ * - Do not match TemporalTableScan since it means that it is a dimension table scan currently.
 
 Review comment:
   This rule is already in CBO, and CBO does not handle the unsupported case
(a lookup table source doesn't support aggregates).




[GitHub] [flink] godfreyhe commented on a change in pull request #8527: [FLINK-12610] [table-planner-blink] Introduce planner rules about aggregate

2019-05-27 Thread GitBox
godfreyhe commented on a change in pull request #8527: [FLINK-12610] 
[table-planner-blink] Introduce planner rules about aggregate
URL: https://github.com/apache/flink/pull/8527#discussion_r287912472
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/AggregateCalcMergeRule.java
 ##
 @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.AggregateProjectMergeRule;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Planner rule that recognizes a {@link org.apache.calcite.rel.core.Aggregate}
+ * on top of a {@link org.apache.calcite.rel.core.Calc} and, if possible,
+ * aggregates through the calc or removes the calc.
+ *
+ * This is only possible when there is no condition in the calc and the grouping expressions and arguments to
 
 Review comment:
   `AggregateProjectMergeRule` already exists and is in our rule set. The
original intention of introducing this rule was to solve the hack in
`RelDecorrelator` for TPC-H query 20; however, the current cost model cannot
find the best plan, so this rule has not been added to our rule set. We can
remove this rule from this PR and introduce it when needed.




[GitHub] [flink] xuefuz commented on a change in pull request #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file

2019-05-27 Thread GitBox
xuefuz commented on a change in pull request #8541: [FLINK-9172][sql-client] 
Support catalogs in SQL-Client yaml config file
URL: https://github.com/apache/flink/pull/8541#discussion_r287912450
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ##
 @@ -227,6 +241,12 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine
 		}
 	}
 
+	private Catalog createCatalog(String name, Map<String, String> catalogProperties, ClassLoader classLoader) {
+		final CatalogFactory factory =
+			TableFactoryService.find(CatalogFactory.class, catalogProperties, classLoader);
 
 Review comment:
   Shall we rename the factory service, since here we are creating catalogs
rather than tables?
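   The lookup above resolves a `CatalogFactory` from catalog properties. A minimal sketch of property-based factory discovery, with `CatalogFactory` as an illustrative stand-in (the real `TableFactoryService` uses Java SPI and richer property matching):

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of property-based factory discovery, loosely modeled
// on the TableFactoryService lookup discussed above. CatalogFactory here is
// an invented interface, not Flink's.
class FactoryLookup {
    interface CatalogFactory {
        String factoryType();  // matched against the "type" property
        String createCatalog(String name, Map<String, String> properties);
    }

    /** Picks the first candidate whose declared type matches the "type" property. */
    static CatalogFactory find(List<CatalogFactory> candidates, Map<String, String> properties) {
        String wanted = properties.get("type");
        return candidates.stream()
                .filter(f -> f.factoryType().equals(wanted))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("No factory for type: " + wanted));
    }

    public static void main(String[] args) {
        CatalogFactory hive = new CatalogFactory() {
            public String factoryType() { return "hive"; }
            public String createCatalog(String name, Map<String, String> p) { return "hive-catalog:" + name; }
        };
        System.out.println(find(List.of(hive), Map.of("type", "hive")).createCatalog("myhive", Map.of()));
    }
}
```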




[GitHub] [flink] sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces

2019-05-27 Thread GitBox
sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] 
Introduce Input and NetworkInput interfaces
URL: https://github.com/apache/flink/pull/8476#discussion_r287910131
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Input.java
 ##
 @@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.AsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+
+import java.io.Closeable;
+
+/**
+ * Basic interface for inputs of stream operators.
+ */
+@Internal
+public interface Input extends AsyncDataInput<StreamElement>, Closeable {
 
 Review comment:
   > @sunhaibotb, are you fine with those renames? (and fyi, the renaming will
affect a wip `StreamSelectableTwoInputStreamProcessor` PR.)
   
   It's fine to me. @pnowojski @StefanRRichter 




[GitHub] [flink] zhuzhurk commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler

2019-05-27 Thread GitBox
zhuzhurk commented on a change in pull request #8498: [FLINK-12413] [runtime] 
Implement ExecutionFailureHandler
URL: https://github.com/apache/flink/pull/8498#discussion_r287909365
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
 ##
 @@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Result containing the tasks to restart upon a task failure.
+ * Also contains the reason if the failure is not recoverable (non-recoverable
+ * failure type or restarting suppressed by the restart strategy).
+ */
+public class FailureHandlingResult {
+
+	/** Task vertices to restart to recover from the failure. */
+	private final Set<ExecutionVertexID> verticesToRestart;
+
+	/** Delay before the restarting can be conducted. */
+	private final long restartDelayMS;
+
+	/** Reason why the failure is not recoverable. */
+	private final Throwable error;
+
+	/**
+	 * Creates a result of a set of tasks to restart to recover from the failure.
+	 *
+	 * @param verticesToRestart containing task vertices to restart to recover from the failure
+	 * @param restartDelayMS indicates a delay before conducting the restart
+	 */
+	private FailureHandlingResult(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {
+		checkState(restartDelayMS >= 0);
+
+		this.verticesToRestart = Collections.unmodifiableSet(checkNotNull(verticesToRestart));
+		this.restartDelayMS = restartDelayMS;
+		this.error = null;
+	}
+
+	/**
+	 * Creates a result that the failure is not recoverable and no restarting should be conducted.
+	 *
+	 * @param error reason why the failure is not recoverable
+	 */
+	private FailureHandlingResult(Throwable error) {
+		this.verticesToRestart = null;
+		this.restartDelayMS = -1;
+		this.error = checkNotNull(error);
+	}
+
+	/**
+	 * Returns the tasks to restart.
+	 *
+	 * @return the tasks to restart
+	 */
+	public Set<ExecutionVertexID> getVerticesToRestart() {
+		if (canRestart()) {
+			return verticesToRestart;
+		} else {
+			throw new IllegalStateException("Cannot get vertices to restart when the restarting is suppressed.");
+		}
+	}
+
+	/**
+	 * Returns the delay before the restarting.
+	 *
+	 * @return the delay before the restarting
+	 */
+	public long getRestartDelayMS() {
+		if (canRestart()) {
+			return restartDelayMS;
+		} else {
+			throw new IllegalStateException("Cannot get restart delay when the restarting is suppressed.");
+		}
+	}
+
+	/**
+	 * Returns whether the restarting can be conducted.
+	 *
+	 * @return whether the restarting can be conducted
+	 */
+	public boolean canRestart() {
+		return error == null;
+	}
+
+	/**
+	 * Returns reason why the restarting cannot be conducted.
+	 *
+	 * @return reason why the restarting cannot be conducted
+	 */
+	public Throwable getError() {
+		return error;
 
 Review comment:
   You are right. I will keep them aligned.
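   The reviewed class condenses to a short runnable sketch, with `String` standing in for `ExecutionVertexID`; names here are illustrative, not the actual Flink API:

```java
import java.util.Collections;
import java.util.Set;

// Condensed, runnable sketch of the FailureHandlingResult pattern reviewed
// above: either a set of vertices to restart plus a delay, or a
// non-recoverable error. String stands in for ExecutionVertexID so the
// sketch is self-contained.
class FailureResultSketch {
    private final Set<String> verticesToRestart; // null when restarting is suppressed
    private final long restartDelayMs;
    private final Throwable error;               // null when restartable

    private FailureResultSketch(Set<String> vertices, long delayMs, Throwable error) {
        this.verticesToRestart = vertices;
        this.restartDelayMs = delayMs;
        this.error = error;
    }

    /** Result for a recoverable failure: restart these vertices after the delay. */
    static FailureResultSketch restartable(Set<String> vertices, long delayMs) {
        if (delayMs < 0) throw new IllegalArgumentException("negative restart delay");
        return new FailureResultSketch(Collections.unmodifiableSet(vertices), delayMs, null);
    }

    /** Result for a non-recoverable failure: no restarting, only the cause. */
    static FailureResultSketch unrecoverable(Throwable error) {
        return new FailureResultSketch(null, -1, error);
    }

    boolean canRestart() { return error == null; }

    Set<String> getVerticesToRestart() {
        if (!canRestart()) {
            throw new IllegalStateException("Cannot get vertices to restart when the restarting is suppressed.");
        }
        return verticesToRestart;
    }

    public static void main(String[] args) {
        FailureResultSketch ok = restartable(Set.of("v1", "v2"), 1000L);
        System.out.println(ok.canRestart() + " " + ok.getVerticesToRestart().size()); // prints "true 2"
        System.out.println(unrecoverable(new RuntimeException("boom")).canRestart()); // prints "false"
    }
}
```

   The two private constructors and guarded getters make the "restartable" and "suppressed" states mutually exclusive by construction, which is the property the review discusses.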




[GitHub] [flink] xuefuz commented on issue #8553: [FLINK-12418][hive] Add input/output format and SerDeLib information when creating Hive table in HiveCatalog

2019-05-27 Thread GitBox
xuefuz commented on issue #8553: [FLINK-12418][hive] Add input/output format 
and SerDeLib information when creating Hive table in HiveCatalog
URL: https://github.com/apache/flink/pull/8553#issuecomment-496340490
 
 
   LGTM.




[GitHub] [flink] tweise commented on issue #8535: [FLINK-11693] Add KafkaSerializationSchema that uses ProducerRecord

2019-05-27 Thread GitBox
tweise commented on issue #8535: [FLINK-11693] Add KafkaSerializationSchema 
that uses ProducerRecord
URL: https://github.com/apache/flink/pull/8535#issuecomment-496340207
 
 
   @aljoscha Glad to see this change; we may be interested in using the headers
with 0.11.
   




[GitHub] [flink] zhuzhurk commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler

2019-05-27 Thread GitBox
zhuzhurk commented on a change in pull request #8498: [FLINK-12413] [runtime] 
Implement ExecutionFailureHandler
URL: https://github.com/apache/flink/pull/8498#discussion_r287909399
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link FailureHandlingResult}.
+ */
+public class FailureHandlingResultTest extends TestLogger {
+
+   /**
+* Tests normal FailureHandlingResult.
+*/
+   @Test
+   public void testNormalFailureHandlingResult() throws Exception {
+   // create a normal FailureHandlingResult
+   Set<ExecutionVertexID> tasks = new HashSet<>();
 
 Review comment:
   My fault.






[GitHub] [flink] zhuzhurk commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler

2019-05-27 Thread GitBox
zhuzhurk commented on a change in pull request #8498: [FLINK-12413] [runtime] 
Implement ExecutionFailureHandler
URL: https://github.com/apache/flink/pull/8498#discussion_r287908868
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.throwable.ThrowableClassifier;
+import org.apache.flink.runtime.throwable.ThrowableType;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This handler deals with task failures to return a {@link FailureHandlingResult}
+ * which contains tasks to restart to recover from failures.
+ */
+public class ExecutionFailureHandler {
+
+	/** Strategy to judge which tasks should be restarted. */
+	private final FailoverStrategy failoverStrategy;
+
+	/** Strategy to judge whether and when a restarting should be done. */
+	private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;
+
+	/**
+	 * Creates the handler to deal with task failures.
+	 *
+	 * @param failoverStrategy helps to decide tasks to restart on task failures
+	 * @param restartBackoffTimeStrategy helps to decide whether to restart failed tasks and the restarting delay
+	 */
+	public ExecutionFailureHandler(
+			FailoverStrategy failoverStrategy,
+			RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+
+		this.failoverStrategy = checkNotNull(failoverStrategy);
+		this.restartBackoffTimeStrategy = checkNotNull(restartBackoffTimeStrategy);
+	}
+
+	/**
+	 * Returns the result of failure handling: either a set of task vertices to restart
+	 * and a delay for the restarting, or the reason why the failure is not recoverable.
+	 *
+	 * @param failedTask is the ID of the failed task vertex
+	 * @param cause of the task failure
+	 * @return result of the failure handling
+	 */
+	public FailureHandlingResult getFailureHandlingResult(ExecutionVertexID failedTask, Throwable cause) {
+		if (ThrowableClassifier.getThrowableType(cause) == ThrowableType.NonRecoverableError) {
 
 Review comment:
   Yes, the design doc proposed to check the error type in the restart strategy,
but that requires every strategy implementation to do the same check.
   
   Therefore I think it's better to let the restart strategy do only its
dedicated check.
   
   Besides, I'm also wondering whether we could have a composed restart
strategy, which can do multiple checks: failing max limit, failing rate, etc.
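   The composed-strategy idea could look like the following sketch, assuming each component exposes a single `canRestart(failureCount)` check; `RestartCheck` is an invented interface, not Flink's `RestartBackoffTimeStrategy`:

```java
import java.util.List;

// Sketch of the "composed restart strategy" idea floated above: several
// independent checks (failing max limit, failing rate, ...) combined so that
// a restart is allowed only when every component agrees.
class ComposedRestartStrategy {
    interface RestartCheck {
        boolean canRestart(int failureCount);
    }

    private final List<RestartCheck> checks;

    ComposedRestartStrategy(List<RestartCheck> checks) {
        this.checks = checks;
    }

    /** Restart is permitted only if every composed check permits it. */
    boolean canRestart(int failureCount) {
        return checks.stream().allMatch(c -> c.canRestart(failureCount));
    }

    public static void main(String[] args) {
        RestartCheck maxFailures = count -> count <= 3; // e.g. a failing-max-limit check
        RestartCheck placeholderRate = count -> true;   // placeholder for a failure-rate check
        ComposedRestartStrategy strategy =
            new ComposedRestartStrategy(List.of(maxFailures, placeholderRate));
        System.out.println(strategy.canRestart(2)); // prints "true"
        System.out.println(strategy.canRestart(5)); // prints "false"
    }
}
```

   Composing checks this way keeps each strategy focused on one concern, which matches the "dedicated check only" argument above.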
   




[GitHub] [flink] Xpray commented on issue #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type

2019-05-27 Thread GitBox
Xpray commented on issue #8346: [FLINK-12405] [DataSet] Introduce 
BLOCKING_PERSISTENT result partition type
URL: https://github.com/apache/flink/pull/8346#issuecomment-496338752
 
 
   @GJL  please have a look at this.




[jira] [Comment Edited] (FLINK-12620) Deadlock in task deserialization

2019-05-27 Thread Mike Kaplinskiy (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16849259#comment-16849259 ]

Mike Kaplinskiy edited comment on FLINK-12620 at 5/28/19 12:38 AM:
---

Sure, here's an example deadlock that I see, attached. [^jstack_snippet.txt] 

Somewhere between those 2 threads is a class initialization deadlock. My hacky 
fix that I tried locally looks like this:
{code}
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 644289133b..dc722c1db4 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -492,7 +492,7 @@ public final class InstantiationUtil {
 		}
 	}
 
-	public static <T> T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException {
+	public static synchronized <T> T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException {
byte[] bytes = config.getBytes(key, null);
if (bytes == null) {
return null;
{code}

That said, I'm not sure that it's the proper fix.
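The reported initialization cycle can be modeled in miniature. With the two classes below (names are illustrative, not the Flink or Clojure classes from the report), a single thread completes, because the JVM lets the initializing thread re-enter a class that is mid-initialization, observing default field values; two threads touching `B` and `C` concurrently can each take one class's initialization lock and wait forever on the other, which is the deadlock shown in the attached jstack:

```java
// Minimal model of a static-initialization cycle: B's <clinit> reads C.Y
// and C's <clinit> reads B.X. Run single-threaded, the cycle resolves with
// default values; run from two threads entering at opposite ends, it can
// deadlock on the per-class initialization locks (JLS 12.4.2).
class B {
    static final int X;
    static {
        X = C.Y + 1; // triggers C's initialization from within B's
    }
}

class C {
    static final int Y;
    static {
        Y = B.X + 1; // re-entrant read: B is mid-init, so X is still 0 here
    }
}

class InitCycleDemo {
    public static void main(String[] args) {
        // Touching B first fixes the order deterministically in one thread:
        // C.Y becomes 1 (from the default X == 0), then B.X becomes 2.
        System.out.println(B.X); // prints 2
        System.out.println(C.Y); // prints 1
    }
}
```

This also illustrates why serializing deserialization (the `synchronized` patch above) sidesteps the hang: with a single thread driving initialization, the cycle degrades to re-entrant default-value reads instead of a lock cycle.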


was (Author: mikekap):
Sure, here's an example deadlock that I see, attached. [^jstack_snippet.txt] 

Somewhere between those 3 threads is a class initialization deadlock. My hacky 
fix that I tried locally looks like this:
{code}
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 644289133b..dc722c1db4 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -492,7 +492,7 @@ public final class InstantiationUtil {
 		}
 	}
 
-	public static <T> T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException {
+	public static synchronized <T> T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException {
byte[] bytes = config.getBytes(key, null);
if (bytes == null) {
return null;
{code}

That said, I'm not sure that it's the proper fix.

> Deadlock in task deserialization
> 
>
> Key: FLINK-12620
> URL: https://issues.apache.org/jira/browse/FLINK-12620
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.8.0
>Reporter: Mike Kaplinskiy
>Priority: Major
> Attachments: jstack_snippet.txt
>
>
> When running a batch job, I ran into an issue where task deserialization 
> caused a deadlock. Specifically, if you have a static initialization 
> dependency graph that looks like this (these are all classes):
> {code:java}
> Task1 depends on A
> A depends on B
> B depends on C
> C depends on B [cycle]
> Task2 depends on C{code}
> What seems to happen is a deadlock. Specifically, threads are started on the 
> task managers that simultaneously call BatchTask.instantiateUserCode on both 
> Task1 and Task2. This starts deserializing the classes and initializing them. 
> Here's the deadlock scenario, as a stack:
> {code:java}
> Time ----->
> T1: [deserialize] -> Task1 -> A -> B -> (wait for C)
> T2: [deserialize] -> Task2           -> C -> (wait for B){code}
>  
> A similar scenario from the web: 
> [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] .
>  
> For my specific problem, I'm running into this within Clojure - 
> {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with 
> {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. 
> Deserializing different clojure functions calls one or the other first which 
> deadlocks task managers.
>  
> I built a version of flink-core that had 
> {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} 
> synchronized, but I'm not sure that it's the proper fix. I'm happy to submit 
> that as a patch, but I'm not familiar enough with the codebase to say that 
> it's the correct solution - ideally all Java class loading is synchronized, 
> but I'm not sure how to do that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12620) Deadlock in task deserialization

2019-05-27 Thread Mike Kaplinskiy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849259#comment-16849259
 ] 

Mike Kaplinskiy commented on FLINK-12620:
-

Sure, here's an example deadlock that I see, attached. [^jstack_snippet.txt] 

Somewhere between those 3 threads is a class initialization deadlock. My hacky 
fix that I tried locally looks like this:
{code}
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 644289133b..dc722c1db4 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -492,7 +492,7 @@ public final class InstantiationUtil {
 		}
 	}
 
-	public static <T> T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException {
+	public static synchronized <T> T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException {
 		byte[] bytes = config.getBytes(key, null);
 		if (bytes == null) {
 			return null;
{code}

That said, I'm not sure that it's the proper fix.
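As a rough sanity check of the proposed workaround, routing every initialization-triggering call through one shared lock (an analogue of the synchronized `readObjectFromConfig`, again with hypothetical classes, not Flink code) lets whichever thread wins the lock complete the whole cyclic initialization alone, so both threads finish:

```java
// Sketch of why serializing the deserialization entry point avoids the
// class-init deadlock (hypothetical classes, not the actual Flink fix).
public class SynchronizedInitDemo {
    static class B { static int v = C.v + 1; }
    static class C { static int v = B.v + 1; }

    // Analogue of the synchronized readObjectFromConfig: every path that may
    // trigger class initialization enters through this one lock.
    static synchronized int touch(boolean startWithB) {
        return startWithB ? B.v : C.v;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> touch(true));   // would init B first
        Thread t2 = new Thread(() -> touch(false));  // would init C first
        t1.start();
        t2.start();
        t1.join();
        t2.join();  // both finish: the lock winner performs all the init
        // The individual values depend on which thread ran first, but the sum
        // is always 3 (one class sees the other's default 0 during the cycle).
        System.out.println("sum=" + (B.v + C.v));
    }
}
```

The cost, of course, is that all task deserialization becomes single-threaded, which is why this is only a sanity check and not necessarily the proper fix.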






[jira] [Updated] (FLINK-12620) Deadlock in task deserialization

2019-05-27 Thread Mike Kaplinskiy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mike Kaplinskiy updated FLINK-12620:

Attachment: jstack_snippet.txt






[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig

2019-05-27 Thread GitBox
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State 
TTL] Consider setting a default background cleanup strategy in StateTtlConfig
URL: https://github.com/apache/flink/pull/8459#discussion_r287807783
 
 

 ##
 File path: docs/dev/stream/state/state.md
 ##
 @@ -483,11 +513,12 @@ val ttlConfig = StateTtlConfig
 
 
 RocksDB compaction filter will query current timestamp, used to check 
expiration, from Flink every time 
-after processing certain number of state entries. This number is 1000 by 
default. 
+after processing certain number of state entries.
 You can optionally change it and pass a custom value to 
 `StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long 
queryTimeAfterNumEntries)` method. 
 Updating the timestamp more often can improve cleanup speed 
 but it decreases compaction performance because it uses JNI call from native 
code.
+If you enable the default background cleanup then this strategy will be 
activated for RocksDB backend with 1000 number of state entries every time 
after processing.
 
 Review comment:
   `this strategy will be activated for RocksDB backend and the current 
timestamp will be queried each time 1000 entries have been processed.`
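The trade-off behind `queryTimeAfterNumEntries` can be illustrated with a self-contained sketch (plain Java, not the actual RocksDB compaction filter or Flink API): the expensive timestamp query is cached and refreshed only once every N processed entries, so refreshing more often speeds up cleanup but costs more JNI crossings.

```java
// Illustrative sketch of the "query timestamp every N entries" strategy.
// Names and structure are assumptions for illustration only.
public class CachedTimestampSketch {
    static final long QUERY_TIME_AFTER_NUM_ENTRIES = 1000;

    static long cachedTimestamp;
    static long entriesSinceQuery;  // wraps at QUERY_TIME_AFTER_NUM_ENTRIES
    static int timestampQueries;    // counts the expensive "JNI" calls

    // Stand-in for the JNI call from the native compaction filter into Flink.
    static long queryCurrentTimestamp() {
        timestampQueries++;
        return System.currentTimeMillis();
    }

    static boolean isExpired(long entryTimestamp, long ttlMillis) {
        if (entriesSinceQuery == 0) {
            cachedTimestamp = queryCurrentTimestamp();  // refresh every N entries
        }
        entriesSinceQuery = (entriesSinceQuery + 1) % QUERY_TIME_AFTER_NUM_ENTRIES;
        return entryTimestamp + ttlMillis <= cachedTimestamp;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5000; i++) {
            isExpired(0L, 1L);  // entries written at t=0 with a 1 ms TTL
        }
        // 5000 entries with a refresh interval of 1000 -> 5 timestamp queries.
        System.out.println("queries=" + timestampQueries);
    }
}
```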




[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig

2019-05-27 Thread GitBox
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State 
TTL] Consider setting a default background cleanup strategy in StateTtlConfig
URL: https://github.com/apache/flink/pull/8459#discussion_r287804877
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 ##
 @@ -376,39 +398,42 @@ public StateTtlConfig build() {
private static final long serialVersionUID = 
1373998465131443873L;
}
 
-	final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);
-
-	public void activate(Strategies strategy) {
-		activate(strategy, EMPTY_STRATEGY);
-	}
-
-	public void activate(Strategies strategy, CleanupStrategy config) {
-		strategies.put(strategy, config);
+	public CleanupStrategies(EnumMap<Strategies, CleanupStrategy> strategies, boolean isCleanupInBackground) {
 
 Review comment:
   can be private




[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig

2019-05-27 Thread GitBox
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State 
TTL] Consider setting a default background cleanup strategy in StateTtlConfig
URL: https://github.com/apache/flink/pull/8459#discussion_r287878198
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 ##
 @@ -322,11 +329,22 @@ public Builder cleanupInRocksdbCompactFilter() {
 */
@Nonnull
	public Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) {
-		cleanupStrategies.activate(CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER,
+		strategies.put(CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER,
			new RocksdbCompactFilterCleanupStrategy(queryTimeAfterNumEntries));
		return this;
	}
 
+   /**
+* Enable cleanup of expired state in background.
+*
+* Depending on actually used backend, the corresponding 
cleanup will kick in if supported.
+*/
+   @Nonnull
+   public Builder cleanupInBackground() {
 
 Review comment:
  I suggest we add a simple unit test for this method where `StateTtlConfig` 
is built like in the doc example with `cleanupInBackground`, and the test checks 
that `StateTtlConfig.getCleanupStrategies().isCleanupInBackground()` is `true`, 
`getIncrementalCleanupStrategy()` is not null, 
`getIncrementalCleanupStrategy().getCleanupSize()` is 5, etc.
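The suggested test can be sketched against a local mimic of the builder. The nested classes below are hypothetical stand-ins, not the real `StateTtlConfig` API, and the cleanup size of 5 is the assumed default under discussion:

```java
// Self-contained mimic of the suggested unit test (hypothetical types).
public class CleanupInBackgroundSketch {
    static class IncrementalCleanupStrategy {
        final int cleanupSize;
        IncrementalCleanupStrategy(int cleanupSize) { this.cleanupSize = cleanupSize; }
    }

    static class CleanupStrategies {
        final boolean inBackground;
        final IncrementalCleanupStrategy incremental;
        CleanupStrategies(boolean inBackground, IncrementalCleanupStrategy incremental) {
            this.inBackground = inBackground;
            this.incremental = incremental;
        }
    }

    static class Builder {
        private boolean inBackground;
        Builder cleanupInBackground() { inBackground = true; return this; }
        CleanupStrategies build() {
            // Assumed default when background cleanup is on: incremental
            // cleanup of 5 entries per state access.
            return new CleanupStrategies(inBackground,
                    inBackground ? new IncrementalCleanupStrategy(5) : null);
        }
    }

    public static void main(String[] args) {
        CleanupStrategies s = new Builder().cleanupInBackground().build();
        // The checks the review comment asks for:
        System.out.println("inBackground=" + s.inBackground
                + " cleanupSize=" + s.incremental.cleanupSize);
    }
}
```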




[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig

2019-05-27 Thread GitBox
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State 
TTL] Consider setting a default background cleanup strategy in StateTtlConfig
URL: https://github.com/apache/flink/pull/8459#discussion_r287809610
 
 

 ##
 File path: docs/dev/stream/state/state.md
 ##
 @@ -483,11 +513,12 @@ val ttlConfig = StateTtlConfig
 
 
 RocksDB compaction filter will query current timestamp, used to check 
expiration, from Flink every time 
-after processing certain number of state entries. This number is 1000 by 
default. 
+after processing certain number of state entries.
 You can optionally change it and pass a custom value to 
 
 Review comment:
   let's remove `optionally` and put `1000` into 
`cleanupInRocksdbCompactFilter(1000)` in java and scala examples as we are 
deprecating the parameterless version of `cleanupInRocksdbCompactFilter`.




[GitHub] [flink] azagrebin commented on issue #8485: [FLINK-12555] Introduce an encapsulated metric group layout for shuffle API

2019-05-27 Thread GitBox
azagrebin commented on issue #8485: [FLINK-12555] Introduce an encapsulated 
metric group layout for shuffle API
URL: https://github.com/apache/flink/pull/8485#issuecomment-496287266
 
 
  True, thanks for noticing it @zhijiangW; it was not intended. I will change 
it to create the group only once. It could be called e.g. 
`NetworkInput`/`NetworkOutput`.




[jira] [Commented] (FLINK-10121) Introduce methods to remove registered operator states

2019-05-27 Thread aitozi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849087#comment-16849087
 ] 

aitozi commented on FLINK-10121:


Hi [~srichter], is this still ongoing work? I think it's a valuable feature. If 
it has not been started, I'd like to look into it.

> Introduce methods to remove registered operator states
> --
>
> Key: FLINK-10121
> URL: https://issues.apache.org/jira/browse/FLINK-10121
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> User can register new operator states but never remove a registered state. 
> This is particularly problematic with expensive states or states that we 
> register only to provide backwards compatibility. We can also consider the 
> same for keyed state.





[jira] [Commented] (FLINK-10098) Programmatically select timer storage backend

2019-05-27 Thread aitozi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849082#comment-16849082
 ] 

aitozi commented on FLINK-10098:


I will work on this issue to solve the problem

> Programmatically select timer storage backend
> -
>
> Key: FLINK-10098
> URL: https://issues.apache.org/jira/browse/FLINK-10098
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / State Backends
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Elias Levy
>Assignee: aitozi
>Priority: Major
>
> FLINK-9486 introduced timer storage on the RocksDB storage backend.  Right 
> now it is only possible to configure RocksDB as the storage for timers by 
> setting the {{state.backend.rocksdb.timer-service.factory}} value in the 
> configuration file for Flink.
> As the state storage backend can be programmatically selected by jobs via  
> {{env.setStateBackend(...)}}, the timer backend should also be configurable 
> programmatically.
> Different jobs should be able to store their timers in different storage 
> backends.
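For reference, the file-based mechanism the description refers to is a single `flink-conf.yaml` entry; the value spelling below follows the documented options for this key and should be double-checked against the docs for the Flink version in use:

```yaml
# Store timer state in RocksDB instead of on the heap.
# Applies cluster-wide, not per job - which is exactly the limitation
# this issue wants to lift.
state.backend.rocksdb.timer-service.factory: ROCKSDB
```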





[jira] [Assigned] (FLINK-10098) Programmatically select timer storage backend

2019-05-27 Thread aitozi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aitozi reassigned FLINK-10098:
--

Assignee: aitozi






[jira] [Comment Edited] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849071#comment-16849071
 ] 

lamber-ken edited comment on FLINK-12302 at 5/27/19 4:53 PM:
-

[~gjy], from another angle, we can analyze this issue purely from the code logic.

Whenever some failure scenario triggers a call to the +MiniDispatcher#jobNotFinished+ 
method, it means the Flink job terminated unexpectedly, so the dispatcher notifies 
the RM to stop the YARN application with the +ApplicationStatus.UNKNOWN+ state, and 
+YarnResourceManager#getYarnStatus+ then translates +UNKNOWN+ to +{{UNDEFINED}}+.

But in the Hadoop system, +{{UNDEFINED}}+ means the application has not yet 
finished.

 

*MiniDispatcher#jobNotFinished*
{code:java}
@Override
protected void jobNotFinished(JobID jobId) {
   super.jobNotFinished(jobId);

   // shut down since we have done our job
   jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
}
{code}
*YarnResourceManager#getYarnStatus*
{code:java}
private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
   if (status == null) {
  return FinalApplicationStatus.UNDEFINED;
   }
   else {
  switch (status) {
 case SUCCEEDED:
return FinalApplicationStatus.SUCCEEDED;
 case FAILED:
return FinalApplicationStatus.FAILED;
 case CANCELED:
return FinalApplicationStatus.KILLED;
 default:
return FinalApplicationStatus.UNDEFINED;
  }
   }
}
{code}
 

*Hadoop Application Status* 
[FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32]
{code:java}
/**
 * Enumeration of various final states of an Application.
 */
@Public
@Stable
public enum FinalApplicationStatus {

 /** Undefined state when either the application has not yet finished */
  UNDEFINED,

  /** Application which finished successfully. */
  SUCCEEDED,

  /** Application which failed. */
  FAILED,

  /** Application which was terminated by a user or admin. */
  KILLED
}
{code}
 

*Longrunning Applications's FinalStatus*

*!image-2019-05-28-00-46-49-740.png!*

 

 


was (Author: lamber-ken):
[~gjy], from another angle, we can analyze this issue purely from the code.

Whenever some failure scenario triggers a call to the +MiniDispatcher#jobNotFinished+ 
method, it means the Flink job terminated unexpectedly, so the dispatcher notifies 
the RM to stop the YARN application with the +ApplicationStatus.UNKNOWN+ state, and 
+YarnResourceManager#getYarnStatus+ then translates +UNKNOWN+ to +{{UNDEFINED}}+.

But in the Hadoop system, +{{UNDEFINED}}+ means the application has not yet 
finished.

 

*MiniDispatcher#jobNotFinished*
{code:java}
@Override
protected void jobNotFinished(JobID jobId) {
   super.jobNotFinished(jobId);

   // shut down since we have done our job
   jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
}
{code}
*YarnResourceManager#getYarnStatus*
{code:java}
private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
   if (status == null) {
  return FinalApplicationStatus.UNDEFINED;
   }
   else {
  switch (status) {
 case SUCCEEDED:
return FinalApplicationStatus.SUCCEEDED;
 case FAILED:
return FinalApplicationStatus.FAILED;
 case CANCELED:
return FinalApplicationStatus.KILLED;
 default:
return FinalApplicationStatus.UNDEFINED;
  }
   }
}
{code}
 

*Hadoop Application Status* 
[FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32]
{code:java}
/**
 * Enumeration of various final states of an Application.
 */
@Public
@Stable
public enum FinalApplicationStatus {

 /** Undefined state when either the application has not yet finished */
  UNDEFINED,

  /** Application which finished successfully. */
  SUCCEEDED,

  /** Application which failed. */
  FAILED,

  /** Application which was terminated by a user or admin. */
  KILLED
}
{code}
 

*Longrunning Applications's FinalStatus*

*!image-2019-05-28-00-46-49-740.png!*

 

 

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix 

[jira] [Commented] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849080#comment-16849080
 ] 

lamber-ken commented on FLINK-12302:


So when this unexpected scenario happens, YARN displays the wrong +finalStatus+ 
even though the application is no longer running. The finalStatus should be 
FAILED, not UNDEFINED.

!image-2019-05-28-00-50-13-500.png!
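Tracing the chain end to end with local stand-in enums (simplified copies for illustration, not the real Flink/Hadoop classes) makes the mismatch concrete:

```java
// Sketch of the UNKNOWN -> UNDEFINED mapping described in this issue.
public class StatusMappingDemo {
    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }
    enum FinalApplicationStatus { UNDEFINED, SUCCEEDED, FAILED, KILLED }

    // Same switch shape as YarnResourceManager#getYarnStatus.
    static FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
        if (status == null) {
            return FinalApplicationStatus.UNDEFINED;
        }
        switch (status) {
            case SUCCEEDED: return FinalApplicationStatus.SUCCEEDED;
            case FAILED:    return FinalApplicationStatus.FAILED;
            case CANCELED:  return FinalApplicationStatus.KILLED;
            default:        return FinalApplicationStatus.UNDEFINED; // UNKNOWN lands here
        }
    }

    public static void main(String[] args) {
        // MiniDispatcher#jobNotFinished completes the future with UNKNOWN,
        // which YARN then reports as UNDEFINED ("not yet finished").
        System.out.println(ApplicationStatus.UNKNOWN + " -> "
                + getYarnStatus(ApplicationStatus.UNKNOWN));
    }
}
```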

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, 
> image-2019-04-23-19-56-49-933.png, image-2019-05-28-00-46-49-740.png, 
> image-2019-05-28-00-50-13-500.png, jobmanager-05-27.log, jobmanager-1.log, 
> jobmanager-2.log, screenshot-1.png, screenshot-2.png, 
> spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> A Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the YARN 
> ResourceManager reran the job.
> When the job failed again, the application finished, but the finalStatus was 
> +UNDEFINED+; it would be better to show the state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  





[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12302:
---
Attachment: image-2019-05-28-00-50-13-500.png






[jira] [Comment Edited] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849071#comment-16849071
 ] 

lamber-ken edited comment on FLINK-12302 at 5/27/19 4:47 PM:
-

[~gjy], from another angle, we can analyze this issue purely from the code.

Whenever some failure scenario triggers a call to the +MiniDispatcher#jobNotFinished+ 
method, it means the Flink job terminated unexpectedly, so the dispatcher notifies 
the RM to stop the YARN application with the +ApplicationStatus.UNKNOWN+ state, and 
+YarnResourceManager#getYarnStatus+ then translates +UNKNOWN+ to +{{UNDEFINED}}+.

But in the Hadoop system, +{{UNDEFINED}}+ means the application has not yet 
finished.

 

*MiniDispatcher#jobNotFinished*
{code:java}
@Override
protected void jobNotFinished(JobID jobId) {
   super.jobNotFinished(jobId);

   // shut down since we have done our job
   jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
}
{code}
*YarnResourceManager#getYarnStatus*
{code:java}
private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
   if (status == null) {
  return FinalApplicationStatus.UNDEFINED;
   }
   else {
  switch (status) {
 case SUCCEEDED:
return FinalApplicationStatus.SUCCEEDED;
 case FAILED:
return FinalApplicationStatus.FAILED;
 case CANCELED:
return FinalApplicationStatus.KILLED;
 default:
return FinalApplicationStatus.UNDEFINED;
  }
   }
}
{code}
 

*Hadoop Application Status* 
[FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32]
{code:java}
/**
 * Enumeration of various final states of an Application.
 */
@Public
@Stable
public enum FinalApplicationStatus {

 /** Undefined state when either the application has not yet finished */
  UNDEFINED,

  /** Application which finished successfully. */
  SUCCEEDED,

  /** Application which failed. */
  FAILED,

  /** Application which was terminated by a user or admin. */
  KILLED
}
{code}
 

*Longrunning Applications's FinalStatus*

*!image-2019-05-28-00-46-49-740.png!*

 

 


was (Author: lamber-ken):
[~gjy], from another angle, we can analyze this issue purely from the code.

Whenever some failure scenario triggers a call to the +MiniDispatcher#jobNotFinished+ 
method, it means the Flink job terminated unexpectedly, so the dispatcher notifies 
the RM to stop the YARN application with the +ApplicationStatus.UNKNOWN+ state, and 
+YarnResourceManager#getYarnStatus+ then translates +UNKNOWN+ to +{{UNDEFINED}}+.

But in the Hadoop system, +{{UNDEFINED}}+ means the application has not yet 
finished.

 

*MiniDispatcher#jobNotFinished*
{code:java}
@Override
protected void jobNotFinished(JobID jobId) {
   super.jobNotFinished(jobId);

   // shut down since we have done our job
   jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
}
{code}
*YarnResourceManager#getYarnStatus*
{code:java}
private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
   if (status == null) {
  return FinalApplicationStatus.UNDEFINED;
   }
   else {
  switch (status) {
 case SUCCEEDED:
return FinalApplicationStatus.SUCCEEDED;
 case FAILED:
return FinalApplicationStatus.FAILED;
 case CANCELED:
return FinalApplicationStatus.KILLED;
 default:
return FinalApplicationStatus.UNDEFINED;
  }
   }
}
{code}
 

*Hadoop Application Status* 
[FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32]
{code:java}
/**
 * Enumeration of various final states of an Application.
 */
@Public
@Stable
public enum FinalApplicationStatus {

 /** Undefined state when either the application has not yet finished */
  UNDEFINED,

  /** Application which finished successfully. */
  SUCCEEDED,

  /** Application which failed. */
  FAILED,

  /** Application which was terminated by a user or admin. */
  KILLED
}
{code}
  


[jira] [Assigned] (FLINK-11634) Translate "State Backends" page into Chinese

2019-05-27 Thread jeremy huang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jeremy huang reassigned FLINK-11634:


Assignee: jeremy huang

> Translate "State Backends" page into Chinese
> 
>
> Key: FLINK-11634
> URL: https://issues.apache.org/jira/browse/FLINK-11634
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu(klion26)
>Assignee: jeremy huang
>Priority: Major
>
> The doc is located at flink/docs/dev/stream/state/state_backends.md





[jira] [Comment Edited] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849071#comment-16849071
 ] 

lamber-ken edited comment on FLINK-12302 at 5/27/19 4:43 PM:
-

[~gjy], from another angle, we can analyze this issue purely from the code.

Whenever some scenario triggers the +MiniDispatcher#jobNotFinished+ method, it 
means the Flink job terminated unexpectedly, so it notifies the RM to kill the 
YARN application with the +ApplicationStatus.UNKNOWN+ state; that +UNKNOWN+ 
state is then translated to +{{UNDEFINED}}+ by 
+YarnResourceManager#getYarnStatus+.

But in the Hadoop system, +{{UNDEFINED}}+ means the application has not yet 
finished.

 

*MiniDispatcher#jobNotFinished*
{code:java}
@Override
protected void jobNotFinished(JobID jobId) {
   super.jobNotFinished(jobId);

   // shut down since we have done our job
   jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
}
{code}
*YarnResourceManager#getYarnStatus*
{code:java}
private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
   if (status == null) {
  return FinalApplicationStatus.UNDEFINED;
   }
   else {
  switch (status) {
 case SUCCEEDED:
return FinalApplicationStatus.SUCCEEDED;
 case FAILED:
return FinalApplicationStatus.FAILED;
 case CANCELED:
return FinalApplicationStatus.KILLED;
 default:
return FinalApplicationStatus.UNDEFINED;
  }
   }
}
{code}
 

*Hadoop Application Status* 
[FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32]
{code:java}
/**
 * Enumeration of various final states of an Application.
 */
@Public
@Stable
public enum FinalApplicationStatus {

 /** Undefined state when either the application has not yet finished */
  UNDEFINED,

  /** Application which finished successfully. */
  SUCCEEDED,

  /** Application which failed. */
  FAILED,

  /** Application which was terminated by a user or admin. */
  KILLED
}
{code}
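A minimal, self-contained sketch of the mapping discussed above. The enums below are simplified stand-ins for Flink's +ApplicationStatus+ and Hadoop's +FinalApplicationStatus+, and +getYarnStatus+ mirrors the switch quoted earlier; this is an illustration, not the actual Flink or Hadoop code.

```java
// Hypothetical sketch: simplified stand-ins for the real enums, used only to
// show that UNKNOWN falls into the default branch and becomes UNDEFINED.
public class StatusMappingSketch {

    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    enum FinalApplicationStatus { UNDEFINED, SUCCEEDED, FAILED, KILLED }

    // Mirrors YarnResourceManager#getYarnStatus: every status without an
    // explicit case (including UNKNOWN) maps to UNDEFINED.
    static FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
        if (status == null) {
            return FinalApplicationStatus.UNDEFINED;
        }
        switch (status) {
            case SUCCEEDED:
                return FinalApplicationStatus.SUCCEEDED;
            case FAILED:
                return FinalApplicationStatus.FAILED;
            case CANCELED:
                return FinalApplicationStatus.KILLED;
            default:
                return FinalApplicationStatus.UNDEFINED;
        }
    }

    public static void main(String[] args) {
        // The problematic path: jobNotFinished completes with UNKNOWN,
        // which YARN then reports as UNDEFINED instead of FAILED.
        System.out.println(getYarnStatus(ApplicationStatus.UNKNOWN)); // prints UNDEFINED
    }
}
```

Running it prints +UNDEFINED+, which is exactly the finalStatus YARN ends up showing for an unexpectedly terminated job.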
  


was (Author: lamber-ken):
[~gjy], from another angle, we can analyze this issue purely from the code.

Whenever some scenario triggers the +MiniDispatcher#jobNotFinished+ method, it 
means the Flink job terminated unexpectedly, so it notifies the RM to kill the 
YARN application with the +ApplicationStatus.UNKNOWN+ state; that +UNKNOWN+ 
state is then translated to +{{UNDEFINED}}+ by 
+YarnResourceManager#getYarnStatus+.

But in the Hadoop system, +{{UNDEFINED}}+ means the application has not yet 
finished.

 

*MiniDispatcher#jobNotFinished*
{code:java}
@Override
protected void jobNotFinished(JobID jobId) {
   super.jobNotFinished(jobId);

   // shut down since we have done our job
   jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
}
{code}
*YarnResourceManager#getYarnStatus*

 
{code:java}
private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
   if (status == null) {
  return FinalApplicationStatus.UNDEFINED;
   }
   else {
  switch (status) {
 case SUCCEEDED:
return FinalApplicationStatus.SUCCEEDED;
 case FAILED:
return FinalApplicationStatus.FAILED;
 case CANCELED:
return FinalApplicationStatus.KILLED;
 default:
return FinalApplicationStatus.UNDEFINED;
  }
   }
}
{code}

 

*Hadoop Application Status* 
[FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32]
{code:java}
/**
 * Enumeration of various final states of an Application.
 */
@Public
@Stable
public enum FinalApplicationStatus {

 /** Undefined state when either the application has not yet finished */
  UNDEFINED,

  /** Application which finished successfully. */
  SUCCEEDED,

  /** Application which failed. */
  FAILED,

  /** Application which was terminated by a user or admin. */
  KILLED
}
{code}
  

 

 

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, 
> 

[jira] [Comment Edited] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849071#comment-16849071
 ] 

lamber-ken edited comment on FLINK-12302 at 5/27/19 4:42 PM:
-

[~gjy], from another angle, we can analyze this issue purely from the code.

Whenever some scenario triggers the +MiniDispatcher#jobNotFinished+ method, it 
means the Flink job terminated unexpectedly, so it notifies the RM to kill the 
YARN application with the +ApplicationStatus.UNKNOWN+ state; that +UNKNOWN+ 
state is then translated to +{{UNDEFINED}}+ by 
+YarnResourceManager#getYarnStatus+.

But in the Hadoop system, +{{UNDEFINED}}+ means the application has not yet 
finished.

 

*MiniDispatcher#jobNotFinished*
{code:java}
@Override
protected void jobNotFinished(JobID jobId) {
   super.jobNotFinished(jobId);

   // shut down since we have done our job
   jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
}
{code}
*YarnResourceManager#getYarnStatus*

 
{code:java}
private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
   if (status == null) {
  return FinalApplicationStatus.UNDEFINED;
   }
   else {
  switch (status) {
 case SUCCEEDED:
return FinalApplicationStatus.SUCCEEDED;
 case FAILED:
return FinalApplicationStatus.FAILED;
 case CANCELED:
return FinalApplicationStatus.KILLED;
 default:
return FinalApplicationStatus.UNDEFINED;
  }
   }
}
{code}

 

*Hadoop Application Status* 
[FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32]
{code:java}
/**
 * Enumeration of various final states of an Application.
 */
@Public
@Stable
public enum FinalApplicationStatus {

 /** Undefined state when either the application has not yet finished */
  UNDEFINED,

  /** Application which finished successfully. */
  SUCCEEDED,

  /** Application which failed. */
  FAILED,

  /** Application which was terminated by a user or admin. */
  KILLED
}
{code}
  

 

 


was (Author: lamber-ken):
[~gjy], from another angle, we can analyze this issue purely from the code.

Whenever some scenario triggers the +MiniDispatcher#jobNotFinished+ method, it 
means the Flink job terminated unexpectedly, so it notifies the RM to kill the 
YARN application with the +ApplicationStatus.UNKNOWN+ state.

But in the Hadoop system, +{{UNDEFINED}}+ means the application has not yet 
finished.

*MiniDispatcher#jobNotFinished*
{code:java}
@Override
protected void jobNotFinished(JobID jobId) {
   super.jobNotFinished(jobId);

   // shut down since we have done our job
   jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
}
{code}
*Hadoop Application Status* 
[FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32]
{code:java}
/**
 * Enumeration of various final states of an Application.
 */
@Public
@Stable
public enum FinalApplicationStatus {

 /** Undefined state when either the application has not yet finished */
  UNDEFINED,

  /** Application which finished successfully. */
  SUCCEEDED,

  /** Application which failed. */
  FAILED,

  /** Application which was terminated by a user or admin. */
  KILLED
}
{code}
  

 

 

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, 
> jobmanager-2.log, screenshot-1.png, screenshot-2.png, 
> spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the 
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus 
> is +UNDEFINED+. It's better to show the state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849071#comment-16849071
 ] 

lamber-ken edited comment on FLINK-12302 at 5/27/19 4:34 PM:
-

[~gjy], from another angle, we can analyze this issue purely from the code.

Whenever some scenario triggers the +MiniDispatcher#jobNotFinished+ method, it 
means the Flink job terminated unexpectedly, so it notifies the RM to kill the 
YARN application with the +ApplicationStatus.UNKNOWN+ state.

But in the Hadoop system, +{{UNDEFINED}}+ means the application has not yet 
finished.

*MiniDispatcher#jobNotFinished*
{code:java}
@Override
protected void jobNotFinished(JobID jobId) {
   super.jobNotFinished(jobId);

   // shut down since we have done our job
   jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
}
{code}
*Hadoop Application Status* 
[FinalApplicationStatus|https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32]
{code:java}
/**
 * Enumeration of various final states of an Application.
 */
@Public
@Stable
public enum FinalApplicationStatus {

 /** Undefined state when either the application has not yet finished */
  UNDEFINED,

  /** Application which finished successfully. */
  SUCCEEDED,

  /** Application which failed. */
  FAILED,

  /** Application which was terminated by a user or admin. */
  KILLED
}
{code}
  

 

 


was (Author: lamber-ken):
[~gjy], from another angle, we can analyze this issue purely from the code.

Whenever some scenario triggers the +MiniDispatcher#jobNotFinished+ method, it 
means the Flink job terminated unexpectedly, so it notifies the RM to kill the 
YARN application with the +ApplicationStatus.UNKNOWN+ state.

But in the Hadoop system, +{{UNDEFINED}}+ means the application has not yet 
finished.

*MiniDispatcher#jobNotFinished*
{code:java}
@Override
protected void jobNotFinished(JobID jobId) {
   super.jobNotFinished(jobId);

   // shut down since we have done our job
   jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
}
{code}
*Hadoop Application Status 
https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32*
{code:java}
/**
 * Enumeration of various final states of an Application.
 */
@Public
@Stable
public enum FinalApplicationStatus {

 /** Undefined state when either the application has not yet finished */
  UNDEFINED,

  /** Application which finished successfully. */
  SUCCEEDED,

  /** Application which failed. */
  FAILED,

  /** Application which was terminated by a user or admin. */
  KILLED
}
{code}
  

 

 

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, 
> jobmanager-2.log, screenshot-1.png, screenshot-2.png, 
> spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the 
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus 
> is +UNDEFINED+. It's better to show the state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849071#comment-16849071
 ] 

lamber-ken commented on FLINK-12302:


[~gjy], from another angle, we can analyze this issue purely from the code.

Whenever some scenario triggers the +MiniDispatcher#jobNotFinished+ method, it 
means the Flink job terminated unexpectedly, so it notifies the RM to kill the 
YARN application with the +ApplicationStatus.UNKNOWN+ state.

But in the Hadoop system, +{{UNDEFINED}}+ means the application has not yet 
finished.

*MiniDispatcher#jobNotFinished*
{code:java}
@Override
protected void jobNotFinished(JobID jobId) {
   super.jobNotFinished(jobId);

   // shut down since we have done our job
   jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
}
{code}
*Hadoop Application Status 
https://github.com/apache/hadoop-common/blob/42a61a4fbc88303913c4681f0d40ffcc737e70b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.java#L32*
{code:java}
/**
 * Enumeration of various final states of an Application.
 */
@Public
@Stable
public enum FinalApplicationStatus {

 /** Undefined state when either the application has not yet finished */
  UNDEFINED,

  /** Application which finished successfully. */
  SUCCEEDED,

  /** Application which failed. */
  FAILED,

  /** Application which was terminated by a user or admin. */
  KILLED
}
{code}
  

 

 

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, 
> jobmanager-2.log, screenshot-1.png, screenshot-2.png, 
> spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the 
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus 
> is +UNDEFINED+. It's better to show the state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Aitozi commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-05-27 Thread GitBox
Aitozi commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the 
incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#issuecomment-496260983
 
 
   Have added the three metrics for the credit-based mode; please take a look 
when you are free @zhijiangW @pnowojski 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

2019-05-27 Thread GitBox
flinkbot edited a comment on issue #8517: [FLINK-10921] [kinesis] Shard 
watermark synchronization in Kinesis consumer
URL: https://github.com/apache/flink/pull/8517#issuecomment-495066880
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @jgrier
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @jgrier
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @jgrier
   * ✅ 5. Overall code [quality] is good.
   - Approved by @jgrier
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required. Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] jgrier commented on issue #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

2019-05-27 Thread GitBox
jgrier commented on issue #8517: [FLINK-10921] [kinesis] Shard watermark 
synchronization in Kinesis consumer
URL: https://github.com/apache/flink/pull/8517#issuecomment-496260766
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

2019-05-27 Thread GitBox
jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] 
Shard watermark synchronization in Kinesis consumer
URL: https://github.com/apache/flink/pull/8517#discussion_r287840233
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
 ##
 @@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Emitter that handles event time synchronization between producer threads.
+ *
+ * Records are organized into per producer queues that will block when 
capacity is exhausted.
+ *
+ * Records are emitted by selecting the oldest available element of all 
producer queues,
+ * as long as the timestamp does not exceed the current shared watermark plus 
allowed lookahead interval.
+ *
+ * @param <T>
+ */
+public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable {
+   private static final Logger LOG = LoggerFactory.getLogger(RecordEmitter.class);
+
+   /**
+* The default capacity of a single queue.
+*
+* Larger queue size can lead to higher throughput, but also to
+* very high heap space consumption, depending on the size of elements.
+*
+* Note that this is difficult to tune, because it does not take 
into account
+* the size of individual objects.
+*/
+   public static final int DEFAULT_QUEUE_CAPACITY = 100;
+
+   private final int queueCapacity;
+   private final ConcurrentHashMap<Integer, AsyncRecordQueue<T>> queues = new ConcurrentHashMap<>();
+   private final ConcurrentHashMap<AsyncRecordQueue<T>, Boolean> emptyQueues = new ConcurrentHashMap<>();
+   private final PriorityQueue<AsyncRecordQueue<T>> heads = new PriorityQueue<>(this::compareHeadElement);
+   private volatile boolean running = true;
+   private volatile long maxEmitTimestamp = Long.MAX_VALUE;
+   private long maxLookaheadMillis = 60 * 1000; // one minute
+   private long idleSleepMillis = 100;
+   private final Object condition = new Object();
+
+   public RecordEmitter(int queueCapacity) {
+   this.queueCapacity = queueCapacity;
+   }
+
+   private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue right) {
+   return Long.compare(left.headTimestamp, right.headTimestamp);
+   }
+
+   /**
+* Accepts records from readers.
+*
+* @param <T>
+*/
+   public interface RecordQueue<T> {
+   void put(T record) throws InterruptedException;
+
+   int getQueueId();
+
+   int getSize();
+
+   T peek();
+   }
+
+   private class AsyncRecordQueue<T> implements RecordQueue<T> {
+   private final ArrayBlockingQueue<T> queue;
+   private final int queueId;
+   long headTimestamp;
+
+   private AsyncRecordQueue(int queueId) {
+   super();
+   this.queue = new ArrayBlockingQueue<>(queueCapacity);
+   this.queueId = queueId;
+   this.headTimestamp = Long.MAX_VALUE;
+   }
+
+   public void put(T record) throws InterruptedException {
+   queue.put(record);
+   // TODO: not pretty having this here
+   synchronized (condition) {
+   condition.notify();
+   }
+   }
+
+   public int getQueueId() {
+   return queueId;
+   }
+
+   public int getSize() {
+   return queue.size();
+   }
+
+   public T peek() {
+   return queue.peek();
+   }
+
+   }
+
+   /**
+* The queue for the given producer (i.e. Kinesis shard consumer 
thread).
+*
+ 

[GitHub] [flink] jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

2019-05-27 Thread GitBox
jgrier commented on a change in pull request #8517: [FLINK-10921] [kinesis] 
Shard watermark synchronization in Kinesis consumer
URL: https://github.com/apache/flink/pull/8517#discussion_r287840412
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
 ##
 @@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Emitter that handles event time synchronization between producer threads.
+ *
+ * Records are organized into per producer queues that will block when 
capacity is exhausted.
+ *
+ * Records are emitted by selecting the oldest available element of all 
producer queues,
+ * as long as the timestamp does not exceed the current shared watermark plus 
allowed lookahead interval.
+ *
+ * @param <T>
+ */
+public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable {
+   private static final Logger LOG = LoggerFactory.getLogger(RecordEmitter.class);
+
+   /**
+* The default capacity of a single queue.
+*
+* Larger queue size can lead to higher throughput, but also to
+* very high heap space consumption, depending on the size of elements.
+*
+* Note that this is difficult to tune, because it does not take 
into account
+* the size of individual objects.
+*/
+   public static final int DEFAULT_QUEUE_CAPACITY = 100;
+
+   private final int queueCapacity;
+   private final ConcurrentHashMap<Integer, AsyncRecordQueue<T>> queues = new ConcurrentHashMap<>();
+   private final ConcurrentHashMap<AsyncRecordQueue<T>, Boolean> emptyQueues = new ConcurrentHashMap<>();
+   private final PriorityQueue<AsyncRecordQueue<T>> heads = new PriorityQueue<>(this::compareHeadElement);
+   private volatile boolean running = true;
+   private volatile long maxEmitTimestamp = Long.MAX_VALUE;
+   private long maxLookaheadMillis = 60 * 1000; // one minute
+   private long idleSleepMillis = 100;
+   private final Object condition = new Object();
+
+   public RecordEmitter(int queueCapacity) {
+   this.queueCapacity = queueCapacity;
+   }
+
+   private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue right) {
+   return Long.compare(left.headTimestamp, right.headTimestamp);
+   }
+
+   /**
+* Accepts records from readers.
+*
+* @param <T>
+*/
+   public interface RecordQueue<T> {
+   void put(T record) throws InterruptedException;
+
+   int getQueueId();
+
+   int getSize();
+
+   T peek();
+   }
+
+   private class AsyncRecordQueue<T> implements RecordQueue<T> {
+   private final ArrayBlockingQueue<T> queue;
+   private final int queueId;
+   long headTimestamp;
+
+   private AsyncRecordQueue(int queueId) {
+   super();
+   this.queue = new ArrayBlockingQueue<>(queueCapacity);
+   this.queueId = queueId;
+   this.headTimestamp = Long.MAX_VALUE;
+   }
+
+   public void put(T record) throws InterruptedException {
+   queue.put(record);
+   // TODO: not pretty having this here
+   synchronized (condition) {
+   condition.notify();
+   }
+   }
+
+   public int getQueueId() {
+   return queueId;
+   }
+
+   public int getSize() {
+   return queue.size();
+   }
+
+   public T peek() {
+   return queue.peek();
+   }
+
+   }
+
+   /**
+* The queue for the given producer (i.e. Kinesis shard consumer 
thread).
+*
+ 
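The emission rule in the class javadoc above — always emit the oldest head across all producer queues, but never beyond the shared watermark plus the lookahead interval — can be sketched in isolation. This is a hypothetical simplification (plain timestamp queues instead of TimestampedValue elements, no blocking or idle sleep), not the actual RecordEmitter:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical sketch of the emission order only; the queue names and the
// emitInOrder helper are illustrative, not part of the Flink connector.
public class EmitOrderSketch {

    static List<Long> emitInOrder(List<Queue<Long>> queues, long watermark, long maxLookaheadMillis) {
        // Min-heap over queue heads, mirroring the PriorityQueue of heads above.
        PriorityQueue<Queue<Long>> heads =
            new PriorityQueue<>(Comparator.comparingLong((Queue<Long> q) -> q.peek()));
        for (Queue<Long> q : queues) {
            if (!q.isEmpty()) {
                heads.add(q);
            }
        }
        List<Long> emitted = new ArrayList<>();
        while (!heads.isEmpty()) {
            Queue<Long> q = heads.poll();
            long ts = q.peek();
            if (ts > watermark + maxLookaheadMillis) {
                break; // head is too far ahead of the shared watermark; hold it back
            }
            emitted.add(q.poll());
            if (!q.isEmpty()) {
                heads.add(q); // re-insert so the new head gets re-ordered
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Queue<Long> shardA = new ArrayDeque<>(List.of(10L, 40L));
        Queue<Long> shardB = new ArrayDeque<>(List.of(20L, 200L));
        // watermark 0, lookahead 100: emits in timestamp order, holds back 200
        System.out.println(emitInOrder(List.of(shardA, shardB), 0L, 100L)); // prints [10, 20, 40]
    }
}
```

With a watermark of 0 and a 100 ms lookahead, records 10, 20 and 40 are emitted in timestamp order while 200 stays queued until the watermark advances.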

[jira] [Updated] (FLINK-12637) Add floatingBufferUsage and exclusiveBufferUsage for credit based mode

2019-05-27 Thread aitozi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aitozi updated FLINK-12637:
---
Description: Described 
[here|https://github.com/apache/flink/pull/8455#issuecomment-496077999]  (was: 
Described 
[here](https://github.com/apache/flink/pull/8455#issuecomment-496077999))

> Add floatingBufferUsage and exclusiveBufferUsage for credit based mode
> --
>
> Key: FLINK-12637
> URL: https://issues.apache.org/jira/browse/FLINK-12637
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / Network
>Affects Versions: 1.9.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Minor
>
> Described 
> [here|https://github.com/apache/flink/pull/8455#issuecomment-496077999]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12637) Add floatingBufferUsage and exclusiveBufferUsage for credit based mode

2019-05-27 Thread aitozi (JIRA)
aitozi created FLINK-12637:
--

 Summary: Add floatingBufferUsage and exclusiveBufferUsage for 
credit based mode
 Key: FLINK-12637
 URL: https://issues.apache.org/jira/browse/FLINK-12637
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics, Runtime / Network
Affects Versions: 1.9.0
Reporter: aitozi
Assignee: aitozi


Described 
[here](https://github.com/apache/flink/pull/8455#issuecomment-496077999)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12302:
---
Comment: was deleted

(was: [~gjy], hi, can you show me your +flink-conf.yaml+ file? thanks)

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, 
> jobmanager-2.log, screenshot-1.png, screenshot-2.png, 
> spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the 
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus 
> is +UNDEFINED+. It's better to show the state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849038#comment-16849038
 ] 

lamber-ken commented on FLINK-12302:


[~gjy], here my env files.

1,test jars --> [^test.jar]

2,flink-1.8.0

3,flink-conf.yaml --> [^flink-conf.yaml]

4,the first jobmanager.log --> [^jobmanager-1.log]

5,the second jobmanager.log --> [^jobmanager-2.log]

you must wait the job reach the max attemp times, then you can kill the am. 
from the second jobmanager.log, we will see
{code:java}
Job 0cac7407733eb34396cd5e919631d4ff was not finished by JobManager. {code}

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, 
> jobmanager-2.log, screenshot-1.png, screenshot-2.png, 
> spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12302:
---
Attachment: jobmanager-1.log

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, 
> jobmanager-2.log, screenshot-1.png, screenshot-2.png, 
> spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12302:
---
Attachment: jobmanager-2.log

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-1.log, 
> jobmanager-2.log, screenshot-1.png, screenshot-2.png, 
> spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12302:
---
Attachment: flink-conf.yaml

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, 
> screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, 
> test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12302:
---
Attachment: (was: jobmanager-1.log)

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, 
> screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, 
> test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12302:
---
Attachment: test.jar

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, 
> screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, 
> test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12302:
---
Attachment: (was: flink-conf.yaml)

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, 
> screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, 
> test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12302:
---
Attachment: (was: jobmanager-2.log)

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, 
> screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, 
> test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12302:
---
Attachment: (was: test.jar)

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-2.log, 
> screenshot-1.png, screenshot-2.png, spslave4.bigdata.ly_23951, 
> spslave5.bigdata.ly_20271
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12302:
---
Attachment: test.jar
jobmanager-2.log
jobmanager-1.log
flink-conf.yaml

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-2.log, 
> screenshot-1.png, screenshot-2.png, spslave4.bigdata.ly_23951, 
> spslave5.bigdata.ly_20271
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12302:
---
Attachment: (was: test.jar)

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, flink-conf.yaml, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, jobmanager-2.log, 
> screenshot-1.png, screenshot-2.png, spslave4.bigdata.ly_23951, 
> spslave5.bigdata.ly_20271
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-12302:
---
Attachment: test.jar

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, 
> screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271, 
> test.jar
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread lamber-ken (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849010#comment-16849010
 ] 

lamber-ken commented on FLINK-12302:


[~gjy], hi, can you show me your +flink-conf.yaml+ file? thanks

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, 
> screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on issue #8485: [FLINK-12555] Introduce an encapsulated metric group layout for shuffle API

2019-05-27 Thread GitBox
zhijiangW commented on issue #8485: [FLINK-12555] Introduce an encapsulated 
metric group layout for shuffle API
URL: https://github.com/apache/flink/pull/8485#issuecomment-496231763
 
 
   Thanks for the replies @zentol .
   
   But in this PR `parentGroup.addGroup("Network")` is called twice, in 
`NetworkEnvironment#createResultPartitionWriters/InputGates`. So should we 
change it to create the group once in the task class and then pass it into 
`createResultPartitionWriters/InputGates` separately?
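
   The create-once-then-pass pattern under discussion can be sketched with a hypothetical, minimal metric-group tree. `MetricGroupSketch` and the two helper methods below are illustrative stand-ins, not Flink's actual `MetricGroup` or `NetworkEnvironment` API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal stand-in for a metric group tree.
public class MetricGroupSketch {

    final Map<String, MetricGroupSketch> children = new HashMap<>();
    int addGroupCalls = 0; // counts how often a subgroup is requested on this node

    MetricGroupSketch addGroup(String name) {
        addGroupCalls++;
        return children.computeIfAbsent(name, n -> new MetricGroupSketch());
    }

    // Both consumers receive the already-created "Network" group instead of
    // each calling addGroup("Network") themselves.
    static void createResultPartitionWriters(MetricGroupSketch networkGroup) {
        networkGroup.addGroup("Output");
    }

    static void createInputGates(MetricGroupSketch networkGroup) {
        networkGroup.addGroup("Input");
    }

    public static void main(String[] args) {
        MetricGroupSketch taskGroup = new MetricGroupSketch();
        MetricGroupSketch networkGroup = taskGroup.addGroup("Network"); // created once
        createResultPartitionWriters(networkGroup);
        createInputGates(networkGroup);
        System.out.println(taskGroup.addGroupCalls); // 1
    }
}
```

   The design question then reduces to where the single `addGroup("Network")` call lives: in the task class (as sketched), or deduplicated inside the environment.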


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8555: [FLINK-12254][table-common] More preparation for using the new type system

2019-05-27 Thread GitBox
flinkbot commented on issue #8555: [FLINK-12254][table-common] More preparation 
for using the new type system
URL: https://github.com/apache/flink/pull/8555#issuecomment-496231212
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands: the @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr opened a new pull request #8555: [FLINK-12254][table-common] More preparation for using the new type system

2019-05-27 Thread GitBox
twalthr opened a new pull request #8555: [FLINK-12254][table-common] More 
preparation for using the new type system
URL: https://github.com/apache/flink/pull/8555
 
 
   ## What is the purpose of the change
   
   This PR contains another set of utility classes for enabling the new type 
system. The most important part is the ValueDataTypeConverter. It basically 
allows converting literals to DataType. By looking into the actual value 
instead of just the class, it enables supporting all sorts of time classes as 
well as Java's BigDecimal with variable precision and scale.
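
   The value-based (rather than class-based) inference can be illustrated with a small sketch. `inferDecimalType` and the `DECIMAL(p, s)` string below are illustrative stand-ins, not the actual `ValueDataTypeConverter` API; the point is only that a `BigDecimal` instance carries its own precision and scale:

```java
import java.math.BigDecimal;

// Hypothetical sketch: derive a decimal type from a literal's *value*.
public class DecimalTypeSketch {

    // BigDecimal exposes precision and scale per instance, so two literals of
    // the same Java class can map to different logical types.
    static String inferDecimalType(BigDecimal value) {
        return "DECIMAL(" + value.precision() + ", " + value.scale() + ")";
    }

    public static void main(String[] args) {
        System.out.println(inferDecimalType(new BigDecimal("12.34")));   // DECIMAL(4, 2)
        System.out.println(inferDecimalType(new BigDecimal("0.00001"))); // DECIMAL(1, 5)
    }
}
```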
   
   It also contains one important change regarding string literals that improves 
SQL standard compliance.
   
   ## Brief change log
   
   See commit messages.
   
   
   ## Verifying this change
   
   See `ValueDataTypeConverterTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread Gary Yao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848987#comment-16848987
 ] 

Gary Yao commented on FLINK-12302:
--

[~lamber-ken] Thanks for your example. However, I am still not able to 
reproduce this (emr-5.23.0, Amazon Hadoop 2.8.5). I manually killed the AM 
after submission, and triggered another application attempt. Find attached the 
logs:
 [^jobmanager-05-27.log] 

Maybe you can attach logs as well? 

Submission command:
{code}
HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster -d -c 
comTestDemo ../testjob-1.0-SNAPSHOT.jar
{code}

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, 
> screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] 1u0 commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking

2019-05-27 Thread GitBox
1u0 commented on a change in pull request #8467: [FLINK-12535][network] Make 
CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r287809858
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/AvailabilityListener.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface defining a couple of essential methods for listening on data
+ * availability using {@link CompletableFuture}. For usage check out for
+ * example {@link AsyncDataInput}.
+ */
+@Internal
+public interface AvailabilityListener {
+   /**
+* Constant that allows to avoid volatile checks {@link 
CompletableFuture#isDone()}. Check
+* {@link #isAvailable()} for more explanation.
+*/
+   CompletableFuture AVAILABLE = 
CompletableFuture.completedFuture(null);
+
+   /**
+    * @return true if this input is finished (e.g. the end of input was reached), false otherwise.
+    */
+   boolean isFinished();
+
+   /**
+* Check if this instance is available for further processing.
+*
+    * When hot looping to avoid volatile access in {@link CompletableFuture#isDone()},
+    * the user of this method should do the following check:
+* 
+* {@code
+*  AvailabilityListener input = ...;
+*  if (input.isAvailable() == AvailabilityListener.AVAILABLE || 
input.isAvailable().isDone()) {
+*  // do something;
+*  }
+* }
+* 
+*
+* @return a future that is completed if there are more records 
available. If there are more
+* records available immediately, {@link #AVAILABLE} should be 
returned. Previously returned
+* not completed futures should become completed once there is more 
input available or if
+* the input {@link #isFinished()}.
+*/
+   CompletableFuture<?> isAvailable();
 
 Review comment:
   Maybe you can simplify the contract of this interface by having only one 
method that returns 3 possible states of an underlying data stream? In a 
nutshell, implementations of this interface have three situations:
* the stream has finished;
* the stream has not finished, and there is data immediately ready to be 
processed;
* the stream has not finished, but the caller needs to wait until data is ready.
   
   One way: you can apply the same trick as with `CompletableFuture<?> 
AVAILABLE = CompletableFuture.completedFuture(null);`.
   Basically, have pre-defined, distinct constant dummy `CompletableFuture` 
instances that act as markers, e.g. for `isFinished`:
   
   ```
   public interface AvailabilityListener {
       CompletableFuture<?> FINISHED = CompletableFuture.completedFuture(null);
       CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

       CompletableFuture<?> getStatusFuture();
   }
   ```
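
   A minimal, self-contained sketch of the marker-constant idea proposed above. `AvailabilitySketch`, `DataInput`, and `classify` are hypothetical names, not Flink classes; the point is that reference equality (`==`) against the pre-allocated futures distinguishes the three states without the volatile read inside `isDone()`:

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the single-method availability contract with marker constants.
public class AvailabilitySketch {

    // Two distinct pre-allocated completed futures; identity comparison (==)
    // distinguishes them without calling isDone().
    static final CompletableFuture<?> FINISHED = CompletableFuture.completedFuture(null);
    static final CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    interface DataInput {
        CompletableFuture<?> getStatusFuture();
    }

    // Map the returned future onto the three states of the underlying stream.
    static String classify(CompletableFuture<?> status) {
        if (status == FINISHED) {
            return "finished";
        }
        if (status == AVAILABLE) {
            return "available";
        }
        return "must-wait"; // caller registers a callback on / blocks on this future
    }

    public static void main(String[] args) {
        DataInput drained = () -> FINISHED;
        DataInput ready = () -> AVAILABLE;
        DataInput pending = () -> new CompletableFuture<Void>();

        System.out.println(classify(drained.getStatusFuture()));  // finished
        System.out.println(classify(ready.getStatusFuture()));    // available
        System.out.println(classify(pending.getStatusFuture()));  // must-wait
    }
}
```

   One caveat of this design: a consumer that forgets the identity checks and only calls `isDone()` would see FINISHED and AVAILABLE as indistinguishable completed futures.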


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12302) Fixed the wrong finalStatus of yarn application when application finished

2019-05-27 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-12302:
-
Attachment: jobmanager-05-27.log

> Fixed the wrong finalStatus of yarn application when application finished
> -
>
> Key: FLINK-12302
> URL: https://issues.apache.org/jira/browse/FLINK-12302
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: fix-bad-finalStatus.patch, 
> image-2019-04-23-19-56-49-933.png, jobmanager-05-27.log, screenshot-1.png, 
> screenshot-2.png, spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the
> ResourceManager of YARN reran the job.
> When the job failed again, the application will finish, but the finalStatus
> is +UNDEFINED+. It's better to show state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zentol commented on a change in pull request #8545: [FLINK-12520] Support to provide fully-qualified domain host name in TaskManagerMetricGroup

2019-05-27 Thread GitBox
zentol commented on a change in pull request #8545: [FLINK-12520] Support to 
provide fully-qualified domain host name in TaskManagerMetricGroup
URL: https://github.com/apache/flink/pull/8545#discussion_r287806538
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
 ##
 @@ -185,6 +185,14 @@
"faster updating metrics. Increase this value 
if the metric fetcher causes too much load. Setting this value to 0 " +
"disables the metric fetching completely.");
 
+   /**
+* Whether the host name in task manager metrics should be fully 
qualified domain name.
+*/
+   public static final ConfigOption METRIC_FULL_HOST_NAME =
+   key("metrics.tm.full-hostname")
 
 Review comment:
   I'd rather introduce a new variable `<tm_fqdn>` than resorting to config 
options.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on a change in pull request #8545: [FLINK-12520] Support to provide fully-qualified domain host name in TaskManagerMetricGroup

2019-05-27 Thread GitBox
zentol commented on a change in pull request #8545: [FLINK-12520] Support to 
provide fully-qualified domain host name in TaskManagerMetricGroup
URL: https://github.com/apache/flink/pull/8545#discussion_r287806224
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
 ##
 @@ -202,7 +214,11 @@ public static String getHostName(InetAddress inetAddress) 
{
LOG.warn("No hostname could be resolved for the IP 
address {}, using IP address as host name. "
+ "Local input split assignment (such as for 
HDFS files) may be impacted.", inetAddress.getHostAddress());
} else {
-   hostName = NetUtils.getHostnameFromFQDN(fqdnHostName);
+   if (useFullHostName) {
+   hostName = fqdnHostName;
 
 Review comment:
   this case should most certainly not be handled in this method. Have the 
TaskManagerRunner call `getFqdnHostName` instead (after making it public).
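
   As context for the FQDN vs. short host name distinction, a minimal sketch (assumed behavior, loosely mirroring what `NetUtils.getHostnameFromFQDN` does; not the actual implementation):

   ```java
   // Sketch: the short host name is the segment of the FQDN before the first
   // dot; if there is no dot, the FQDN is already the short name.
   class HostNameSketch {
       static String shortNameFromFqdn(String fqdn) {
           int dot = fqdn.indexOf('.');
           return dot > 0 ? fqdn.substring(0, dot) : fqdn;
       }
   }
   ```

   Here `shortNameFromFqdn("worker-1.example.com")` yields `worker-1`, whereas "use the FQDN" amounts to passing the resolved name through unchanged.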




[jira] [Updated] (FLINK-11283) Add keyed CoProcessFunction that allows accessing key

2019-05-27 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11283:
-
Issue Type: New Feature  (was: Improvement)

> Add keyed CoProcessFunction that allows accessing key
> -
>
> Key: FLINK-11283
> URL: https://issues.apache.org/jira/browse/FLINK-11283
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently, we can access the key when using {{KeyedProcessFunction}} .
> Similar functionality would be very useful when processing a connected keyed 
> stream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] pnowojski commented on issue #8495: [FLINK-12556][e2e] Extend some end-to-end tests to run with custom (input) File System implementation

2019-05-27 Thread GitBox
pnowojski commented on issue #8495: [FLINK-12556][e2e] Extend some end-to-end 
tests to run with custom (input) File System implementation
URL: https://github.com/apache/flink/pull/8495#issuecomment-496224208
 
 
   LGTM % renaming, one question, and if Chesnay is fine with extending the 
testing time (I would keep the original count of tests) 




[GitHub] [flink] twalthr commented on a change in pull request #8548: [FLINK-6962] [table] Add a create table SQL DDL

2019-05-27 Thread GitBox
twalthr commented on a change in pull request #8548: [FLINK-6962] [table] Add a 
create table SQL DDL
URL: https://github.com/apache/flink/pull/8548#discussion_r287803308
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
 ##
 @@ -0,0 +1,347 @@
+<#--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to you under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+-->
+
+void TableColumn(TableCreationContext context) :
+{
+}
+{
+(
+TableColumn2(context.columnList)
+|
+context.watermark = Watermark()
+|
+context.primaryKeyList = PrimaryKey()
+|
+UniqueKey(context.uniqueKeysList)
+|
+ComputedColumn(context)
+)
+}
+
+void ComputedColumn(TableCreationContext context) :
+{
+SqlNode identifier;
+SqlNode expr;
+boolean hidden = false;
+SqlParserPos pos;
+}
+{
+identifier = SimpleIdentifier() {pos = getPos();}
+
+expr = Expression(ExprContext.ACCEPT_SUB_QUERY) {
+expr = SqlStdOperatorTable.AS.createCall(Span.of(identifier, 
expr).pos(), expr, identifier);
+context.columnList.add(expr);
+}
+}
+
+void TableColumn2(List<SqlNode> list) :
+{
+SqlParserPos pos;
+SqlIdentifier name;
+SqlDataTypeSpec type;
+SqlCharStringLiteral comment = null;
+boolean isHeader = false;
+}
+{
+name = SimpleIdentifier()
+type = DataType()
+(
+ { type = type.withNullable(true); }
+|
+  { type = type.withNullable(false); }
+|
+{ type = type.withNullable(true); }
+)
+[  { isHeader = true; } ]
+[   {
+String p = SqlParserUtil.parseString(token.image);
+comment = SqlLiteral.createCharString(p, getPos());
+}]
+{
+SqlTableColumn tableColumn = new SqlTableColumn(name, type, comment, 
getPos());
+tableColumn.setHeader(isHeader);
+list.add(tableColumn);
+}
+}
+
+SqlNodeList PrimaryKey() :
+{
+List<SqlNode> pkList = new ArrayList<SqlNode>();
+
+SqlParserPos pos;
+SqlIdentifier columnName;
+}
+{
+ { pos = getPos(); }  
+columnName = SimpleIdentifier() { pkList.add(columnName); }
+( columnName = SimpleIdentifier() { pkList.add(columnName); })*
+
+{
+return new SqlNodeList(pkList, pos.plus(getPos()));
+}
+}
+
+void UniqueKey(List<SqlNodeList> list) :
+{
+List<SqlNode> ukList = new ArrayList<SqlNode>();
+SqlParserPos pos;
+SqlIdentifier columnName;
+}
+{
+ { pos = getPos(); } 
+columnName = SimpleIdentifier() { ukList.add(columnName); }
+( columnName = SimpleIdentifier() { ukList.add(columnName); })*
+
+{
+SqlNodeList uk = new SqlNodeList(ukList, pos.plus(getPos()));
+list.add(uk);
+}
+}
+
+SqlWatermark Watermark() :
+{
+SqlIdentifier watermarkName = null;
+SqlIdentifier columnName;
+SqlWatermarkStrategy strategy;
+int delayDef = -1;
+SqlTimeUnit timeUnit = null;
+}
+{
+
+[watermarkName = SimpleIdentifier()]
+
+columnName = SimpleIdentifier()
+
+(
+  
+{
+strategy = SqlWatermarkStrategy.BOUNDED_WITH_DELAY;
+delayDef = UnsignedIntLiteral();
+}
+(
+ { timeUnit = SqlTimeUnit.DAY; }
+|{ timeUnit = SqlTimeUnit.HOUR; }
+|{ timeUnit = SqlTimeUnit.MINUTE; }
+|{ timeUnit = SqlTimeUnit.SECOND; }
+|{ timeUnit = SqlTimeUnit.MILLISECOND; }
+)
+|{ strategy = SqlWatermarkStrategy.ASCENDING; }
+|{ strategy = SqlWatermarkStrategy.FROM_SOURCE; }
+)
+{
+return new SqlWatermark(
+watermarkName,
+columnName,
+strategy,
+delayDef,
+timeUnit,
+getPos());
+}
+}
+
+SqlNode PropertyValue() :
+{
+SqlIdentifier key;
+SqlNode value;
+SqlParserPos pos;
+}
+{
+key = CompoundIdentifier()
+{ pos = getPos(); }
+ value = StringLiteral()
+{
+return new SqlProperty(key, value, getPos());
+}
+}
+
+SqlNode SqlCreateTable() :
+{
+final SqlParserPos startPos;
+SqlIdentifier tableName;
+String tableType = null;
+SqlNodeList primaryKeyList = null;
+List<SqlNodeList> uniqueKeysList = null;
+ 

[GitHub] [flink] pnowojski commented on a change in pull request #8495: [FLINK-12556][e2e] Extend some end-to-end tests to run with custom (input) File System implementation

2019-05-27 Thread GitBox
pnowojski commented on a change in pull request #8495: [FLINK-12556][e2e] 
Extend some end-to-end tests to run with custom (input) File System 
implementation
URL: https://github.com/apache/flink/pull/8495#discussion_r287803156
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
 ##
 @@ -92,34 +88,28 @@ do
 sleep 2
 done
 
-CLUSTER_STARTED=1
-for (( i = 0; i < $CLUSTER_SETUP_RETRIES; i++ ))
-do
-if start_hadoop_cluster; then
-   echo "Cluster started successfully."
-   CLUSTER_STARTED=0
-   break #continue test, cluster set up succeeded
-fi
-
-echo "ERROR: Could not start hadoop cluster. Retrying..."
-docker-compose -f 
$END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml 
down
-done
-
-if [[ ${CLUSTER_STARTED} -ne 0 ]]; then
+if ! retry_times $CLUSTER_SETUP_RETRIES 0 start_hadoop_cluster; then
 echo "ERROR: Could not start hadoop cluster. Aborting..."
 exit 1
 fi
 
+mkdir -p $FLINK_TARBALL_DIR
+tar czf $FLINK_TARBALL_DIR/$FLINK_TARBALL -C $(dirname $FLINK_DIR) .
 
 Review comment:
   > I wasn't (and I'm still not) sure if all of those refactorings in the 
first commit are related to one another or if they are 3 independent things 
   
   Bumping the question :)




[GitHub] [flink] pnowojski commented on a change in pull request #8495: [FLINK-12556][e2e] Extend some end-to-end tests to run with custom (input) File System implementation

2019-05-27 Thread GitBox
pnowojski commented on a change in pull request #8495: [FLINK-12556][e2e] 
Extend some end-to-end tests to run with custom (input) File System 
implementation
URL: https://github.com/apache/flink/pull/8495#discussion_r287802609
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-plugins-test/src/main/java/org/apache/flink/fs/dummy/DummyFileSystemFileStatus.java
 ##
 @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.dummy;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.Path;
+
+class DummyFileSystemFileStatus implements FileStatus {
 
 Review comment:
   Ok I get your point. In that case I would vote for one of the following:
   1. `DummyFileSystem`, `DummyFileStatus`, `DummyDataInputStream`
   2. `DummyFSFileSystem`, `DummyFSFileStatus`, `DummyFSDataInputStream` (note 
`FS`, which would also fit the code base better compared to `Fs`, and note that 
all files should have the same prefix)
   3. Rename it altogether, for example to `StaticContent` or 
`PredefinedContent` (`StaticContentFileSystem`, `StaticContentFileStatus`, 
`StaticContentDataInputStream`, ...)
   
   I think the 3rd would be the best, as it also describes in what way the file 
system is "dummy", not just the fact that it is "dummy". 




[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig

2019-05-27 Thread GitBox
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State 
TTL] Consider setting a default background cleanup strategy in StateTtlConfig
URL: https://github.com/apache/flink/pull/8459#discussion_r287802261
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 ##
 @@ -376,39 +391,57 @@ public StateTtlConfig build() {
private static final long serialVersionUID = 
1373998465131443873L;
}
 
-   final EnumMap<Strategies, CleanupStrategy> strategies = new 
EnumMap<>(Strategies.class);
+   private final EnumMap<Strategies, CleanupStrategy> strategies = 
new EnumMap<>(Strategies.class);
 
-   public void activate(Strategies strategy) {
+   private void activate(Strategies strategy) {
activate(strategy, EMPTY_STRATEGY);
}
 
-   public void activate(Strategies strategy, CleanupStrategy 
config) {
+   private void activate(Strategies strategy, CleanupStrategy 
config) {
strategies.put(strategy, config);
}
 
public boolean inFullSnapshot() {
return 
strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
 
 Review comment:
   True, I was thinking of another method, sorry for the confusion.
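
   For readers following along, the `EnumMap`-backed activation pattern in the diff above can be sketched as follows (names simplified from `StateTtlConfig`; the value type is a placeholder):

   ```java
   import java.util.EnumMap;
   
   // Simplified illustration: activating a strategy stores it in an EnumMap,
   // and queries such as inFullSnapshot() reduce to containsKey checks.
   class CleanupStrategiesSketch {
       enum Strategies { FULL_STATE_SCAN_SNAPSHOT, INCREMENTAL_CLEANUP }
   
       private final EnumMap<Strategies, Object> strategies = new EnumMap<>(Strategies.class);
   
       void activate(Strategies strategy) {
           strategies.put(strategy, new Object()); // placeholder for a strategy config
       }
   
       boolean inFullSnapshot() {
           return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
       }
   }
   ```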




[jira] [Commented] (FLINK-12611) Make time indicator nullable in blink

2019-05-27 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848956#comment-16848956
 ] 

Timo Walther commented on FLINK-12611:
--

Running aggregates like {{SELECT MAX(X)}} should not propagate time attributes. 
Currently, we strictly separate time-based operations and materializing 
operations. This is also discussed 
[here|https://github.com/ververica/sql-training/blob/master/slides/sql-training-02-querying-dynamic-tables.pdf]
 (slides 24 and 32). So the concept of time attributes and retract/upsert tables 
should be orthogonal.

However, until the time attribute materialization happens, a time attribute 
should behave like a regular TIMESTAMP and thus can be nullable.

> Make time indicator nullable in blink
> -
>
> Key: FLINK-12611
> URL: https://issues.apache.org/jira/browse/FLINK-12611
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> SQL: select max(rowtime), count(a) from T
> There will be an AssertionError: type mismatch:
> aggCall type:
> TIMESTAMP(3) NOT NULL
> inferred type:
> TIMESTAMP(3)
> Agg type checking is done before the TimeIndicator materializes, so there is 
> an exception.
> And before introducing nullability of LogicalType, we should modify this to 
> avoid more potential TypeCheck problems.





[GitHub] [flink] aljoscha closed pull request #7470: [FLINK-11283] Accessing the key when processing connected keyed stream

2019-05-27 Thread GitBox
aljoscha closed pull request #7470: [FLINK-11283] Accessing the key when 
processing connected keyed stream
URL: https://github.com/apache/flink/pull/7470
 
 
   




[GitHub] [flink] aljoscha commented on issue #7470: [FLINK-11283] Accessing the key when processing connected keyed stream

2019-05-27 Thread GitBox
aljoscha commented on issue #7470: [FLINK-11283] Accessing the key when 
processing connected keyed stream
URL: https://github.com/apache/flink/pull/7470#issuecomment-496221694
 
 
   Thanks a lot for staying on this issue for so long, @yanghua. I finally 
merged it.  




[GitHub] [flink] aljoscha commented on issue #8538: [FLINK-11283] Accessing the key when processing connected keyed stream

2019-05-27 Thread GitBox
aljoscha commented on issue #8538: [FLINK-11283] Accessing the key when 
processing connected keyed stream
URL: https://github.com/apache/flink/pull/8538#issuecomment-496221456
 
 
   Merged




[jira] [Closed] (FLINK-11283) Add keyed CoProcessFunction that allows accessing key

2019-05-27 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-11283.

   Resolution: Implemented
Fix Version/s: 1.9.0

Implemented on master in
3a5bf89384ed07431d15285ef40e751daf9d0c83

> Add keyed CoProcessFunction that allows accessing key
> -
>
> Key: FLINK-11283
> URL: https://issues.apache.org/jira/browse/FLINK-11283
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently, we can access the key when using {{KeyedProcessFunction}} .
> Similar functionality would be very useful when processing a connected keyed 
> stream.





[GitHub] [flink] 1u0 commented on issue #8495: [FLINK-12556][e2e] Extend some end-to-end tests to run with custom (input) File System implementation

2019-05-27 Thread GitBox
1u0 commented on issue #8495: [FLINK-12556][e2e] Extend some end-to-end tests 
to run with custom (input) File System implementation
URL: https://github.com/apache/flink/pull/8495#issuecomment-496221499
 
 
   Timings of the tests, from the CI logs:
   ```
   [PASS] 'Running Kerberized YARN on Docker test (default input)' passed after 
8 minutes and 13 seconds! Test exited with exit code 0.
   [PASS] 'Running Kerberized YARN on Docker test (custom fs plugin)' passed 
after 4 minutes and 33 seconds! Test exited with exit code 0.
   
   [PASS] 'Wordcount end-to-end test' passed after 0 minutes and 12 seconds! 
Test exited with exit code 0.
   [PASS] 'Shaded Hadoop S3A end-to-end test' passed after 0 minutes and 1 
seconds! Test exited with exit code 0.
   [PASS] 'Shaded Presto S3 end-to-end test' passed after 0 minutes and 0 
seconds! Test exited with exit code 0.
   [PASS] 'Custom FS plugin end-to-end test' passed after 0 minutes and 13 
seconds! Test exited with exit code 0.
   ```
   
   So adding the modified YARN Kerberos test extends the test run by about 
4.5 minutes. Here it may look faster, because it reuses the Docker build 
created by the previous test run. Otherwise, I expect both to run for about the 
same amount of time under equal conditions.
   
   CC @zentol.




[jira] [Updated] (FLINK-11373) CliFrontend cuts off reason for error messages

2019-05-27 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11373:
-
Component/s: (was: Deployment / Scripts)
 Command Line Client

> CliFrontend cuts off reason for error messages
> --
>
> Key: FLINK-11373
> URL: https://issues.apache.org/jira/browse/FLINK-11373
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.5.6, 1.6.3, 1.7.1
>Reporter: Maximilian Michels
>Assignee: leesf
>Priority: Minor
>  Labels: pull-request-available, starter
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The CliFrontend seems to print only the first message in the stack trace and 
> not any of its causes.
> {noformat}
> bin/flink run /non-existing/path
> Could not build the program from JAR file.
> Use the help option (-h or --help) to get help on the command.
> {noformat}
> Notice that the underlying cause of this message is a FileNotFoundException.
> Consider changing 
> a) the error message for this particular case 
> b) the way the stack trace messages are trimmed





[GitHub] [flink] aljoscha closed pull request #8538: [FLINK-11283] Accessing the key when processing connected keyed stream

2019-05-27 Thread GitBox
aljoscha closed pull request #8538: [FLINK-11283] Accessing the key when 
processing connected keyed stream
URL: https://github.com/apache/flink/pull/8538
 
 
   




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-27 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287799750
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
 ##
 @@ -18,48 +18,64 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.runtime.shuffle.ShuffleDeploymentDescriptor;
 
+import javax.annotation.Nonnull;
 import java.io.Serializable;
 
 /**
  * Contains information where to find a partition. The partition is defined by 
the
- * {@link IntermediateDataSetID} and the partition location is specified by
- * {@link InputChannelDeploymentDescriptor}.
+ * {@link IntermediateDataSetID} and the partition is specified by
+ * {@link org.apache.flink.runtime.shuffle.ShuffleDeploymentDescriptor}.
  */
 public class PartitionInfo implements Serializable {
 
private static final long serialVersionUID = 1724490660830968430L;
 
+   @Nonnull
private final IntermediateDataSetID intermediateDataSetID;
 
 Review comment:
   At the moment, `IntermediateDataSetID` is kept once in 
`InputGateDeploymentDescriptor` for all channels/partitions as before. I 
suggest we keep it this way in this PR. Later we can consider a further 
refactoring step where `IntermediateDataSetID` becomes part of the full partition id.




[GitHub] [flink] aljoscha commented on issue #8535: [FLINK-11693] Add KafkaSerializationSchema that uses ProducerRecord

2019-05-27 Thread GitBox
aljoscha commented on issue #8535: [FLINK-11693] Add KafkaSerializationSchema 
that uses ProducerRecord
URL: https://github.com/apache/flink/pull/8535#issuecomment-496219121
 
 
   Also, @tweise or @jgrier is this feature relevant for you? Maybe you also 
have some comments/input?




[GitHub] [flink] asfgit closed pull request #8510: [FLINK-12254][table] Update cast() and TypeLiteralExpression to new type system

2019-05-27 Thread GitBox
asfgit closed pull request #8510:  [FLINK-12254][table] Update cast() and 
TypeLiteralExpression to new type system
URL: https://github.com/apache/flink/pull/8510
 
 
   




[GitHub] [flink] zentol commented on issue #8485: [FLINK-12555] Introduce an encapsulated metric group layout for shuffle API

2019-05-27 Thread GitBox
zentol commented on issue #8485: [FLINK-12555] Introduce an encapsulated metric 
group layout for shuffle API
URL: https://github.com/apache/flink/pull/8485#issuecomment-496213298
 
 
   > And if we think there are no concerns for creating one metric group twice
   
   There are concerns; it prints a warning to the user as it is not how the API 
is supposed to be used. If any components need access to a shared group then 
this group should be created once and passed around as needed.
   Things were implemented this way on purpose to prevent components from 
interfering with each other by accident. 
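
   A toy illustration (not the real `MetricGroup`/registry API) of the collision behavior described here: creating a child group under an existing name returns the existing group and would trigger the warning, which is why a shared group should be created once and passed to every component that needs it:

   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Toy parent group: duplicate child names are detected; the real registry
   // logs a warning in that case instead of silently creating a second group.
   class MetricGroupSketch {
       private final Map<String, MetricGroupSketch> children = new HashMap<>();
       boolean duplicateWarned = false;
   
       MetricGroupSketch addGroup(String name) {
           MetricGroupSketch existing = children.get(name);
           if (existing != null) {
               duplicateWarned = true; // warning would be emitted here
               return existing;
           }
           MetricGroupSketch child = new MetricGroupSketch();
           children.put(name, child);
           return child;
       }
   }
   ```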




[GitHub] [flink] zentol commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler

2019-05-27 Thread GitBox
zentol commented on a change in pull request #8498: [FLINK-12413] [runtime] 
Implement ExecutionFailureHandler
URL: https://github.com/apache/flink/pull/8498#discussion_r287788958
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
 ##
 @@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Result containing the tasks to restart upon a task failure.
+ * Also contains the reason if the failure is not recoverable(non-recoverable
+ * failure type or restarting suppressed by restart strategy).
+ */
+public class FailureHandlingResult {
+
+   /** Task vertices to restart to recover from the failure. */
+   private final Set<ExecutionVertexID> verticesToRestart;
+
+   /** Delay before the restarting can be conducted. */
+   private final long restartDelayMS;
+
+   /** Reason why the failure is not recoverable. */
+   private final Throwable error;
+
+   /**
+* Creates a result of a set of tasks to restart to recover from the 
failure.
+*
+* @param verticesToRestart containing task vertices to restart to 
recover from the failure
+* @param restartDelayMS indicate a delay before conducting the restart
+*/
+   private FailureHandlingResult(Set<ExecutionVertexID> verticesToRestart, 
long restartDelayMS) {
+   checkState(restartDelayMS >= 0);
+
+   this.verticesToRestart = 
Collections.unmodifiableSet(checkNotNull(verticesToRestart));
+   this.restartDelayMS = restartDelayMS;
+   this.error = null;
+   }
+
+   /**
+* Creates a result that the failure is not recoverable and no 
restarting should be conducted.
+*
+* @param error reason why the failure is not recoverable
+*/
+   private FailureHandlingResult(Throwable error) {
+   this.verticesToRestart = null;
+   this.restartDelayMS = -1;
+   this.error = checkNotNull(error);
+   }
+
+   /**
+* Returns the tasks to restart.
+*
+* @return the tasks to restart
+*/
+   public Set<ExecutionVertexID> getVerticesToRestart() {
+   if (canRestart()) {
+   return verticesToRestart;
+   } else {
+   throw new IllegalStateException("Cannot get vertices to 
restart when the restarting is suppressed.");
+   }
+   }
+
+   /**
+* Returns the delay before the restarting.
+*
+* @return the delay before the restarting
+*/
+   public long getRestartDelayMS() {
+   if (canRestart()) {
+   return restartDelayMS;
+   } else {
+   throw new IllegalStateException("Cannot get restart 
delay when the restarting is suppressed.");
+   }
+   }
+
+   /**
+* Returns whether the restarting can be conducted.
+*
+* @return whether the restarting can be conducted
+*/
+   public boolean canRestart() {
+   return error == null;
+   }
+
+   /**
+* Returns reason why the restarting cannot be conducted.
+*
+* @return reason why the restarting cannot be conducted
+*/
+   public Throwable getError() {
+   return error;
 
 Review comment:
   shouldn't this also throw an `IllegalStateException` if `error == null` like 
`getRestartDelayMS` and `getVerticesToRestart`?




[GitHub] [flink] zentol commented on a change in pull request #8498: [FLINK-12413] [runtime] Implement ExecutionFailureHandler

2019-05-27 Thread GitBox
zentol commented on a change in pull request #8498: [FLINK-12413] [runtime] 
Implement ExecutionFailureHandler
URL: https://github.com/apache/flink/pull/8498#discussion_r287786958
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link FailureHandlingResult}.
+ */
+public class FailureHandlingResultTest extends TestLogger {
+
+   /**
+* Tests normal FailureHandlingResult.
+*/
+   @Test
+   public void testNormalFailureHandlingResult() throws Exception {
+   // create a normal FailureHandlingResult
+   Set<ExecutionVertexID> tasks = new HashSet<>();
 
 Review comment:
   You missed this one :)



