[GitHub] [flink] dianfu commented on a change in pull request #13415: [FLINK-19277][python] Introduce BatchArrowPythonGroupWindowAggregateFunctionOperator

2020-09-17 Thread GitBox


dianfu commented on a change in pull request #13415:
URL: https://github.com/apache/flink/pull/13415#discussion_r49076



##
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.JoinedRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.grouping.HeapWindowsGrouping;
+import org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping;
+import org.apache.flink.table.runtime.util.RowIterator;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.LinkedList;
+
+/**
+ * The Batch Arrow Python {@link AggregateFunction} Operator for Group Window Aggregation.
+ */
+@Internal
+public class BatchArrowPythonGroupWindowAggregateFunctionOperator
+   extends AbstractBatchArrowPythonAggregateFunctionOperator {
+
+   private static final long serialVersionUID = 1L;
+
+   /**
+* The named properties of the window:
+* 0 -> start of the window.
+* 1 -> end of the window.
+* 2 -> row time of the window.
+*/
+   private final int[] namedProperties;
+
+   /**
+* The row time index of the input data.
+*/
+   private final int inputTimeFieldIndex;
+
+   /**
+* The size limit of the window elements buffer used in the group window agg operator.
+*/
+   private final int maxLimitSize;
+
+   /**
+* The window size of the window.
+*/
+   private final long windowSize;
+
+   /**
+* The sliding size of the sliding window.
+*/
+   private final long slideSize;
+
+   private transient WindowsGrouping windowsGrouping;
+
+   /**
+* The GenericRowData reused for holding the properties of the window, such as window start,
+* window end and window time.
+*/
+   private transient GenericRowData windowProperty;
+
+   /**
+* The JoinedRowData reused for holding the window agg execution result.
+*/
+   private transient JoinedRowData windowAggResult;
+
+   /**
+* The queue holding the input groupSet with the TimeWindow for which the execution results
+* have not been received.
+*/
+   private transient LinkedList<Tuple2<RowData, TimeWindow>> inputKeyAndWindow;
+
+   public BatchArrowPythonGroupWindowAggregateFunctionOperator(
+   Configuration config,
+   PythonFunctionInfo[] pandasAggFunctions,
+   RowType inputType,
+   RowType outputType,
+   int inputTimeFieldIndex,
+   int maxLimitSize,
+   long windowSize,
+   long slideSize,
+   int[] namedProperties,
+   int[] groupKey,
+   int[] groupingSet,
+   int[] udafInputOffsets) {
+   super(config, pandasAggFunctions, inputType, outputType, groupKey, groupingSet, udafInputOffsets);
+   this.namedProperties = namedProperties;
+   this.inputTimeFieldIndex = inputTimeFieldIndex;
+   this.maxLimitSize = maxLimitSize;
+   this.windowSize = windowSize;
+   this.slideSize = slideSize;
+   }
+
+   @Override
+   public void open() throws Exception {
+   userDefinedFunctionOutputType = new RowType(
+   outputType.getFields().subList(groupingSet.length, outputType.getFieldCount() - 

[jira] [Commented] (FLINK-19281) LIKE cannot recognize full table path

2020-09-17 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198129#comment-17198129
 ] 

Benchao Li commented on FLINK-19281:


[~ZhuShang] yes, you're right. Assigned to you.

> LIKE cannot recognize full table path
> -
>
> Key: FLINK-19281
> URL: https://issues.apache.org/jira/browse/FLINK-19281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.11.2
>Reporter: Benchao Li
>Priority: Major
>
> for example, if we have a table whose full path is 
> {{default_catalog.default_database.my_table1}}, and the following DDL will 
> fail
> {code:sql}
> create table my_table2 
> like default_catalog.default_database.my_table1
> {code}
> it will throw
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Source table 
> '`default_catalog`.`default_database`.`default_catalog.default_database.my_table1`'
>  of the LIKE clause not found in the catalog, at line 11, column 6
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lambda$lookupLikeSourceTable$1(SqlCreateTableConverter.java:207)
>   at java.util.Optional.orElseThrow(Optional.java:290)
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lookupLikeSourceTable(SqlCreateTableConverter.java:207)
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:103)
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:83)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:188)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.convertSqlNodeToOperation(ParserImpl.java:92)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlInternal(TableEnvironmentImpl.java:724)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sql(TableEnvironmentImpl.java:704)
> {code}
> We can fix it in {{SqlCreateTableConverter#lookupLikeSourceTable}}, by using 
> `SqlTableLike`'s full name.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-19281) LIKE cannot recognize full table path

2020-09-17 Thread Benchao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li reassigned FLINK-19281:
--

Assignee: CloseRiver

> LIKE cannot recognize full table path
> -
>
> Key: FLINK-19281
> URL: https://issues.apache.org/jira/browse/FLINK-19281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.11.2
>Reporter: Benchao Li
>Assignee: CloseRiver
>Priority: Major
>
> for example, if we have a table whose full path is 
> {{default_catalog.default_database.my_table1}}, and the following DDL will 
> fail
> {code:sql}
> create table my_table2 
> like default_catalog.default_database.my_table1
> {code}
> it will throw
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Source table 
> '`default_catalog`.`default_database`.`default_catalog.default_database.my_table1`'
>  of the LIKE clause not found in the catalog, at line 11, column 6
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lambda$lookupLikeSourceTable$1(SqlCreateTableConverter.java:207)
>   at java.util.Optional.orElseThrow(Optional.java:290)
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lookupLikeSourceTable(SqlCreateTableConverter.java:207)
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:103)
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:83)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:188)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.convertSqlNodeToOperation(ParserImpl.java:92)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlInternal(TableEnvironmentImpl.java:724)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sql(TableEnvironmentImpl.java:704)
> {code}
> We can fix it in {{SqlCreateTableConverter#lookupLikeSourceTable}}, by using 
> `SqlTableLike`'s full name.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #13416: [FLINK-19179] Extend managed memory fraction calculation for various use cases.

2020-09-17 Thread GitBox


flinkbot commented on pull request #13416:
URL: https://github.com/apache/flink/pull/13416#issuecomment-694667551


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit b1405ce3314c2799cd8172b50b4653389f36eca9 (Fri Sep 18 
05:48:18 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xintongsong closed pull request #13397: [FLINK-19178][runtime] Extend managed memory weight/fraction interfaces for various use cases.

2020-09-17 Thread GitBox


xintongsong closed pull request #13397:
URL: https://github.com/apache/flink/pull/13397


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-19286) Improve pipelined region scheduling performance

2020-09-17 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19286:
---

 Summary: Improve pipelined region scheduling performance
 Key: FLINK-19286
 URL: https://issues.apache.org/jira/browse/FLINK-19286
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.12.0


In my recent TPC-DS benchmark, pipelined region scheduling is slower than 
lazy-from-sources scheduling. 
The regression is due to some suboptimal implementations in 
{{PipelinedRegionSchedulingStrategy}}, including:
1. the topological sorting of vertices to deploy
2. an unnecessary O(V) loop when sorting an empty set of regions

After improving these implementations, pipelined region scheduling turned out 
to be 10% faster in the previous benchmark setup.
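
A minimal sketch of the second fix, assuming a helper that orders a set of 
regions by a precomputed topological order (all names are illustrative, not the 
actual {{PipelinedRegionSchedulingStrategy}} code):
{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

final class RegionSortSketch {
    // Returns the given regions in topological order. Without the early return,
    // an empty input would still pay a full O(V) pass over topologicalOrder.
    static <R> List<R> sortRegions(Set<R> regions, List<R> topologicalOrder) {
        if (regions.isEmpty()) {
            return Collections.emptyList();
        }
        List<R> sorted = new ArrayList<>(regions.size());
        for (R region : topologicalOrder) {
            if (regions.contains(region)) {
                sorted.add(region);
            }
        }
        return sorted;
    }
}
{code}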



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] xintongsong commented on pull request #13397: [FLINK-19178][runtime] Extend managed memory weight/fraction interfaces for various use cases.

2020-09-17 Thread GitBox


xintongsong commented on pull request #13397:
URL: https://github.com/apache/flink/pull/13397#issuecomment-694667340


   Discussed offline with @KarmaGYZ; we decided to combine the changes of 
FLINK-19178 & FLINK-19179 in #13416, to improve readability and avoid leaving 
intermediate commits in the git history.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xintongsong commented on pull request #13416: [FLINK-19179] Extend managed memory fraction calculation for various use cases.

2020-09-17 Thread GitBox


xintongsong commented on pull request #13416:
URL: https://github.com/apache/flink/pull/13416#issuecomment-694666993


   cc @KarmaGYZ @azagrebin 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19179) Implement the managed memory fraction calculation logic

2020-09-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19179:
---
Labels: pull-request-available  (was: )

> Implement the managed memory fraction calculation logic
> ---
>
> Key: FLINK-19179
> URL: https://issues.apache.org/jira/browse/FLINK-19179
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> This also means migrating the batch operator use cases.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] xintongsong opened a new pull request #13416: [FLINK-19179] Extend managed memory fraction calculation for various use cases.

2020-09-17 Thread GitBox


xintongsong opened a new pull request #13416:
URL: https://github.com/apache/flink/pull/13416


   ## What is the purpose of the change
   
   This PR contains changes for both FLINK-19178 & FLINK-19179. It extends the 
managed memory weight/fraction configurations and settings to support 
multiple use cases.
   
   ## Brief change log
   
   - 7e42981c8e71dec5749720e0d32b96dbe66bde52: Disable managed memory fractions 
for fine-grained resource specs. Currently we do not have fine-grained resource 
specs in production, and the original calculation logic for them does not work 
with multiple use cases. We can enable them again with proper calculation logic 
later when needed.
   - 3387a0e1f5bc577e90e101fe12482960ea435a7e: Introduce configuration options 
for managed memory use case weights.
   - 22b5a92fd720a91324d91a5a198f906c50b76fc1: Extend `Transformation` 
interfaces for various managed memory use cases.
   - 36c563e1caedf95f53666fda82a03e9030a96c0b: Extend `StreamNode` interfaces 
for various managed memory use cases.
   - 33fb6d84e06f6e04df86c9bc3d1f24b695c206d6: Introduce `ManagedMemoryUtils`.
   - 9211bcee3b433bb857c5917a545255bfc8368a26: Extend `StreamConfig` interfaces 
for various managed memory use cases.
   - b1405ce3314c2799cd8172b50b4653389f36eca9: Implement new fraction 
calculation logics.
   
   ## Verifying this change
   
   - Add `ManagedMemoryUtilsTest`
   - Update `StreamingJobGraphGeneratorTest`
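
   For context, a sketch of how the use case weights from the second commit 
might be set programmatically. The option key 
`taskmanager.memory.managed.consumer-weights` and the consumer names are 
assumptions based on the commit message, not something this PR text confirms:

```java
import org.apache.flink.configuration.Configuration;

public class ManagedMemoryWeightsExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumed option key and consumer names: managed memory would be split
        // between the named consumers proportionally to these weights.
        conf.setString("taskmanager.memory.managed.consumer-weights",
                "DATAPROC:70,PYTHON:30");
        System.out.println(conf);
    }
}
```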
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19281) LIKE cannot recognize full table path

2020-09-17 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu updated FLINK-19281:
--
Description: 
for example, if we have a table whose full path is 
{{default_catalog.default_database.my_table1}}, and the following DDL will fail
{code:sql}
create table my_table2 
like default_catalog.default_database.my_table1
{code}
it will throw
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Source table 
'`default_catalog`.`default_database`.`default_catalog.default_database.my_table1`'
 of the LIKE clause not found in the catalog, at line 11, column 6
at 
org.apache.flink.table.planner.operations.SqlCreateTableConverter.lambda$lookupLikeSourceTable$1(SqlCreateTableConverter.java:207)
at java.util.Optional.orElseThrow(Optional.java:290)
at 
org.apache.flink.table.planner.operations.SqlCreateTableConverter.lookupLikeSourceTable(SqlCreateTableConverter.java:207)
at 
org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:103)
at 
org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:83)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:188)
at 
org.apache.flink.table.planner.delegation.ParserImpl.convertSqlNodeToOperation(ParserImpl.java:92)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlInternal(TableEnvironmentImpl.java:724)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sql(TableEnvironmentImpl.java:704)
{code}
We can fix it in {{SqlCreateTableConverter#lookupLikeSourceTable}}, by using 
`SqlTableLike`'s full name.

  was:
for example, if we have a table whose full path is 
{{default_catalog.default_database.my_table1}}, and the following DDL will fail
{code:SQL}
create table my_table2 
like default_catalog.default_database.my_table1
{code}
it will throw
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Source table 
'`default_catalog`.`default_database`.`default_catalog.default_database.my_table1`'
 of the LIKE clause not found in the catalog, at line 11, column 6
at 
org.apache.flink.table.planner.operations.SqlCreateTableConverter.lambda$lookupLikeSourceTable$1(SqlCreateTableConverter.java:207)
at java.util.Optional.orElseThrow(Optional.java:290)
at 
org.apache.flink.table.planner.operations.SqlCreateTableConverter.lookupLikeSourceTable(SqlCreateTableConverter.java:207)
at 
org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:103)
at 
org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:83)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:188)
at 
org.apache.flink.table.planner.delegation.ParserImpl.convertSqlNodeToOperation(ParserImpl.java:92)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlInternal(TableEnvironmentImpl.java:724)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sql(TableEnvironmentImpl.java:704)
{code}

We can fix it in {{SqlCreateTableConverter#lookupLikeSourceTable}}, by using 
`SqlTableLike`'s full name.



> LIKE cannot recognize full table path
> -
>
> Key: FLINK-19281
> URL: https://issues.apache.org/jira/browse/FLINK-19281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.11.2
>Reporter: Benchao Li
>Priority: Major
>
> for example, if we have a table whose full path is 
> {{default_catalog.default_database.my_table1}}, and the following DDL will 
> fail
> {code:sql}
> create table my_table2 
> like default_catalog.default_database.my_table1
> {code}
> it will throw
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Source table 
> '`default_catalog`.`default_database`.`default_catalog.default_database.my_table1`'
>  of the LIKE clause not found in the catalog, at line 11, column 6
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lambda$lookupLikeSourceTable$1(SqlCreateTableConverter.java:207)
>   at java.util.Optional.orElseThrow(Optional.java:290)
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lookupLikeSourceTable(SqlCreateTableConverter.java:207)
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:103)
>   at 
> 

[jira] [Commented] (FLINK-19280) The option "sink.buffer-flush.max-rows" for JDBC can't be disabled by set to zero

2020-09-17 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198123#comment-17198123
 ] 

dalongliu commented on FLINK-19280:
---

I want to try to fix it, can you assign it to me?

> The option "sink.buffer-flush.max-rows" for JDBC can't be disabled by set to 
> zero
> -
>
> Key: FLINK-19280
> URL: https://issues.apache.org/jira/browse/FLINK-19280
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC, Table SQL / Ecosystem
>Affects Versions: 1.11.2
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.12.0, 1.11.3
>
>
> According to the documentation, when "sink.buffer-flush.max-rows" is set to 0, 
> flushing buffered data by number of rows should be disabled; however, it 
> still flushes on every received row. 
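
A minimal sketch of the expected guard, with illustrative names (this is not 
the actual JDBC connector code):
{code:java}
// Illustrative sketch; field and method names are assumptions.
class BufferedJdbcWriterSketch {
    // sink.buffer-flush.max-rows; 0 should disable count-based flushing.
    private final int flushMaxRows;
    private int batchCount;

    BufferedJdbcWriterSketch(int flushMaxRows) {
        this.flushMaxRows = flushMaxRows;
    }

    void addToBatch(Object record) {
        batchCount++;
        // Without the "flushMaxRows > 0" guard, batchCount >= 0 is always true
        // when the option is 0, which reproduces the reported flush-per-row bug.
        if (flushMaxRows > 0 && batchCount >= flushMaxRows) {
            flush();
        }
    }

    void flush() {
        // write the buffered rows to the database, then reset the counter
        batchCount = 0;
    }
}
{code}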



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19285) Kafka sql connector fixed partitioner not work

2020-09-17 Thread limbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198121#comment-17198121
 ] 

limbo commented on FLINK-19285:
---

I tracked the code and found that

[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L933]

with the default AT_LEAST_ONCE semantic the partitioner's open function is not 
called, so parallelInstanceId keeps its default value of 0
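
A simplified sketch of why that makes every record land in partition 0 (based 
on the fixed partitioner's documented behavior; this is not the actual class):
{code:java}
class FixedPartitionerSketch {
    // Set from the subtask index in open(); it stays 0 if open() is never called.
    private int parallelInstanceId;

    void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }

    // Each sink subtask writes to one fixed Kafka partition. With the default
    // value 0 on every subtask, all records end up in partitions[0].
    int partition(int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }
}
{code}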

> Kafka sql connector fixed partitioner not work
> --
>
> Key: FLINK-19285
> URL: https://issues.apache.org/jira/browse/FLINK-19285
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.11.2
> Environment: CREATE TABLE kafkaTable (
>  col1 BIGINT
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'format' = 'csv',
>  'scan.startup.mode' = 'earliest-offset'
> );
> CREATE TABLE kafkaTable1 (
>  col1 BIGINT
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test1',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'format' = 'csv',
>  'sink.partitioner'='fixed'
> );
> insert into kafkaTable1 select col1 from kafkaTable
>Reporter: limbo
>Priority: Critical
>
> Hi, when I use the SQL Kafka connector sink and set 'sink.partitioner'='fixed', 
> the partitioner does not work; it only produces to partition 0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13415: [FLINK-19277][python] Introduce BatchArrowPythonGroupWindowAggregateFunctionOperator

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13415:
URL: https://github.com/apache/flink/pull/13415#issuecomment-694656988


   
   ## CI report:
   
   * 4a182a499c3867765b6ef3c3957fbe4c63c2dce1 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6625)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-18114) Expose more details in logs for debugging bulk slot allocation failures

2020-09-17 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-18114.
---
Resolution: Won't Do

> Expose more details in logs for debugging bulk slot allocation failures
> ---
>
> Key: FLINK-18114
> URL: https://issues.apache.org/jira/browse/FLINK-18114
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.12.0
>
>
> More detailed logs are needed for users to troubleshoot bulk slot allocation 
> failures.
> https://github.com/apache/flink/pull/12375#discussion_r433922167



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19285) Kafka sql connector fixed partitioner not work

2020-09-17 Thread limbo (Jira)
limbo created FLINK-19285:
-

 Summary: Kafka sql connector fixed partitioner not work
 Key: FLINK-19285
 URL: https://issues.apache.org/jira/browse/FLINK-19285
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.11.2
 Environment: CREATE TABLE kafkaTable (
 col1 BIGINT

) WITH (
 'connector' = 'kafka',
 'topic' = 'test',
 'properties.bootstrap.servers' = 'localhost:9092',
 'format' = 'csv',
 'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE kafkaTable1 (
 col1 BIGINT
) WITH (
 'connector' = 'kafka',
 'topic' = 'test1',
 'properties.bootstrap.servers' = 'localhost:9092',
 'format' = 'csv',
 'sink.partitioner'='fixed'
);


insert into kafkaTable1 select col1 from kafkaTable
Reporter: limbo


Hi, when I use the SQL Kafka connector sink and set 'sink.partitioner'='fixed', 
the partitioner does not work; it only produces to partition 0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-18333) UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor Program"

2020-09-17 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-18333.
---
Resolution: Fixed

Thanks [~Leonard Xu] [~jark] for the fix and review. This issue hasn't occurred 
for more than one week. I'm closing this ticket!

> UnsignedTypeConversionITCase failed caused by MariaDB4j "Asked to waitFor 
> Program"
> --
>
> Key: FLINK-18333
> URL: https://issues.apache.org/jira/browse/FLINK-18333
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC, Table SQL / Ecosystem
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: Leonard Xu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=8173=logs=66592496-52df-56bb-d03e-37509e1d9d0f=ae0269db-6796-5583-2e5f-d84757d711aa
> {code}
> 2020-06-16T08:23:26.3013987Z [INFO] Running 
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase
> 2020-06-16T08:23:30.2252334Z Tue Jun 16 08:23:30 UTC 2020 Thread[main,5,main] 
> java.lang.NoSuchFieldException: DEV_NULL
> 2020-06-16T08:23:31.2907920Z 
> 
> 2020-06-16T08:23:31.2913806Z Tue Jun 16 08:23:30 UTC 2020:
> 2020-06-16T08:23:31.2914839Z Booting Derby version The Apache Software 
> Foundation - Apache Derby - 10.14.2.0 - (1828579): instance 
> a816c00e-0172-bc39-e4b1-0e4ce818 
> 2020-06-16T08:23:31.2915845Z on database directory 
> memory:/__w/1/s/flink-connectors/flink-connector-jdbc/target/test with class 
> loader sun.misc.Launcher$AppClassLoader@677327b6 
> 2020-06-16T08:23:31.2916637Z Loaded from 
> file:/__w/1/.m2/repository/org/apache/derby/derby/10.14.2.0/derby-10.14.2.0.jar
> 2020-06-16T08:23:31.2916968Z java.vendor=Private Build
> 2020-06-16T08:23:31.2917461Z 
> java.runtime.version=1.8.0_242-8u242-b08-0ubuntu3~16.04-b08
> 2020-06-16T08:23:31.2922200Z 
> user.dir=/__w/1/s/flink-connectors/flink-connector-jdbc/target
> 2020-06-16T08:23:31.2922516Z os.name=Linux
> 2020-06-16T08:23:31.2922709Z os.arch=amd64
> 2020-06-16T08:23:31.2923086Z os.version=4.15.0-1083-azure
> 2020-06-16T08:23:31.2923316Z derby.system.home=null
> 2020-06-16T08:23:31.2923616Z 
> derby.stream.error.field=org.apache.flink.connector.jdbc.JdbcTestBase.DEV_NULL
> 2020-06-16T08:23:31.2924790Z Database Class Loader started - 
> derby.database.classpath=''
> 2020-06-16T08:23:37.4354243Z [INFO] Tests run: 2, Failures: 0, Errors: 0, 
> Skipped: 0, Time elapsed: 11.133 s - in 
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase
> 2020-06-16T08:23:38.1880075Z [INFO] Running 
> org.apache.flink.connector.jdbc.table.JdbcTableSourceITCase
> 2020-06-16T08:23:41.3718038Z Tue Jun 16 08:23:41 UTC 2020 Thread[main,5,main] 
> java.lang.NoSuchFieldException: DEV_NULL
> 2020-06-16T08:23:41.4383244Z 
> 
> 2020-06-16T08:23:41.4401761Z Tue Jun 16 08:23:41 UTC 2020:
> 2020-06-16T08:23:41.4402797Z Booting Derby version The Apache Software 
> Foundation - Apache Derby - 10.14.2.0 - (1828579): instance 
> a816c00e-0172-bc3a-103b-0e4b0610 
> 2020-06-16T08:23:41.4403758Z on database directory 
> memory:/__w/1/s/flink-connectors/flink-connector-jdbc/target/test with class 
> loader sun.misc.Launcher$AppClassLoader@677327b6 
> 2020-06-16T08:23:41.4404581Z Loaded from 
> file:/__w/1/.m2/repository/org/apache/derby/derby/10.14.2.0/derby-10.14.2.0.jar
> 2020-06-16T08:23:41.4404945Z java.vendor=Private Build
> 2020-06-16T08:23:41.4405497Z 
> java.runtime.version=1.8.0_242-8u242-b08-0ubuntu3~16.04-b08
> 2020-06-16T08:23:41.4406048Z 
> user.dir=/__w/1/s/flink-connectors/flink-connector-jdbc/target
> 2020-06-16T08:23:41.4406303Z os.name=Linux
> 2020-06-16T08:23:41.4406494Z os.arch=amd64
> 2020-06-16T08:23:41.4406878Z os.version=4.15.0-1083-azure
> 2020-06-16T08:23:41.4407097Z derby.system.home=null
> 2020-06-16T08:23:41.4407415Z 
> derby.stream.error.field=org.apache.flink.connector.jdbc.JdbcTestBase.DEV_NULL
> 2020-06-16T08:23:41.5287219Z Database Class Loader started - 
> derby.database.classpath=''
> 2020-06-16T08:23:46.4567063Z [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 23.729 s <<< FAILURE! - in 
> org.apache.flink.connector.jdbc.table.UnsignedTypeConversionITCase
> 2020-06-16T08:23:46.4575785Z [ERROR] 
> org.apache.flink.connector.jdbc.table.UnsignedTypeConversionITCase  Time 
> elapsed: 23.729 s  <<< ERROR!
> 2020-06-16T08:23:46.4576490Z ch.vorburger.exec.ManagedProcessException: An 
> error occurred while running a command: create database if not exists `test`;
> 2020-06-16T08:23:46.4577193Z  at ch.vorburger.mariadb4j.DB.run(DB.java:300)
> 

[GitHub] [flink] flinkbot commented on pull request #13415: [FLINK-19277][python] Introduce BatchArrowPythonGroupWindowAggregateFunctionOperator

2020-09-17 Thread GitBox


flinkbot commented on pull request #13415:
URL: https://github.com/apache/flink/pull/13415#issuecomment-694656988


   
   ## CI report:
   
   * 4a182a499c3867765b6ef3c3957fbe4c63c2dce1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13415: [FLINK-19277][python] Introduce BatchArrowPythonGroupWindowAggregateFunctionOperator

2020-09-17 Thread GitBox


flinkbot commented on pull request #13415:
URL: https://github.com/apache/flink/pull/13415#issuecomment-694653386


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 4a182a499c3867765b6ef3c3957fbe4c63c2dce1 (Fri Sep 18 
05:07:12 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19277) Introduce BatchArrowPythonGroupWindowAggregateFunctionOperator

2020-09-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19277:
---
Labels: pull-request-available  (was: )

> Introduce BatchArrowPythonGroupWindowAggregateFunctionOperator
> --
>
> Key: FLINK-19277
> URL: https://issues.apache.org/jira/browse/FLINK-19277
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Introduce BatchArrowPythonGroupWindowAggregateFunctionOperator to support 
> Pandas Batch Group Window Aggregation



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] HuangXingBo opened a new pull request #13415: [FLINK-19277][python] Introduce BatchArrowPythonGroupWindowAggregateFunctionOperator

2020-09-17 Thread GitBox


HuangXingBo opened a new pull request #13415:
URL: https://github.com/apache/flink/pull/13415


   ## What is the purpose of the change
   
   *This pull request introduces 
BatchArrowPythonGroupWindowAggregateFunctionOperator to support Pandas 
Batch Group Window Aggregation*
   
   
   ## Brief change log
   
 - *add BatchArrowPythonGroupWindowAggregateFunctionOperator*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *BatchArrowPythonGroupWindowAggregateFunctionOperatorTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (FLINK-19277) Introduce BatchArrowPythonGroupWindowAggregateFunctionOperator

2020-09-17 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-19277:
---

Assignee: Huang Xingbo

> Introduce BatchArrowPythonGroupWindowAggregateFunctionOperator
> --
>
> Key: FLINK-19277
> URL: https://issues.apache.org/jira/browse/FLINK-19277
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
> Fix For: 1.12.0
>
>
> Introduce BatchArrowPythonGroupWindowAggregateFunctionOperator to support 
> Pandas Batch Group Window Aggregation



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13414: [FLINK-19227][Table SQL / API] The catalog is still created after opening failed in catalog registering

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13414:
URL: https://github.com/apache/flink/pull/13414#issuecomment-694646569


   
   ## CI report:
   
   * 5b3479227664c09f527c045ab6d3623790fe3968 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6623)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dianfu closed pull request #13388: [FLINK-19184][python] Add Batch Physical Pandas Group Aggregate Rule and RelNode

2020-09-17 Thread GitBox


dianfu closed pull request #13388:
URL: https://github.com/apache/flink/pull/13388


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-19184) Add Batch Physical Pandas Group Aggregate Rule and RelNode

2020-09-17 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-19184.
---
Resolution: Fixed

Merged to master (1.12) via da86aee1c36dbb7530ab750e1c3634f32b2479c7

> Add Batch Physical Pandas Group Aggregate Rule and RelNode
> --
>
> Key: FLINK-19184
> URL: https://issues.apache.org/jira/browse/FLINK-19184
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Add Batch Physical Pandas Group Aggregate Rule and RelNode



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #13414: [FLINK-19227][Table SQL / API] The catalog is still created after opening failed in catalog registering

2020-09-17 Thread GitBox


flinkbot commented on pull request #13414:
URL: https://github.com/apache/flink/pull/13414#issuecomment-694646569


   
   ## CI report:
   
   * 5b3479227664c09f527c045ab6d3623790fe3968 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13412: [FLINK-19069] execute finalizeOnMaster with io executor to avoid hea…

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13412:
URL: https://github.com/apache/flink/pull/13412#issuecomment-694253946


   
   ## CI report:
   
   * 85d80667243bb98025bc5aa52565f4e6b57d468a Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6622)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6610)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on a change in pull request #13414: [FLINK-19227][Table SQL / API] The catalog is still created after opening failed in catalog registering

2020-09-17 Thread GitBox


JingsongLi commented on a change in pull request #13414:
URL: https://github.com/apache/flink/pull/13414#discussion_r490696171



##
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##
@@ -187,8 +187,8 @@ public void registerCatalog(String catalogName, Catalog catalog) {
	throw new CatalogException(format("Catalog %s already exists.", catalogName));
}
 
-   catalogs.put(catalogName, catalog);
catalog.open();
+   catalogs.put(catalogName, catalog);

Review comment:
   Can you add a test case for this?
   You can create a Catalog that extends `GenericInMemoryCatalog` and implement 
`open` to throw an exception, then check that the TableEnv does not contain this 
catalog.
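
   A minimal sketch of such a test catalog (names are illustrative; the 
assertions assume a JUnit-style test):

```java
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.exceptions.CatalogException;

// A catalog whose open() always fails, to verify that a failed registration
// leaves no catalog behind in the TableEnvironment.
class FailingOpenCatalog extends GenericInMemoryCatalog {
    FailingOpenCatalog(String name) {
        super(name);
    }

    @Override
    public void open() throws CatalogException {
        throw new CatalogException("failed to open");
    }
}

// In the test:
//   assertThrows(CatalogException.class,
//       () -> tEnv.registerCatalog("failing", new FailingOpenCatalog("failing")));
//   assertFalse(tEnv.getCatalog("failing").isPresent());
```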

##
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
##
@@ -223,6 +223,9 @@ public String getHiveVersion() {
public void open() throws CatalogException {
if (client == null) {
client = HiveMetastoreClientFactory.create(hiveConf, hiveVersion);
+   if (client == null) {

Review comment:
   This should not happen.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13412: [FLINK-19069] execute finalizeOnMaster with io executor to avoid hea…

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13412:
URL: https://github.com/apache/flink/pull/13412#issuecomment-694253946


   
   ## CI report:
   
   * 85d80667243bb98025bc5aa52565f4e6b57d468a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6610)
 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6622)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13414: [FLINK-19227][Table SQL / API] The catalog is still created after opening failed in catalog registering

2020-09-17 Thread GitBox


flinkbot commented on pull request #13414:
URL: https://github.com/apache/flink/pull/13414#issuecomment-694640971


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 5b3479227664c09f527c045ab6d3623790fe3968 (Fri Sep 18 
04:19:53 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19227) The catalog is still created after opening failed in catalog registering

2020-09-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19227:
---
Labels: pull-request-available starter  (was: starter)

> The catalog is still created after opening failed in catalog registering
> 
>
> Key: FLINK-19227
> URL: https://issues.apache.org/jira/browse/FLINK-19227
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Jingsong Lee
>Assignee: CloseRiver
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: 1.11.3
>
>
> > When I create the HiveCatalog and Flink is not able to connect to the 
> >HiveMetastore, the statement cannot be executed, but the catalog is still 
> >created. Subsequent attempts to query the tables result in an NPE.
> In CatalogManager.registerCatalog:
>  since open is an operation that can easily fail, we should put the catalog 
> into the catalog manager only after it has been opened.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhuxiaoshang opened a new pull request #13414: [FLINK-19227][Table SQL / API] The catalog is still created after opening failed in catalog registering

2020-09-17 Thread GitBox


zhuxiaoshang opened a new pull request #13414:
URL: https://github.com/apache/flink/pull/13414


   ## What is the purpose of the change
   
   When I create the HiveCatalog and Flink is not able to connect to the 
HiveMetastore, the statement cannot be executed, but the catalog is still 
created. Subsequent attempts to query the tables result in an NPE.
   
   In CatalogManager.registerCatalog: since open is an operation that can 
easily fail, we should put the catalog into the catalog manager only after it 
has been opened.
   
   
   ## Brief change log
   
   - have a double check that the Hive metastore client was created 
successfully and fail fast if not.
   - put the catalog into the catalog manager only after it has been opened 
successfully.
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
  - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] Jiayi-Liao commented on pull request #13412: [FLINK-19069] execute finalizeOnMaster with io executor to avoid hea…

2020-09-17 Thread GitBox


Jiayi-Liao commented on pull request #13412:
URL: https://github.com/apache/flink/pull/13412#issuecomment-694638017


   @flinkbot run azure



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-19262) Can not setParallelism for FLIP-27 source

2020-09-17 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-19262.

Resolution: Fixed

master: 49e3ff72e7fb06360c02578e30fa03cce24ca04f

> Can not setParallelism for FLIP-27 source
> -
>
> Key: FLINK-19262
> URL: https://issues.apache.org/jira/browse/FLINK-19262
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jingsong Lee
>Assignee: liufangliang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> The constructor for new Sources in {{DataStreamSource}} does not set 
> isParallel to true. So if we call setParallelism on a FLIP-27 source, there 
> will be a validation exception from validateParallelism.
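
A simplified sketch of the failure mode (condensed from the issue text; the 
class and check below are illustrative):
{code:java}
class DataStreamSourceSketch {
    private final boolean isParallel; // left false by the FLIP-27 constructor

    DataStreamSourceSketch(boolean isParallel) {
        this.isParallel = isParallel;
    }

    void setParallelism(int parallelism) {
        // validateParallelism rejects any value != 1 for non-parallel operators,
        // so a FLIP-27 source created with isParallel == false cannot be scaled.
        if (!isParallel && parallelism != 1) {
            throw new IllegalArgumentException(
                    "The parallelism of non parallel operator must be 1.");
        }
    }
}
{code}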



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19281) LIKE cannot recognize full table path

2020-09-17 Thread CloseRiver (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198100#comment-17198100
 ] 

CloseRiver commented on FLINK-19281:


I think we should use `sqlTableLike.getSourceTable().names` instead of 
`sqlTableLike.getSourceTable().toString()`.
WDYT, [~libenchao]? If I'm right, I can do this.
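
A hedged sketch of that idea (the surrounding converter code is assumed; only 
the identifiers come from the comment above and the stack trace):
{code:java}
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;

final class LikeLookupSketch {
    // Build the identifier from the name segments instead of the dotted
    // toString(), so "default_catalog.default_database.my_table1" resolves
    // as three separate parts.
    static UnresolvedIdentifier toIdentifier(SqlIdentifier sourceTable) {
        return UnresolvedIdentifier.of(sourceTable.names.toArray(new String[0]));
    }
}
{code}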

> LIKE cannot recognize full table path
> -
>
> Key: FLINK-19281
> URL: https://issues.apache.org/jira/browse/FLINK-19281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.11.2
>Reporter: Benchao Li
>Priority: Major
>
> for example, if we have a table whose full path is 
> {{default_catalog.default_database.my_table1}}, and the following DDL will 
> fail
> {code:SQL}
> create table my_table2 
> like default_catalog.default_database.my_table1
> {code}
> it will throw
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Source table 
> '`default_catalog`.`default_database`.`default_catalog.default_database.my_table1`'
>  of the LIKE clause not found in the catalog, at line 11, column 6
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lambda$lookupLikeSourceTable$1(SqlCreateTableConverter.java:207)
>   at java.util.Optional.orElseThrow(Optional.java:290)
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lookupLikeSourceTable(SqlCreateTableConverter.java:207)
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:103)
>   at 
> org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:83)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:188)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.convertSqlNodeToOperation(ParserImpl.java:92)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlInternal(TableEnvironmentImpl.java:724)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sql(TableEnvironmentImpl.java:704)
> {code}
> We can fix it in {{SqlCreateTableConverter#lookupLikeSourceTable}}, by using 
> `SqlTableLike`'s full name.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] JingsongLi merged pull request #13402: [FLINK-19262][API/DataStream] Can not setParallelism for FLIP-27 source

2020-09-17 Thread GitBox


JingsongLi merged pull request #13402:
URL: https://github.com/apache/flink/pull/13402


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19200) UNIX_TIMESTAMP function support return in millisecond

2020-09-17 Thread leslieyuan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198099#comment-17198099
 ] 

leslieyuan commented on FLINK-19200:


[~jark] Hi jark

Thanks for your work. In my opinion, way (2) sounds good to me.

> UNIX_TIMESTAMP function support return in millisecond
> -
>
> Key: FLINK-19200
> URL: https://issues.apache.org/jira/browse/FLINK-19200
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: leslieyuan
>Priority: Major
>
> Now I use Flink 1.10.0, and I found that:
> time = "2020-09-11 13:14:29.153"
> UNIX_TIMESTAMP(time)  returns 1599801269
> UNIX_TIMESTAMP(time, 'yyyy-MM-dd HH:mm:ss.SSS')  also returns 1599801269
> Yes, I see from the official website description that this function returns 
> seconds, but I think that if I have given a millisecond format as above, it 
> means that I need the millisecond value.
>  
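
A plain-Java illustration of the difference the reporter is after (illustrative 
only and time-zone dependent; not Flink's implementation):
{code:java}
import java.text.SimpleDateFormat;

public class MillisExample {
    public static void main(String[] args) throws Exception {
        String time = "2020-09-11 13:14:29.153";
        // Parsing with a millisecond pattern keeps the .153 part; the epoch
        // value printed depends on the JVM's default time zone.
        long millis = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
                .parse(time)
                .getTime();
        System.out.println(millis);        // e.g. 1599801269153 in UTC+8
        System.out.println(millis / 1000); // what UNIX_TIMESTAMP returns today
    }
}
{code}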



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18444) KafkaITCase.testMultipleSourcesOnePartition failed with "Failed to send data to Kafka: This server does not host this topic-partition"

2020-09-17 Thread Roc Marshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198096#comment-17198096
 ] 

Roc Marshal commented on FLINK-18444:
-

Hi, [~dian.fu]
According to the logic of the test case, my personal understanding of this 
exception is that the topic is deleted before the write to Kafka happens. Is 
that right?

> KafkaITCase.testMultipleSourcesOnePartition failed with "Failed to send data 
> to Kafka: This server does not host this topic-partition"
> --
>
> Key: FLINK-18444
> URL: https://issues.apache.org/jira/browse/FLINK-18444
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Dian Fu
>Priority: Critical
>  Labels: test-stability
>
> Instance on master: 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=4092=logs=ce8f3cc3-c1ea-5281-f5eb-df9ebd24947f
> {code}
> 2020-06-28T21:37:54.8113215Z [ERROR] 
> testMultipleSourcesOnePartition(org.apache.flink.streaming.connectors.kafka.KafkaITCase)
>   Time elapsed: 5.079 s  <<< ERROR!
> 2020-06-28T21:37:54.8113885Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2020-06-28T21:37:54.8114418Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> 2020-06-28T21:37:54.8114905Z  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:677)
> 2020-06-28T21:37:54.8115397Z  at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:81)
> 2020-06-28T21:37:54.8116254Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
> 2020-06-28T21:37:54.8116857Z  at 
> org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.generateRandomizedIntegerSequence(DataGenerators.java:120)
> 2020-06-28T21:37:54.8117715Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:933)
> 2020-06-28T21:37:54.8118327Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testMultipleSourcesOnePartition(KafkaITCase.java:107)
> 2020-06-28T21:37:54.8118805Z  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-06-28T21:37:54.8119859Z  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-06-28T21:37:54.8120861Z  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-06-28T21:37:54.8121436Z  at 
> java.base/java.lang.reflect.Method.invoke(Method.java:566)
> 2020-06-28T21:37:54.8121899Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-06-28T21:37:54.8122424Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-06-28T21:37:54.8122942Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-06-28T21:37:54.8123406Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-06-28T21:37:54.8123899Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2020-06-28T21:37:54.8124507Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2020-06-28T21:37:54.8124978Z  at 
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2020-06-28T21:37:54.8125332Z  at 
> java.base/java.lang.Thread.run(Thread.java:834)
> 2020-06-28T21:37:54.8125743Z Caused by: 
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> 2020-06-28T21:37:54.8126305Z  at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
> 2020-06-28T21:37:54.8126961Z  at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
> 2020-06-28T21:37:54.8127766Z  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> 2020-06-28T21:37:54.8128570Z  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
> 2020-06-28T21:37:54.8129140Z  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
> 2020-06-28T21:37:54.8129686Z  at 
> 

[jira] [Commented] (FLINK-19264) Jobs with identical graph shapes cannot be run concurrently

2020-09-17 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198094#comment-17198094
 ] 

Zhu Zhu commented on FLINK-19264:
-

[~aljoscha] yes, the {{StreamNode}} hash is, if not specified by users, 
generated in a deterministic way with regard to:
* node id
* chained output nodes' hashes
* input nodes' hashes

So an identical stream graph shape will result in the same set of 
{{StreamNode}} hashes, and thus the same {{JobVertexID}}s.
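A minimal sketch of that scheme (not Flink's actual {{StreamGraphHasherV2}}, 
which mixes in more fields; shown only to illustrate why equal shapes yield 
equal hashes and thus equal {{JobVertexID}}s):

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.List;

// The hash of a node depends only on its id and the hashes of its inputs and
// chained outputs, so two graphs with the same shape produce the same hashes.
static byte[] nodeHash(
        int nodeId,
        List<byte[]> inputHashes,
        List<byte[]> chainedOutputHashes) throws Exception {
    MessageDigest digest = MessageDigest.getInstance("MD5");
    digest.update(Integer.toString(nodeId).getBytes(StandardCharsets.UTF_8));
    for (byte[] hash : inputHashes) {
        digest.update(hash);
    }
    for (byte[] hash : chainedOutputHashes) {
        digest.update(hash);
    }
    return digest.digest();
}
{code}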

Hi [~karmagyz], I'm still a bit concerned about the size of the 
{{TaskDeploymentDescriptor}} if we enlarge each {{ExecutionAttemptID}} with a 
{{JobID}}. So before making this change, could you run an experiment to see 
whether there would be a significant TDD size increase? 

> Jobs with identical graph shapes cannot be run concurrently
> ---
>
> Key: FLINK-19264
> URL: https://issues.apache.org/jira/browse/FLINK-19264
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.12.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
>
> While working on FLINK-19123 I noticed that jobs often fail on 
> {{MiniCluster}} when multiple jobs are running at the same time. 
> I created a reproducer here: 
> https://github.com/aljoscha/flink/tree/flink-19123-fix-test-stream-env-alternative.
>  You can run {{MiniClusterConcurrencyITCase}} to see the problem in action. 
> Sometimes the test will succeed, sometimes it will fail with
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.test.example.MiniClusterConcurrencyITCase.submitConcurrently(MiniClusterConcurrencyITCase.java:60)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>   at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>   at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:107)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
>   at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>   at 
> 

[jira] [Commented] (FLINK-19283) Allow subclasses to override/extend FlinkKafkaConsumerBase checkpoint methods

2020-09-17 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198091#comment-17198091
 ] 

Yun Tang commented on FLINK-19283:
--

[~adriank], I am not familiar with the background of why the Kafka source 
connector marks the {{initializeState}} method as final. However, you could 
still work around this and add your additional logic by wrapping the consumer 
in a custom {{StreamSource}} operator (whose state methods are overridable) 
and constructing a {{DataStreamSource}} directly:


{code:java}
private static class KafkaSource extends StreamSource<Long, FlinkKafkaConsumer<Long>> {

    public KafkaSource(FlinkKafkaConsumer<Long> sourceFunction) {
        super(sourceFunction);
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        // some additional initialization which is executed before
        // CheckpointedFunction#initializeState
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        // set some additional state which is executed before
        // CheckpointedFunction#snapshotState
    }
}

..

KafkaSource kafkaSource = new KafkaSource(
    new FlinkKafkaConsumer<>(topic, new LimitedLongDeserializer(), standardProps));
DataStreamSource<Long> streamSource = new DataStreamSource<>(
    env, LONG_TYPE_INFO, kafkaSource, true, "source");

{code}


> Allow subclasses to override/extend FlinkKafkaConsumerBase checkpoint methods
> -
>
> Key: FLINK-19283
> URL: https://issues.apache.org/jira/browse/FLINK-19283
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.11.0, 1.11.1
>Reporter: Adrian Kreuziger
>Priority: Minor
>
> I'm working on a class that extends the FlinkKafkaConsumer to add some 
> additional functionality the first time the consumer runs. I'd like to be 
> able to store some additional state, but am unable to do so as the 
> initializeState() and snapshotState() are marked as final. Ideally I'd like 
> to be able to do something like
> {code:java}
> @Override 
> public void initializeState(FunctionInitializationContext context) throws 
> Exception {
> super.initializeState(context);
> // some additional initialization here
> }
> @Override 
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
> super.snapshotState(context);
> // set some additional state here
> }{code}
> I'm guessing it was marked final for a reason, is there a reason this would 
> be problematic? The restoredState and unionOffsetStates properties are still 
> private which would prevent subclasses from modifying the offset state.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13388: [FLINK-19184][python] Add Batch Physical Pandas Group Aggregate Rule and RelNode

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13388:
URL: https://github.com/apache/flink/pull/13388#issuecomment-692422938


   
   ## CI report:
   
   * 6d3f77bd08ed3dba62375807a06325e320b98fb3 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6506)
 
   * f98b2af28efffa78d78b4cb56ad6c14441c2599d Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6620)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on pull request #13411: [FLINK-19228][filesystem] Avoid accessing FileSystem in client for file system connector

2020-09-17 Thread GitBox


JingsongLi commented on pull request #13411:
URL: https://github.com/apache/flink/pull/13411#issuecomment-694616610


   We should add a case to test the file sink without input.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13388: [FLINK-19184][python] Add Batch Physical Pandas Group Aggregate Rule and RelNode

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13388:
URL: https://github.com/apache/flink/pull/13388#issuecomment-692422938


   
   ## CI report:
   
   * 6d3f77bd08ed3dba62375807a06325e320b98fb3 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6506)
 
   * f98b2af28efffa78d78b4cb56ad6c14441c2599d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] leonardBang commented on pull request #13294: [FLINK-19002][canal][json] Support to only read changelogs of specific database and table for canal-json format

2020-09-17 Thread GitBox


leonardBang commented on pull request #13294:
URL: https://github.com/apache/flink/pull/13294#issuecomment-694613774


   'canal-json.database.include' and 'canal-json.table.include' make sense to 
me. 
   LGTM
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on pull request #13332: [FLINK-19128][sql-client] Remove the runtime execution configuration in sql-client-defaults.yaml

2020-09-17 Thread GitBox


wuchong commented on pull request #13332:
URL: https://github.com/apache/flink/pull/13332#issuecomment-694613797


   @godfreyhe , I don't think so. The purpose of this task is to fall back to 
`flink-conf.yaml` for some important runtime configurations instead of setting 
them in `sql-client-defaults.yaml` by default. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] HuangXingBo commented on pull request #13388: [FLINK-19184][python] Add Batch Physical Pandas Group Aggregate Rule and RelNode

2020-09-17 Thread GitBox


HuangXingBo commented on pull request #13388:
URL: https://github.com/apache/flink/pull/13388#issuecomment-694612738


   @dianfu Thanks a lot for the review. I have addressed the comments in the 
latest commit.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] HuangXingBo commented on a change in pull request #13388: [FLINK-19184][python] Add Batch Physical Pandas Group Aggregate Rule and RelNode

2020-09-17 Thread GitBox


HuangXingBo commented on a change in pull request #13388:
URL: https://github.com/apache/flink/pull/13388#discussion_r490665709



##
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/AggregateTest.scala
##
@@ -72,4 +73,16 @@ class AggregateTest extends TableTestBase {
 
 util.verifyPlan(resultTable)
   }
+
+  @Test

Review comment:
   Yes, makes sense. It will make the Python aggregate tests clearer.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-19246) TableSourceITCase.testStreamScanParallelism fails on private Azure accounts

2020-09-17 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-19246.
---
Fix Version/s: 1.12.0
   Resolution: Fixed

Fixed in master (1.12.0): b930423c5a7f1f30ce0ff837d28bde15a40d6fb3

> TableSourceITCase.testStreamScanParallelism fails on private Azure accounts
> ---
>
> Key: FLINK-19246
> URL: https://issues.apache.org/jira/browse/FLINK-19246
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Assignee: Jark Wu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> Example: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=8381=logs=69332ead-8935-5abf-5b3d-e4280fb1ff4c=58eb9526-6bcb-5835-ae76-f5bd5f6df6ac
> or
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=8379=logs=69332ead-8935-5abf-5b3d-e4280fb1ff4c=58eb9526-6bcb-5835-ae76-f5bd5f6df6ac
> or
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=8369=logs=69332ead-8935-5abf-5b3d-e4280fb1ff4c=58eb9526-6bcb-5835-ae76-f5bd5f6df6ac
> (this change is already merged to master, so it is unlikely to cause the 
> error)
> {code}
> 2020-09-15T13:51:34.6773312Z 
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the CollectionInputFormat is not serializable. The object probably contains 
> or references non serializable fields.
> 2020-09-15T13:51:34.6774140Z  at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
> 2020-09-15T13:51:34.6774634Z  at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> 2020-09-15T13:51:34.6775136Z  at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
> 2020-09-15T13:51:34.6775728Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1913)
> 2020-09-15T13:51:34.6776617Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1601)
> 2020-09-15T13:51:34.6777322Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createInput(StreamExecutionEnvironment.java:1493)
> 2020-09-15T13:51:34.6778029Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createInput(StreamExecutionEnvironment.java:1483)
> 2020-09-15T13:51:34.6778887Z  at 
> org.apache.flink.table.factories.utils.TestCollectionTableFactory$CollectionTableSource.getDataStream(TestCollectionTableFactory.scala:159)
> 2020-09-15T13:51:34.6779659Z  at 
> org.apache.flink.table.factories.utils.TestCollectionTableFactory$CollectionTableSource.getDataStream(TestCollectionTableFactory.scala:134)
> 2020-09-15T13:51:34.6780394Z  at 
> org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:105)
> 2020-09-15T13:51:34.6781058Z  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamSink.translateInput(DataStreamSink.scala:189)
> 2020-09-15T13:51:34.6781655Z  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamSink.writeToSink(DataStreamSink.scala:84)
> 2020-09-15T13:51:34.6782266Z  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamSink.translateToPlan(DataStreamSink.scala:59)
> 2020-09-15T13:51:34.6782951Z  at 
> org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translateToCRow(StreamPlanner.scala:274)
> 2020-09-15T13:51:34.6783640Z  at 
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:119)
> 2020-09-15T13:51:34.6784227Z  at 
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:116)
> 2020-09-15T13:51:34.6784799Z  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 2020-09-15T13:51:34.6785345Z  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 2020-09-15T13:51:34.6785828Z  at 
> scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 2020-09-15T13:51:34.6786285Z  at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 2020-09-15T13:51:34.6786760Z  at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 2020-09-15T13:51:34.6787210Z  at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 2020-09-15T13:51:34.6787681Z  at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 2020-09-15T13:51:34.6788168Z  at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 2020-09-15T13:51:34.6788648Z  at 
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:116)
> 

[GitHub] [flink] HuangXingBo commented on a change in pull request #13388: [FLINK-19184][python] Add Batch Physical Pandas Group Aggregate Rule and RelNode

2020-09-17 Thread GitBox


HuangXingBo commented on a change in pull request #13388:
URL: https://github.com/apache/flink/pull/13388#discussion_r490665510



##
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupAggregate.scala
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch
+
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.runtime.operators.DamBehavior
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.functions.UserDefinedFunction
+import org.apache.flink.table.planner.delegation.BatchPlanner
+import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.tools.RelBuilder
+import java.util
+
+import org.apache.flink.table.api.TableException
+
+import scala.collection.JavaConversions._
+
+/**
+  * Batch physical RelNode for aggregate (Python user defined aggregate 
function).
+  */
+class BatchExecPythonGroupAggregate(
+cluster: RelOptCluster,
+relBuilder: RelBuilder,
+traitSet: RelTraitSet,
+inputRel: RelNode,
+outputRowType: RelDataType,
+inputRowType: RelDataType,
+val aggInputRowType: RelDataType,
+grouping: Array[Int],
+auxGrouping: Array[Int],
+aggCalls: Seq[AggregateCall],
+aggFunctions: Array[UserDefinedFunction])
+  extends BatchExecGroupAggregateBase(
+cluster,
+relBuilder,
+traitSet,
+inputRel,
+outputRowType,
+inputRowType,
+grouping,
+auxGrouping,
+aggCalls.zip(aggFunctions),
+isMerge = false,
+isFinal = true)
+  with BatchExecNode[RowData] {
+
+  override def getDamBehavior: DamBehavior = DamBehavior.FULL_DAM
+
+  override def getInputNodes: util.List[ExecNode[BatchPlanner, _]] =
+List(getInput.asInstanceOf[ExecNode[BatchPlanner, _]])
+
+  override def replaceInputNode(
+  ordinalInParent: Int,
+  newInputNode: ExecNode[BatchPlanner, _]): Unit = {
+replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
+  }
+

Review comment:
   Yes. Thanks for the reminder.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on pull request #13407: [FLINK-19246][table-planner] Fix TableSourceITCase.testStreamScanParallelism fails on Azure

2020-09-17 Thread GitBox


wuchong commented on pull request #13407:
URL: https://github.com/apache/flink/pull/13407#issuecomment-694611904


   Merging...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong merged pull request #13407: [FLINK-19246][table-planner] Fix TableSourceITCase.testStreamScanParallelism fails on Azure

2020-09-17 Thread GitBox


wuchong merged pull request #13407:
URL: https://github.com/apache/flink/pull/13407


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] HuangXingBo commented on a change in pull request #13388: [FLINK-19184][python] Add Batch Physical Pandas Group Aggregate Rule and RelNode

2020-09-17 Thread GitBox


HuangXingBo commented on a change in pull request #13388:
URL: https://github.com/apache/flink/pull/13388#discussion_r490665353



##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonAggregateRule.java
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.functions.ImperativeAggregateFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.functions.python.PythonFunction;
+import org.apache.flink.table.functions.python.PythonFunctionKind;
+import org.apache.flink.table.planner.functions.utils.AggSqlFunction;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonGroupAggregate;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.sql.SqlAggFunction;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.Tuple3;
+import scala.collection.Seq;
+
+/**
+ * The physical rule is responsible for converting {@link FlinkLogicalAggregate}
+ * to {@link BatchExecPythonGroupAggregate}.
+ */
+public class BatchExecPythonAggregateRule extends ConverterRule {
+
+   public static final RelOptRule INSTANCE = new 
BatchExecPythonAggregateRule();
+
+   private BatchExecPythonAggregateRule() {
+   super(FlinkLogicalAggregate.class, FlinkConventions.LOGICAL(), 
FlinkConventions.BATCH_PHYSICAL(),
+   "BatchExecPythonAggregateRule");
+   }
+
+   @Override
+   public boolean matches(RelOptRuleCall call) {
+   FlinkLogicalAggregate agg = call.rel(0);
+   List<AggregateCall> aggCalls = agg.getAggCallList();
+   boolean existPandasFunction = false;
+   boolean existGeneralPythonFunction = false;
+   boolean existJavaFunction = false;
+   for (AggregateCall aggCall : aggCalls) {
+   SqlAggFunction aggregation = aggCall.getAggregation();
+   if (aggregation instanceof AggSqlFunction) {
+   ImperativeAggregateFunction func =
+   ((AggSqlFunction) 
aggregation).aggregateFunction();
+   if (func instanceof PythonFunction) {
+   PythonFunction pythonFunction = 
(PythonFunction) func;
+   if 
(pythonFunction.getPythonFunctionKind() == PythonFunctionKind.PANDAS) {
+   existPandasFunction = true;
+   } else {
+   existGeneralPythonFunction = 
true;
+   }
+   } else {
+   existJavaFunction = true;
+   }
+   }
+   }
+   if (existPandasFunction) {
+   if (existGeneralPythonFunction) {
+   throw new TableException("Pandas UDAF cannot be 
computed with General Python UDAF currently");
+   }
+   if (existJavaFunction) {
+  

[GitHub] [flink] HuangXingBo commented on a change in pull request #13388: [FLINK-19184][python] Add Batch Physical Pandas Group Aggregate Rule and RelNode

2020-09-17 Thread GitBox


HuangXingBo commented on a change in pull request #13388:
URL: https://github.com/apache/flink/pull/13388#discussion_r490665067



##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonAggregateRule.java
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.functions.ImperativeAggregateFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.functions.python.PythonFunction;
+import org.apache.flink.table.functions.python.PythonFunctionKind;
+import org.apache.flink.table.planner.functions.utils.AggSqlFunction;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonGroupAggregate;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.sql.SqlAggFunction;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.Tuple3;
+import scala.collection.Seq;
+
+/**
+ * The physical rule is responsible for converting {@link FlinkLogicalAggregate}
+ * to {@link BatchExecPythonGroupAggregate}.
+ */
+public class BatchExecPythonAggregateRule extends ConverterRule {
+
+   public static final RelOptRule INSTANCE = new 
BatchExecPythonAggregateRule();
+
+   private BatchExecPythonAggregateRule() {
+   super(FlinkLogicalAggregate.class, FlinkConventions.LOGICAL(), 
FlinkConventions.BATCH_PHYSICAL(),
+   "BatchExecPythonAggregateRule");
+   }
+
+   @Override
+   public boolean matches(RelOptRuleCall call) {
+   FlinkLogicalAggregate agg = call.rel(0);
+   List<AggregateCall> aggCalls = agg.getAggCallList();
+   boolean existPandasFunction = false;
+   boolean existGeneralPythonFunction = false;
+   boolean existJavaFunction = false;
+   for (AggregateCall aggCall : aggCalls) {
+   SqlAggFunction aggregation = aggCall.getAggregation();
+   if (aggregation instanceof AggSqlFunction) {
+   ImperativeAggregateFunction func =
+   ((AggSqlFunction) 
aggregation).aggregateFunction();
+   if (func instanceof PythonFunction) {
+   PythonFunction pythonFunction = 
(PythonFunction) func;
+   if 
(pythonFunction.getPythonFunctionKind() == PythonFunctionKind.PANDAS) {
+   existPandasFunction = true;
+   } else {
+   existGeneralPythonFunction = 
true;
+   }
+   } else {
+   existJavaFunction = true;
+   }
+   }
+   }
+   if (existPandasFunction) {
+   if (existGeneralPythonFunction) {
+   throw new TableException("Pandas UDAF cannot be 
computed with General Python UDAF currently");
+   }
+   if (existJavaFunction) {
+  

[GitHub] [flink] HuangXingBo commented on a change in pull request #13388: [FLINK-19184][python] Add Batch Physical Pandas Group Aggregate Rule and RelNode

2020-09-17 Thread GitBox


HuangXingBo commented on a change in pull request #13388:
URL: https://github.com/apache/flink/pull/13388#discussion_r490664786



##
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSortAggRule.scala
##
@@ -65,7 +67,13 @@ class BatchExecSortAggRule
 
   override def matches(call: RelOptRuleCall): Boolean = {
 val tableConfig = 
call.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
-!isOperatorDisabled(tableConfig, OperatorType.SortAgg)
+val agg: FlinkLogicalAggregate = call.rel(0)
+!isOperatorDisabled(tableConfig, OperatorType.SortAgg) &&
+  !agg.getAggCallList.exists(x => {
+val aggregation = x.getAggregation

Review comment:
   Yes, makes sense.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dianfu commented on a change in pull request #13230: [FLINK-18950][python][docs] Add documentation for Operations in Python DataStream API.

2020-09-17 Thread GitBox


dianfu commented on a change in pull request #13230:
URL: https://github.com/apache/flink/pull/13230#discussion_r490225135



##
File path: docs/dev/python/datastream-api-users-guide/operators.md
##
@@ -0,0 +1,87 @@
+---
+title: "Operators"
+nav-parent_id: python_datastream_api
+nav-pos: 20
+---
+
+
+
+Operators transform one or more DataStreams into a new DataStream. Programs 
can combine multiple transformations into 
+sophisticated dataflow topologies.
+
+This section gives a description of the basic transformations Python 
DataStream API provides, the effective physical 
+partitioning after applying those as well as insights into Flink's operator 
chaining.
+
+* This will be replaced by the TOC
+{:toc}
+
+# DataStream Transformations
+
+DataStream programs in Flink are regular programs that implement 
transformations on data streams (e.g., mapping, 
+filtering, reducing). Please see [operators]({% link 
dev/stream/operators/index.md %}
+?code_tab=python) for an overview of the available stream transformations in 
Python DataStream API.
+
+# Functions
+Most operators require a user-defined function. The following will describe 
different ways of how they can be specified.

Review comment:
   ```suggestion
   Most transformations require a user-defined function as input to define the 
functionality of the transformation. The following describes different ways of 
defining user-defined functions.
   ```

##
File path: docs/dev/python/datastream-api-users-guide/operators.md
##
@@ -0,0 +1,87 @@
+---
+title: "Operators"
+nav-parent_id: python_datastream_api
+nav-pos: 20
+---
+
+
+
+Operators transform one or more DataStreams into a new DataStream. Programs 
can combine multiple transformations into 
+sophisticated dataflow topologies.
+
+This section gives a description of the basic transformations Python 
DataStream API provides, the effective physical 

Review comment:
   It seems that the description doesn't apply here.

##
File path: docs/dev/python/datastream-api-users-guide/operators.md
##
@@ -0,0 +1,87 @@
+---
+title: "Operators"
+nav-parent_id: python_datastream_api
+nav-pos: 20
+---
+
+
+
+Operators transform one or more DataStreams into a new DataStream. Programs 
can combine multiple transformations into 
+sophisticated dataflow topologies.
+
+This section gives a description of the basic transformations Python 
DataStream API provides, the effective physical 
+partitioning after applying those as well as insights into Flink's operator 
chaining.
+
+* This will be replaced by the TOC
+{:toc}
+
+# DataStream Transformations
+
+DataStream programs in Flink are regular programs that implement 
transformations on data streams (e.g., mapping, 
+filtering, reducing). Please see [operators]({% link 
dev/stream/operators/index.md %}
+?code_tab=python) for an overview of the available stream transformations in 
Python DataStream API.
+
+# Functions
+Most operators require a user-defined function. The following will describe 
different ways of how they can be specified.
+
+## Implementing Function Interfaces
+Function interfaces for different operations are provided in Python DataStream 
API. Users can implement a Function 
+interface and pass it to the corresponding operation. Take MapFunction for 
instance:
+
+{% highlight python %}
+# Implementing a MapFunction that returns the input value plus one.
+class MyMapFunction(MapFunction):
+
+    def map(self, value):
+        return value + 1
+
+data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
+mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
+{% endhighlight %}
+
+Note In Python DataStream API, users are able to define the output type 
information of the operation. If not 
+defined, the output type will be `Types.PICKLED_BYTE_ARRAY`, so that data will 
be in the form of a byte array generated by the 
+pickle serializer. For more details about `Pickle Serialization`, please 
refer to [DataTypes]({% link dev/python/datastream-api-users-guide/data_types.md
+ %}#pickle-serialization).
+
+## Lambda Functions
+As shown in the following example, all operations can also accept a lambda 
function to describe the operation:

Review comment:
   ```suggestion
   As shown in the following example, all the transformations can also accept a 
lambda function to define the functionality of the transformation:
   ```
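   For reference, a lambda-based version of the map example quoted above (a 
sketch mirroring that snippet; not part of the suggestion):
   ```python
   data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
   mapped_stream = data_stream.map(lambda value: value + 1, output_type=Types.INT())
   ```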

##
File path: docs/dev/python/datastream-api-users-guide/operators.md
##
@@ -0,0 +1,87 @@
+---
+title: "Operators"
+nav-parent_id: python_datastream_api
+nav-pos: 20
+---
+
+
+
+Operators transform one or more DataStreams into a new DataStream. Programs 
can combine multiple transformations into 
+sophisticated dataflow topologies.
+
+This section gives a description of the basic transformations Python 
DataStream API provides, the effective physical 
+partitioning after applying those as well as insights into Flink's operator 

[jira] [Commented] (FLINK-12751) Create file based HA support

2020-09-17 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198068#comment-17198068
 ] 

Yang Wang commented on FLINK-12751:
---

Yes, you are right. I am aware of this solution, and I think we could have both 
for deploying Flink on Kubernetes.

BTW, this ticket seems to be a duplicate of FLINK-17598.

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either 
> using Zookeeper or a custom factory class.
> Add an HA implementation based on a PVC. The idea behind this implementation 
> is as follows:
> * Because the implementation assumes a single instance of the Job Manager 
> (Job Manager selection and restarts are done by a K8s Deployment of size 1), 
> URL management is done using the StandaloneHaServices implementation (in the 
> case of a cluster) and the EmbeddedHaServices implementation (in the case of 
> a mini cluster)
> * For management of the submitted job graphs, the checkpoint counter and 
> completed checkpoints, the implementation leverages the following file 
> system layout
> {code}
>  ha -> root of the HA data
>    checkpointcounter -> checkpoint counter folder
>      -> job id folder
>        -> counter file
>      -> another job id folder
>      ...
>    completedCheckpoint -> completed checkpoint folder
>      -> job id folder
>        -> checkpoint file
>        -> checkpoint file
>        ...
>      -> another job id folder
>      ...
>    submittedJobGraph -> submitted graph folder
>      -> job id folder
>        -> graph file
>      -> another job id folder
>      ...
> {code}
> An implementation should overwrite 2 of the Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking the 
> HA service
> * HighAvailabilityMode - added `FILESYSTEM` to the available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory`for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system
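> For illustration, a minimal sketch of the checkpoint counter idea under the 
> layout above (plain java.nio for brevity; the actual proposal would use 
> Flink's FileSystem abstraction, and all names here are illustrative):
> {code:java}
> import java.nio.charset.StandardCharsets;
> import java.nio.file.Files;
> import java.nio.file.Path;
>
> class FileBasedCheckpointIdCounterSketch {
>     private final Path counterFile;
>
>     FileBasedCheckpointIdCounterSketch(Path haRoot, String jobId) throws Exception {
>         // ha/checkpointcounter/<job id>/counter, matching the layout above
>         Path jobDir = haRoot.resolve("checkpointcounter").resolve(jobId);
>         Files.createDirectories(jobDir);
>         this.counterFile = jobDir.resolve("counter");
>     }
>
>     // Read the current checkpoint id, persist id + 1, return the current id.
>     synchronized long getAndIncrement() throws Exception {
>         long current = Files.exists(counterFile)
>             ? Long.parseLong(new String(
>                 Files.readAllBytes(counterFile), StandardCharsets.UTF_8).trim())
>             : 1L;
>         Files.write(counterFile, Long.toString(current + 1)
>             .getBytes(StandardCharsets.UTF_8));
>         return current;
>     }
> }
> {code}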



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] SteNicholas removed a comment on pull request #13298: [FLINK-19038][table] It doesn't support to call Table.limit() continuously

2020-09-17 Thread GitBox


SteNicholas removed a comment on pull request #13298:
URL: https://github.com/apache/flink/pull/13298#issuecomment-694090738


   > Thanks @SteNicholas. I will take a look shortly.
   
   @twalthr Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19246) TableSourceITCase.testStreamScanParallelism fails on private Azure accounts

2020-09-17 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198057#comment-17198057
 ] 

Dian Fu commented on FLINK-19246:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6615=logs=9fca669f-5c5f-59c7-4118-e31c641064f0=508fb1b1-538d-50d3-f3b5-2c91da394c53

> TableSourceITCase.testStreamScanParallelism fails on private Azure accounts
> ---
>
> Key: FLINK-19246
> URL: https://issues.apache.org/jira/browse/FLINK-19246
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Assignee: Jark Wu
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> Example: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=8381=logs=69332ead-8935-5abf-5b3d-e4280fb1ff4c=58eb9526-6bcb-5835-ae76-f5bd5f6df6ac
> or
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=8379=logs=69332ead-8935-5abf-5b3d-e4280fb1ff4c=58eb9526-6bcb-5835-ae76-f5bd5f6df6ac
> or
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=8369=logs=69332ead-8935-5abf-5b3d-e4280fb1ff4c=58eb9526-6bcb-5835-ae76-f5bd5f6df6ac
> (this change is already merged to master, so it is unlikely to cause the 
> error)
> {code}
> 2020-09-15T13:51:34.6773312Z 
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the CollectionInputFormat is not serializable. The object probably contains 
> or references non serializable fields.
> 2020-09-15T13:51:34.6774140Z  at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
> 2020-09-15T13:51:34.6774634Z  at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> 2020-09-15T13:51:34.6775136Z  at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
> 2020-09-15T13:51:34.6775728Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1913)
> 2020-09-15T13:51:34.6776617Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1601)
> 2020-09-15T13:51:34.6777322Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createInput(StreamExecutionEnvironment.java:1493)
> 2020-09-15T13:51:34.6778029Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createInput(StreamExecutionEnvironment.java:1483)
> 2020-09-15T13:51:34.6778887Z  at 
> org.apache.flink.table.factories.utils.TestCollectionTableFactory$CollectionTableSource.getDataStream(TestCollectionTableFactory.scala:159)
> 2020-09-15T13:51:34.6779659Z  at 
> org.apache.flink.table.factories.utils.TestCollectionTableFactory$CollectionTableSource.getDataStream(TestCollectionTableFactory.scala:134)
> 2020-09-15T13:51:34.6780394Z  at 
> org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:105)
> 2020-09-15T13:51:34.6781058Z  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamSink.translateInput(DataStreamSink.scala:189)
> 2020-09-15T13:51:34.6781655Z  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamSink.writeToSink(DataStreamSink.scala:84)
> 2020-09-15T13:51:34.6782266Z  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamSink.translateToPlan(DataStreamSink.scala:59)
> 2020-09-15T13:51:34.6782951Z  at 
> org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translateToCRow(StreamPlanner.scala:274)
> 2020-09-15T13:51:34.6783640Z  at 
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:119)
> 2020-09-15T13:51:34.6784227Z  at 
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:116)
> 2020-09-15T13:51:34.6784799Z  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 2020-09-15T13:51:34.6785345Z  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 2020-09-15T13:51:34.6785828Z  at 
> scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 2020-09-15T13:51:34.6786285Z  at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 2020-09-15T13:51:34.6786760Z  at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 2020-09-15T13:51:34.6787210Z  at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 2020-09-15T13:51:34.6787681Z  at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 2020-09-15T13:51:34.6788168Z  at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 2020-09-15T13:51:34.6788648Z  at 
> 

[jira] [Commented] (FLINK-17274) Maven: Premature end of Content-Length delimited message body

2020-09-17 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198056#comment-17198056
 ] 

Dian Fu commented on FLINK-17274:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6616=logs=ff2e2ea5-07e3-5521-7b04-a4fc3ad765e9=6945d9e3-ebef-5993-0c44-838d8ad079c0

> Maven: Premature end of Content-Length delimited message body
> -
>
> Key: FLINK-17274
> URL: https://issues.apache.org/jira/browse/FLINK-17274
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> CI: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7786=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb
> {code}
> [ERROR] Failed to execute goal on project 
> flink-connector-elasticsearch7_2.11: Could not resolve dependencies for 
> project 
> org.apache.flink:flink-connector-elasticsearch7_2.11:jar:1.11-SNAPSHOT: Could 
> not transfer artifact org.apache.lucene:lucene-sandbox:jar:8.3.0 from/to 
> alicloud-mvn-mirror 
> (http://mavenmirror.alicloud.dak8s.net:/repository/maven-central/): GET 
> request of: org/apache/lucene/lucene-sandbox/8.3.0/lucene-sandbox-8.3.0.jar 
> from alicloud-mvn-mirror failed: Premature end of Content-Length delimited 
> message body (expected: 289920; received: 239832 -> [Help 1]
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19284) Add documentation about how to use Python UDF in the Java Table API

2020-09-17 Thread Dian Fu (Jira)
Dian Fu created FLINK-19284:
---

 Summary: Add documentation about how to use Python UDF in the Java 
Table API
 Key: FLINK-19284
 URL: https://issues.apache.org/jira/browse/FLINK-19284
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python, Documentation
Reporter: Dian Fu
Assignee: Wei Zhong
 Fix For: 1.12.0, 1.11.3






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-18918) Add a "Connectors" document under the "Python API" -> "User Guide" -> "Table API" section

2020-09-17 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-18918.
---
Fix Version/s: 1.11.3
   1.12.0
   Resolution: Fixed

master(1.12): 1cb0a973639cf9fcf35b8d162f376e4549e972d2
release-1.11: 5f4d8ce2571074ae490f202348f227f39096b6a8

> Add a "Connectors" document under  the "Python API" -> "User Guide" -> "Table 
> API" section
> --
>
> Key: FLINK-18918
> URL: https://issues.apache.org/jira/browse/FLINK-18918
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Wei Zhong
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18918) Add a "Connectors" document under the "Python API" -> "User Guide" -> "Table API" section

2020-09-17 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-18918:

Component/s: API / Python

> Add a "Connectors" document under  the "Python API" -> "User Guide" -> "Table 
> API" section
> --
>
> Key: FLINK-18918
> URL: https://issues.apache.org/jira/browse/FLINK-18918
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Documentation
>Reporter: Wei Zhong
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] dianfu closed pull request #13193: [FLINK-18918][python][docs] Add dedicated connector documentation for Python Table API

2020-09-17 Thread GitBox


dianfu closed pull request #13193:
URL: https://github.com/apache/flink/pull/13193


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dianfu commented on pull request #13193: [FLINK-18918][python][docs] Add dedicated connector documentation for Python Table API

2020-09-17 Thread GitBox


dianfu commented on pull request #13193:
URL: https://github.com/apache/flink/pull/13193#issuecomment-694591362


   @sjwiesman @morsapaes Thanks a lot for the review and thanks @hequn8128 for 
the PR. Merging...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19283) Allow subclasses to override/extend FlinkKafkaConsumerBase checkpoint methods

2020-09-17 Thread Adrian Kreuziger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adrian Kreuziger updated FLINK-19283:
-
Description: 
I'm working on a class that extends the FlinkKafkaConsumer to add some 
additional functionality the first time the consumer runs. I'd like to be able 
to store some additional state, but am unable to do so as the initializeState() 
and snapshotState() are marked as final. Ideally I'd like to be able to do 
something like
{code:java}
@Override   
public final void initializeState(FunctionInitializationContext context) throws 
Exception {
super.initializeState(context);
// some additional initialization here
}

@Override   
public final void snapshotState(FunctionSnapshotContext context) throws 
Exception {
super.snapshotState(context);
// set some additional state here
}{code}
I'm guessing it was marked final for a reason, is there a reason this would be 
problematic? The restoredState and unionOffsetStates properties are still 
private which would prevent subclasses from modifying the offset state.

 

  was:
I'm working on a class that extends the FlinkKafkaConsumer to add some 
additional functionality the first time the consumer runs. I'd like to be able 
to store some additional state, but am unable to do so as the initializeState() 
and snapshotState() are marked as final. Ideally I'd like to be able to do 
something like
{code:java}
@Override   public final void initializeState(FunctionInitializationContext 
context) throws Exception {
super.initializeState(context);
// some additional initialization here

}

@Override   public final void snapshotState(FunctionSnapshotContext 
context) throws Exception {
super.snapshotState(context);
// set some additional state here

}{code}
I'm guessing it was marked final for a reason; is there a reason this would be 
problematic? The restoredState and unionOffsetStates properties are still 
private, which would prevent subclasses from modifying the offset state.

 


> Allow subclasses to override/extend FlinkKafkaConsumerBase checkpoint methods
> -
>
> Key: FLINK-19283
> URL: https://issues.apache.org/jira/browse/FLINK-19283
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.11.0, 1.11.1
>Reporter: Adrian Kreuziger
>Priority: Minor
>
> I'm working on a class that extends the FlinkKafkaConsumer to add some 
> additional functionality the first time the consumer runs. I'd like to be 
> able to store some additional state, but am unable to do so as the 
> initializeState() and snapshotState() are marked as final. Ideally I'd like 
> to be able to do something like
> {code:java}
> @Override 
> public final void initializeState(FunctionInitializationContext context) 
> throws Exception {
> super.initializeState(context);
> // some additional initialization here
> }
> @Override 
> public final void snapshotState(FunctionSnapshotContext context) throws 
> Exception {
> super.snapshotState(context);
> // set some additional state here
> }{code}
> I'm guessing it was marked final for a reason; is there a reason this would 
> be problematic? The restoredState and unionOffsetStates properties are still 
> private, which would prevent subclasses from modifying the offset state.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19283) Allow subclasses to override/extend FlinkKafkaConsumerBase checkpoint methods

2020-09-17 Thread Adrian Kreuziger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adrian Kreuziger updated FLINK-19283:
-
Description: 
I'm working on a class that extends the FlinkKafkaConsumer to add some 
additional functionality the first time the consumer runs. I'd like to be able 
to store some additional state, but am unable to do so as the initializeState() 
and snapshotState() are marked as final. Ideally I'd like to be able to do 
something like
{code:java}
@Override   
public void initializeState(FunctionInitializationContext context) throws 
Exception {
super.initializeState(context);
// some additional initialization here
}

@Override   
public void snapshotState(FunctionSnapshotContext context) throws Exception {
super.snapshotState(context);
// set some additional state here
}{code}
I'm guessing it was marked final for a reason; is there a reason this would be 
problematic? The restoredState and unionOffsetStates properties are still 
private, which would prevent subclasses from modifying the offset state.

 

  was:
I'm working on a class that extends the FlinkKafkaConsumer to add some 
additional functionality the first time the consumer runs. I'd like to be able 
to store some additional state, but am unable to do so as the initializeState() 
and snapshotState() are marked as final. Ideally I'd like to be able to do 
something like
{code:java}
@Override   
public final void initializeState(FunctionInitializationContext context) throws 
Exception {
super.initializeState(context);
// some additional initialization here
}

@Override   
public final void snapshotState(FunctionSnapshotContext context) throws 
Exception {
super.snapshotState(context);
// set some additional state here
}{code}
I'm guessing it was marked final for a reason; is there a reason this would be 
problematic? The restoredState and unionOffsetStates properties are still 
private, which would prevent subclasses from modifying the offset state.

 


> Allow subclasses to override/extend FlinkKafkaConsumerBase checkpoint methods
> -
>
> Key: FLINK-19283
> URL: https://issues.apache.org/jira/browse/FLINK-19283
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.11.0, 1.11.1
>Reporter: Adrian Kreuziger
>Priority: Minor
>
> I'm working on a class that extends the FlinkKafkaConsumer to add some 
> additional functionality the first time the consumer runs. I'd like to be 
> able to store some additional state, but am unable to do so as the 
> initializeState() and snapshotState() are marked as final. Ideally I'd like 
> to be able to do something like
> {code:java}
> @Override 
> public void initializeState(FunctionInitializationContext context) throws 
> Exception {
> super.initializeState(context);
> // some additional initialization here
> }
> @Override 
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
> super.snapshotState(context);
> // set some additional state here
> }{code}
> I'm guessing it was marked final for a reason; is there a reason this would 
> be problematic? The restoredState and unionOffsetStates properties are still 
> private, which would prevent subclasses from modifying the offset state.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19283) Allow subclasses to override/extend FlinkKafkaConsumerBase checkpoint methods

2020-09-17 Thread Adrian Kreuziger (Jira)
Adrian Kreuziger created FLINK-19283:


 Summary: Allow subclasses to override/extend 
FlinkKafkaConsumerBase checkpoint methods
 Key: FLINK-19283
 URL: https://issues.apache.org/jira/browse/FLINK-19283
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.11.1, 1.11.0
Reporter: Adrian Kreuziger


I'm working on a class that extends the FlinkKafkaConsumer to add some 
additional functionality the first time the consumer runs. I'd like to be able 
to store some additional state, but am unable to do so as the initializeState() 
and snapshotState() are marked as final. Ideally I'd like to be able to do 
something like
{code:java}
@Override   public final void initializeState(FunctionInitializationContext 
context) throws Exception {
super.initializeState(context);
// some additional initialization here

}

@Override   public final void snapshotState(FunctionSnapshotContext 
context) throws Exception {
super.snapshotState(context);
// set some additional state here

}{code}
I'm guessing it was marked final for a reason; is there a reason this would be 
problematic? The restoredState and unionOffsetStates properties are still 
private, which would prevent subclasses from modifying the offset state.
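One workaround that needs no connector change: because the final methods are 
still public, a delegating source can call them and layer extra state handling 
around the calls. Below is a minimal sketch of that composition approach, 
assuming Flink 1.11-era interfaces; the wrapper class and its name are 
hypothetical, not part of the connector's API.
{code:java}
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Hypothetical wrapper: delegates the source lifecycle to the consumer and adds
// custom state handling around the (final, but still callable) checkpoint methods.
public class WrappingKafkaSource<T> extends RichParallelSourceFunction<T>
        implements CheckpointedFunction, CheckpointListener {

    private final FlinkKafkaConsumer<T> delegate;

    public WrappingKafkaSource(FlinkKafkaConsumer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void setRuntimeContext(RuntimeContext context) {
        super.setRuntimeContext(context);
        delegate.setRuntimeContext(context); // the delegate needs the context too
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        delegate.initializeState(context); // final on the consumer, but callable
        // some additional initialization here
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        delegate.snapshotState(context);
        // set some additional state here
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        delegate.notifyCheckpointComplete(checkpointId); // offsets are committed here
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        delegate.open(parameters);
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        delegate.run(ctx);
    }

    @Override
    public void cancel() {
        delegate.cancel();
    }

    @Override
    public void close() throws Exception {
        delegate.close();
    }
}{code}
One caveat of this sketch: the wrapper loses the consumer's ResultTypeQueryable 
type information, so the produced type has to be supplied explicitly (for 
example via returns()) where the source is used.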

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-12751) Create file based HA support

2020-09-17 Thread Teng Fei Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197929#comment-17197929
 ] 

Teng Fei Liao commented on FLINK-12751:
---

If you replace the deployment with a stateful set for the job manager, 
Kubernetes will guarantee that exactly 1 job manager is running. An 
additional benefit is that the stateful set maintains the network 
identity of the job manager. We're actually running this variant in production.

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either 
> using ZooKeeper or a custom factory class.
> Add an HA implementation based on a PVC. The idea behind this implementation
> is as follows:
> * Because the implementation assumes a single Job Manager instance (Job 
> Manager selection and restarts are handled by a K8s Deployment of size 1), 
> URL management is done using the StandaloneHaServices implementation (in the 
> case of a cluster) and the EmbeddedHaServices implementation (in the case of 
> a mini cluster)
> * For management of the submitted job graphs, the checkpoint counter and 
> completed checkpoints, the implementation leverages the following file 
> system layout
> {code}
>  ha -> root of the HA data
>  checkpointcounter -> checkpoint counter folder
>   -> job id folder
>   -> counter file
>   -> another job id folder
>  ...
>  completedCheckpoint -> completed checkpoint folder
>   -> job id folder
>   -> checkpoint file
>   -> checkpoint file
>  ...
>   -> another job id folder
>  ...
>  submittedJobGraph -> submitted graph folder
>   -> job id folder
>   -> graph file
>   -> another job id folder
>  ...
> {code}
> The implementation overwrites two of the Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking HA 
> service
> * HighAvailabilityMode - added `FILESYSTEM` to available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory`for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system
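To illustrate the checkpoint-counter part of the layout described above, here 
is a minimal sketch that persists the counter as a small file replaced 
atomically, relying on the single-Job-Manager assumption instead of locking. 
The class name is hypothetical; this is not the FLINK-12751 implementation 
itself.
{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class FileBackedCheckpointCounter {

    private final Path counterFile;

    public FileBackedCheckpointCounter(Path haRoot, String jobId) throws IOException {
        // Mirrors the ha/checkpointcounter/<job id>/counter layout.
        Path jobDir = haRoot.resolve("checkpointcounter").resolve(jobId);
        Files.createDirectories(jobDir);
        this.counterFile = jobDir.resolve("counter");
    }

    public synchronized long getAndIncrement() throws IOException {
        long current = 0L;
        if (Files.exists(counterFile)) {
            current = Long.parseLong(new String(
                    Files.readAllBytes(counterFile), StandardCharsets.UTF_8).trim());
        }
        // Write the new value to a temp file and move it into place atomically,
        // so a crash never leaves a half-written counter behind.
        Path tmp = counterFile.resolveSibling("counter.tmp");
        Files.write(tmp, Long.toString(current + 1).getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, counterFile, StandardCopyOption.ATOMIC_MOVE,
                StandardCopyOption.REPLACE_EXISTING);
        return current;
    }
}{code}
The atomic rename is what makes the file a safe unit of recovery here; with 
more than one writer this would additionally need leader election or locking.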



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13413: [FLINK-19188][examples-table] Add a new streaming SQL example

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13413:
URL: https://github.com/apache/flink/pull/13413#issuecomment-694306028


   
   ## CI report:
   
   * 4ccc07b606b8bc121576b45aa6684c0c71a911f5 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6614)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13392: [FLINK-18713][table-planner-blink] Change duration configOption to duration type

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13392:
URL: https://github.com/apache/flink/pull/13392#issuecomment-692540614


   
   ## CI report:
   
   * b4edfc577e0d1d20203de931dbbab1ac4ab2d621 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6613)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13384: [FLINK-19224][state-processor-api] Support reading window operator state

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13384:
URL: https://github.com/apache/flink/pull/13384#issuecomment-692330852


   
   ## CI report:
   
   * 98b77dffa6ea7b1da3940d9c5cceb5e8d7a73e82 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6612)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13412: [FLINK-19069] execute finalizeOnMaster with io executor to avoid hea…

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13412:
URL: https://github.com/apache/flink/pull/13412#issuecomment-694253946


   
   ## CI report:
   
   * 85d80667243bb98025bc5aa52565f4e6b57d468a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6610)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13316: [FLINK-14422][runtime] Expose network memory usage to TaskManagerDetailsHandler's endpoint

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13316:
URL: https://github.com/apache/flink/pull/13316#issuecomment-686398901


   
   ## CI report:
   
   * e37cb771c66ed8cab48e0b7abd53132fb15dfca3 UNKNOWN
   * 0de00fc4418446c51b4df3a2dc4814e9fb55f273 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6608)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13411: [FLINK-19228][filesystem] Avoid accessing FileSystem in client for file system connector

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13411:
URL: https://github.com/apache/flink/pull/13411#issuecomment-694223221


   
   ## CI report:
   
   * 9180f9d7f240a8e01db886e2d4e5351da18b6664 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6609)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13189: [FLINK-18661][Kinesis] Stream consumer Registration/Deregistration

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13189:
URL: https://github.com/apache/flink/pull/13189#issuecomment-675350675


   
   ## CI report:
   
   * 05be61208d813aa603b5acff524c79d663579ec1 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6606)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13275: [FLINK-19064][hbase] HBaseRowDataInputFormat is leaking resources

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13275:
URL: https://github.com/apache/flink/pull/13275#issuecomment-682391096


   
   ## CI report:
   
   * eed5b15d8d10f1c1228e39650bb844385d7a2283 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6603)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19016) Checksum mismatch when restore from RocksDB

2020-09-17 Thread Jiayi Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197782#comment-17197782
 ] 

Jiayi Liao commented on FLINK-19016:


[~stevenz3wu] Yes. This happened to me on a machine with disk and memory 
problems at the operating system layer. [~sewen] This only happened once; I 
can't reproduce it.

> Checksum mismatch when restore from RocksDB
> ---
>
> Key: FLINK-19016
> URL: https://issues.apache.org/jira/browse/FLINK-19016
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.1
>Reporter: Jiayi Liao
>Priority: Major
>
> The error stack is shown below:
> {code:java}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> KeyedMapBundleOperator_44cfc1ca74b40bb44eed1f38f72b3ea9_(71/300) from any of 
> the 1 provided restore options.
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> ... 6 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:580)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ... 8 more
> Caused by: java.io.IOException: Error while opening RocksDB instance.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:74)
> at 
> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:214)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:277)
> ... 12 more
> Caused by: org.rocksdb.RocksDBException: checksum mismatch
> at org.rocksdb.RocksDB.open(Native Method)
> at org.rocksdb.RocksDB.open(RocksDB.java:286)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:66)
> ... 18 more
> {code}
> The machine went down because of a hardware problem, and afterwards the job 
> could not restart successfully anymore. After digging a little bit, I found 
> that RocksDB in Flink uses sync instead of fsync to synchronize the data with 
> the disk. With the sync operation, RocksDB cannot guarantee that the current 
> in-progress file is persisted to disk in takeDBNativeCheckpoint.
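Loosely, the sync/fsync distinction referred to above: RocksDB's default sync 
maps to fdatasync, which flushes only file contents, while fsync also flushes 
file metadata such as the size, which matters for a file that is still growing. 
The java.nio sketch below is an analogy for the two durability levels, not 
Flink or RocksDB code.
{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class DurabilityDemo {
    public static void main(String[] args) throws IOException {
        try (FileChannel ch = FileChannel.open(Paths.get("/tmp/demo-sst"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap("data block".getBytes(StandardCharsets.UTF_8)));
            ch.force(false); // like fdatasync/sync: flushes file contents only
            ch.force(true);  // like fsync: also flushes metadata such as file size
        }
    }
}{code}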



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13409: [FLINK-17910][e2e] Fix debug log output to investigate rare test failure

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13409:
URL: https://github.com/apache/flink/pull/13409#issuecomment-694181557


   
   ## CI report:
   
   * 9577603512bb8171c45678e110281057aaedede3 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6605)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-19016) Checksum mismatch when restore from RocksDB

2020-09-17 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197779#comment-17197779
 ] 

Steven Zhen Wu commented on FLINK-19016:


We have seen a similar problem when the TM disk is full. The checkpoint 
completed and some corrupted/incomplete RocksDB files got uploaded. Maybe there 
is a bug somewhere that didn't bubble the failure up.

> Checksum mismatch when restore from RocksDB
> ---
>
> Key: FLINK-19016
> URL: https://issues.apache.org/jira/browse/FLINK-19016
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.1
>Reporter: Jiayi Liao
>Priority: Major
>
> The error stack is shown below:
> {code:java}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> KeyedMapBundleOperator_44cfc1ca74b40bb44eed1f38f72b3ea9_(71/300) from any of 
> the 1 provided restore options.
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> ... 6 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:580)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ... 8 more
> Caused by: java.io.IOException: Error while opening RocksDB instance.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:74)
> at 
> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:214)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:277)
> ... 12 more
> Caused by: org.rocksdb.RocksDBException: checksum mismatch
> at org.rocksdb.RocksDB.open(Native Method)
> at org.rocksdb.RocksDB.open(RocksDB.java:286)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:66)
> ... 18 more
> {code}
> The machine went down because of a hardware problem, and afterwards the job 
> could not restart successfully anymore. After digging a little bit, I found 
> that RocksDB in Flink uses sync instead of fsync to synchronize the data with 
> the disk. With the sync operation, RocksDB cannot guarantee that the current 
> in-progress file is persisted to disk in takeDBNativeCheckpoint.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #13413: [FLINK-19188][examples-table] Add a new streaming SQL example

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13413:
URL: https://github.com/apache/flink/pull/13413#issuecomment-694306028


   
   ## CI report:
   
   * 4ccc07b606b8bc121576b45aa6684c0c71a911f5 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6614)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #13413: [FLINK-19188][examples-table] Add a new streaming SQL example

2020-09-17 Thread GitBox


flinkbot commented on pull request #13413:
URL: https://github.com/apache/flink/pull/13413#issuecomment-694306028


   
   ## CI report:
   
   * 4ccc07b606b8bc121576b45aa6684c0c71a911f5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13392: [FLINK-18713][table-planner-blink] Change duration configOption to duration type

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13392:
URL: https://github.com/apache/flink/pull/13392#issuecomment-692540614


   
   ## CI report:
   
   * f3af04d02472f93c5d49e0262ce7957cdd910707 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6565)
 
   * b4edfc577e0d1d20203de931dbbab1ac4ab2d621 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6613)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13401: [FLINK-19161][file connector] Add first version of the FLIP-27 File Source

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13401:
URL: https://github.com/apache/flink/pull/13401#issuecomment-693392192


   
   ## CI report:
   
   * cf35e054ad6c31b004bf07af2c3f61e714d554f2 UNKNOWN
   * 2120b4168f16bfea6bd183b8f068381042f16a15 UNKNOWN
   * c8bdc8464bc6b8e289b966db63739f8edb8cd169 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6602)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13410: [FLINK-19247] Update Chinese documentation after removal of Kafka 0.10 and 0.11

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13410:
URL: https://github.com/apache/flink/pull/13410#issuecomment-694211098


   
   ## CI report:
   
   * 69930ae1221864a5ccc986ac23c6010b76043759 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6607)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13392: [FLINK-18713][table-planner-blink] Change duration configOption to duration type

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13392:
URL: https://github.com/apache/flink/pull/13392#issuecomment-692540614


   
   ## CI report:
   
   * f3af04d02472f93c5d49e0262ce7957cdd910707 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6565)
 
   * b4edfc577e0d1d20203de931dbbab1ac4ab2d621 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13384: [FLINK-19224][state-processor-api] Support reading window operator state

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13384:
URL: https://github.com/apache/flink/pull/13384#issuecomment-692330852


   
   ## CI report:
   
   * b08947bc6479afa39471b89c079fdc7879c4977e Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6563)
 
   * 98b77dffa6ea7b1da3940d9c5cceb5e8d7a73e82 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6612)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-19188) Add a new streaming SQL examples

2020-09-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19188:
---
Labels: pull-request-available  (was: )

> Add a new streaming SQL examples
> 
>
> Key: FLINK-19188
> URL: https://issues.apache.org/jira/browse/FLINK-19188
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Add a new streaming SQL example. It should work entirely with SQL. 
> It should show:
> How to get started with a connector (CSV or DataGen?) and event-time.
> How the DDL works.
> How to work with Table Environment and SQL.
> What a complex SQL query looks like: windows, joins, maybe Top N.
> How to use pure streaming operators vs. changelog-producing operators
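A minimal sketch of what such an example could start from, covering the DDL, 
event-time, and windowed-query items listed above. It assumes Flink 1.12+ 
(where the Blink planner is the default) and uses the DataGen connector; the 
table name, field names, and option values are illustrative.
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StreamingSqlSketch {
    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // DDL: a bounded DataGen source with an event-time attribute.
        env.executeSql(
                "CREATE TABLE clicks (" +
                "  user_name STRING," +
                "  ts AS LOCALTIMESTAMP," +
                "  WATERMARK FOR ts AS ts" +
                ") WITH (" +
                "  'connector' = 'datagen'," +
                "  'number-of-rows' = '100'," +
                "  'fields.user_name.length' = '4'" +
                ")");

        // A tumbling-window aggregation, one of the query elements listed above.
        env.executeSql(
                "SELECT user_name," +
                "       TUMBLE_END(ts, INTERVAL '10' SECOND) AS window_end," +
                "       COUNT(*) AS clicks " +
                "FROM clicks " +
                "GROUP BY user_name, TUMBLE(ts, INTERVAL '10' SECOND)")
           .print();
    }
}{code}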



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #13413: [FLINK-19188][examples-table] Add a new streaming SQL example

2020-09-17 Thread GitBox


flinkbot commented on pull request #13413:
URL: https://github.com/apache/flink/pull/13413#issuecomment-694287475


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 4ccc07b606b8bc121576b45aa6684c0c71a911f5 (Thu Sep 17 
14:49:48 UTC 2020)
   
   **Warnings:**
* **1 pom.xml files were touched**: Check for build and licensing issues.
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **Invalid pull request title: No valid Jira ID provided**
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] twalthr opened a new pull request #13413: Flink 19188

2020-09-17 Thread GitBox


twalthr opened a new pull request #13413:
URL: https://github.com/apache/flink/pull/13413


   ## What is the purpose of the change
   
   Example for aggregating and ranking data using Flink SQL on updating (but 
bounded) streams.
   
   This PR depends on #13291.
   
   ## Brief change log
   
   The example shows how to declare a table using SQL DDL for reading 
insert-only data and handling
updating data. It should give a first impression of Flink SQL as a 
changelog processor. The example
uses some streaming operations that produce a stream of updates. See the 
other examples for pure CDC
processing and more complex operations.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows: 
`UpdatingTopCityExample`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] alpinegizmo commented on pull request #13384: [FLINK-19224][state-processor-api] Support reading window operator state

2020-09-17 Thread GitBox


alpinegizmo commented on pull request #13384:
URL: https://github.com/apache/flink/pull/13384#issuecomment-694281034


   @sjwiesman Thanks for the cleanups! 
   
   From the perspective of "Can a knowledgeable Flink user figure out how to 
work with this?" I think the answer is now yes, so +1 on that front.
   
   The Chinese docs need to be updated to match what's in the English at this 
point. 
   
   As for the implementation, it's mostly boilerplate and looks okay to me. But 
you might want to get a second opinion.
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wangxlong commented on pull request #13392: [FLINK-18713][table-planner-blink] Change duration configOption to duration type

2020-09-17 Thread GitBox


wangxlong commented on pull request #13392:
URL: https://github.com/apache/flink/pull/13392#issuecomment-694279898


   Thanks @wuchong, updated. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13384: [FLINK-19224][state-processor-api] Support reading window operator state

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13384:
URL: https://github.com/apache/flink/pull/13384#issuecomment-692330852


   
   ## CI report:
   
   * b08947bc6479afa39471b89c079fdc7879c4977e Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6563)
 
   * 98b77dffa6ea7b1da3940d9c5cceb5e8d7a73e82 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] Tartarus0zm commented on a change in pull request #13319: [FLINK-19022][runtime]Register the TerminationFuture of ResourceManager and Dispatcher with DispatcherResourceManagerComponen

2020-09-17 Thread GitBox


Tartarus0zm commented on a change in pull request #13319:
URL: https://github.com/apache/flink/pull/13319#discussion_r490295372



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##
@@ -74,22 +79,46 @@
@Nonnull ResourceManager resourceManager,
@Nonnull LeaderRetrievalService 
dispatcherLeaderRetrievalService,
@Nonnull LeaderRetrievalService 
resourceManagerRetrievalService,
-   @Nonnull WebMonitorEndpoint webMonitorEndpoint) {
+   @Nonnull WebMonitorEndpoint webMonitorEndpoint,
+   @Nonnull FatalErrorHandler fatalErrorHandler,
+   @Nonnull CompletableFuture 
dispatcherGatewayCompletableFuture) {
this.dispatcherRunner = dispatcherRunner;
this.resourceManager = resourceManager;
this.dispatcherLeaderRetrievalService = 
dispatcherLeaderRetrievalService;
this.resourceManagerRetrievalService = 
resourceManagerRetrievalService;
this.webMonitorEndpoint = webMonitorEndpoint;
+   this.fatalErrorHandler = fatalErrorHandler;
this.terminationFuture = new CompletableFuture<>();
this.shutDownFuture = new CompletableFuture<>();
 
registerShutDownFuture();
+   failOnPrematureTermination(dispatcherGatewayCompletableFuture);
}
 
private void registerShutDownFuture() {
FutureUtils.forward(dispatcherRunner.getShutDownFuture(), 
shutDownFuture);
}
 
+   private void 
failOnPrematureTermination(CompletableFuture 
dispatcherGatewayCompletableFuture) {
+   dispatcherGatewayCompletableFuture.whenComplete((dispatcher, 
throwable) -> {
+   if (dispatcher != null && dispatcher instanceof 
Dispatcher) {

Review comment:
   We first execute `failOnPrematureTermination();` and then execute 
`registerShutDownFuture();` 
   How about this?  @zentol 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] sjwiesman edited a comment on pull request #13384: [FLINK-19224][state-processor-api] Support reading window operator state

2020-09-17 Thread GitBox


sjwiesman edited a comment on pull request #13384:
URL: https://github.com/apache/flink/pull/13384#issuecomment-694269698


   @alpinegizmo 
   
   > trigger state isn't supported -- so any CountTrigger or custom Trigger 
with state can't be read
   
   Yes, I will update the docs
   
   > window state cannot be written, so I can't use this to migrate between 
state backends
   
   I have this ready to go. I just wanted to get this one merged in first. 
   
   Pushed an update



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] sjwiesman commented on pull request #13384: [FLINK-19224][state-processor-api] Support reading window operator state

2020-09-17 Thread GitBox


sjwiesman commented on pull request #13384:
URL: https://github.com/apache/flink/pull/13384#issuecomment-694269698


   @alpinegizmo 
   
   > trigger state isn't supported -- so any CountTrigger or custom Trigger 
with state can't be read
   
   Yes, I will update the docs
   
   > window state cannot be written, so I can't use this to migrate between 
state backends
   
   I have this ready to go. I just wanted to get this one merged in first. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] Tartarus0zm commented on a change in pull request #13319: [FLINK-19022][runtime]Register the TerminationFuture of ResourceManager and Dispatcher with DispatcherResourceManagerComponen

2020-09-17 Thread GitBox


Tartarus0zm commented on a change in pull request #13319:
URL: https://github.com/apache/flink/pull/13319#discussion_r490285294



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##
@@ -74,22 +79,46 @@
@Nonnull ResourceManager resourceManager,
@Nonnull LeaderRetrievalService 
dispatcherLeaderRetrievalService,
@Nonnull LeaderRetrievalService 
resourceManagerRetrievalService,
-   @Nonnull WebMonitorEndpoint webMonitorEndpoint) {
+   @Nonnull WebMonitorEndpoint webMonitorEndpoint,
+   @Nonnull FatalErrorHandler fatalErrorHandler,
+   @Nonnull CompletableFuture 
dispatcherGatewayCompletableFuture) {
this.dispatcherRunner = dispatcherRunner;
this.resourceManager = resourceManager;
this.dispatcherLeaderRetrievalService = 
dispatcherLeaderRetrievalService;
this.resourceManagerRetrievalService = 
resourceManagerRetrievalService;
this.webMonitorEndpoint = webMonitorEndpoint;
+   this.fatalErrorHandler = fatalErrorHandler;
this.terminationFuture = new CompletableFuture<>();
this.shutDownFuture = new CompletableFuture<>();
 
registerShutDownFuture();
+   failOnPrematureTermination(dispatcherGatewayCompletableFuture);
}
 
private void registerShutDownFuture() {
FutureUtils.forward(dispatcherRunner.getShutDownFuture(), 
shutDownFuture);
}
 
+   private void 
failOnPrematureTermination(CompletableFuture 
dispatcherGatewayCompletableFuture) {
+   dispatcherGatewayCompletableFuture.whenComplete((dispatcher, 
throwable) -> {
+   if (dispatcher != null && dispatcher instanceof 
Dispatcher) {

Review comment:
   if we use the shutdown future of the DispatcherRunner like this
   ```
   private void failOnPrematureTermination() {
CompletableFuture.anyOf(dispatcherRunner.getShutDownFuture(), 
resourceManager.getTerminationFuture())
.whenComplete((ignored, t) -> {
if (t == null) {
LOG.error("Dispatcher/ResourceManager shut down 
because something unexpected happen!");
}
if (isRunning.get()) {
LOG.warn("DispatcherResourceManagerComponent 
shut down because the Dispatcher/ResourceManager unexpected terminated.");
fatalErrorHandler.onFatalError(t);
}
});
   }
   ```
   in per-job mode, `ClusterEntrypoint.shutDownAsync()` may not have a chance 
to execute, because `ClusterEntrypoint.onFatalError` will call 
`System.exit(RUNTIME_FAILURE_RETURN_CODE);`
   Do you have any good suggestions for revision?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] sjwiesman commented on a change in pull request #13384: [FLINK-19224][state-processor-api] Support reading window operator state

2020-09-17 Thread GitBox


sjwiesman commented on a change in pull request #13384:
URL: https://github.com/apache/flink/pull/13384#discussion_r490285033



##
File path: docs/dev/libs/state_processor_api.md
##
@@ -345,6 +345,171 @@ Along with reading registered state values, each key has 
access to a `Context` w
 
 {% panel **Note:** When using a `KeyedStateReaderFunction`, all state 
descriptors must be registered eagerly inside of open. Any attempt to call a 
`RuntimeContext#get*State` will result in a `RuntimeException`. %}
 
+### Window State
+
+The state processor api supports reading state from a [window operator]({{ 
site.baseurl }}/dev/stream/operators/windows.html).
+When reading a window state, users specify the operator id, window assigner, 
and aggregation type.
+
+Additionally, a `WindowReaderFunction` can be specified to enrich each read 
with additional information similar to 
+a `WindowFunction` or `ProcessWindowFunction`.
+
+Consider a DataStream application that counts the number of clicks per user per 
minute.
+
+
+
+{% highlight java %}
+
+class Click {
+public String userId;
+
+public LocalDateTime time;
+}
+
+class ClickCounter implements AggregateFunction<Click, Integer, Integer> {
+
+   @Override
+   public Integer createAccumulator() {
+   return 0;
+   }
+
+   @Override
+   public Integer add(Click value, Integer accumulator) {
+   return 1 + accumulator;
+   }
+
+   @Override
+   public Integer getResult(Integer accumulator) {
+   return accumulator;
+   }
+
+   @Override
+   public Integer merge(Integer a, Integer b) {
+   return a + b;
+   }
+}
+
+DataStream<Click> clicks = . . . 
+
+clicks
+.keyBy(click -> click.userId)
+.window(TumblingEventTimeWindows.of(Time.minutes(1)))
+.aggregate(new ClickCounter())
+.uid("click-window")
+.addSink(new Sink());
+
+{% endhighlight %}
+
+
+{% highlight scala %}
+
+import java.lang.{Integer => JInteger}
+
+case class Click(userId: String, time: LocalDateTime)
+
+class ClickCounter extends AggregateFunction[Click, JInteger, JInteger] {
+
+   
+   override def createAccumulator(): JInteger = 0
+
+override def add(value: Click, accumulator: JInteger): JInteger = 1 + 
accumulator
+
+override def getResult(accumulator: JInteger): JInteger = accumulator
+
+override def merge(a: JInteger, b: JInteger): JInteger = a + b
+}
+
+val clicks: DataStream[Click] = . . . 
+
+clicks
+.keyBy(click => click.userId)
+.window(TumblingEventTimeWindows.of(Time.minutes(1)))
+.aggregate(new ClickCounter())
+.uid("click-window")
+.addSink(new Sink())
+
+{% endhighlight %}
+
+
+
+This state can be read using the code below.
+
+
+
+{% highlight java %}
+
+class ClickState {
+
+public String userId;
+
+public int count;
+
+public TimeWindow window;
+
+public Set<Long> triggerTimers;
+}
+
+class ClickReader extends WindowReaderFunction<Integer, ClickState, String, TimeWindow> { 
+
+   @Override
+   public void readWindow(String key, Context<TimeWindow> context, 
Iterable<Integer> elements, Collector<ClickState> out) {
+   ClickState state = new ClickState();
+   state.userId = key;
+   state.count = elements.iterator().next();
+   state.window = context.window();
+   state.triggerTimers = context.registeredEventTimeTimers();
+   
+   out.collect(state);
+   }
+}
+
+ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
+ExistingSavepoint savepoint = Savepoint.load(batchEnv, 
"hdfs://checkpoint-dir", new MemoryStateBackend());
+
+savepoint
+.window(TumblingEventTimeWindows.of(Time.milliseconds(10)))

Review comment:
   That's a typo but no it does not matter. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13412: [FLINK-19069] execute finalizeOnMaster with io executor to avoid hea…

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13412:
URL: https://github.com/apache/flink/pull/13412#issuecomment-694253946


   
   ## CI report:
   
   * 85d80667243bb98025bc5aa52565f4e6b57d468a Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6610)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13294: [FLINK-19002][canal][json] Support to only read changelogs of specific database and table for canal-json format

2020-09-17 Thread GitBox


flinkbot edited a comment on pull request #13294:
URL: https://github.com/apache/flink/pull/13294#issuecomment-684586137


   
   ## CI report:
   
   * 051a6077bd7cda3eb9d73ca795285135102fc8de Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6598)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] Tartarus0zm commented on a change in pull request #13319: [FLINK-19022][runtime]Register the TerminationFuture of ResourceManager and Dispatcher with DispatcherResourceManagerComponen

2020-09-17 Thread GitBox


Tartarus0zm commented on a change in pull request #13319:
URL: https://github.com/apache/flink/pull/13319#discussion_r490278890



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##
@@ -74,22 +79,46 @@
@Nonnull ResourceManager resourceManager,
@Nonnull LeaderRetrievalService 
dispatcherLeaderRetrievalService,
@Nonnull LeaderRetrievalService 
resourceManagerRetrievalService,
-   @Nonnull WebMonitorEndpoint webMonitorEndpoint) {
+   @Nonnull WebMonitorEndpoint webMonitorEndpoint,
+   @Nonnull FatalErrorHandler fatalErrorHandler,
+   @Nonnull CompletableFuture 
dispatcherGatewayCompletableFuture) {
this.dispatcherRunner = dispatcherRunner;
this.resourceManager = resourceManager;
this.dispatcherLeaderRetrievalService = 
dispatcherLeaderRetrievalService;
this.resourceManagerRetrievalService = 
resourceManagerRetrievalService;
this.webMonitorEndpoint = webMonitorEndpoint;
+   this.fatalErrorHandler = fatalErrorHandler;
this.terminationFuture = new CompletableFuture<>();
this.shutDownFuture = new CompletableFuture<>();
 
registerShutDownFuture();
+   failOnPrematureTermination(dispatcherGatewayCompletableFuture);
}
 
private void registerShutDownFuture() {
FutureUtils.forward(dispatcherRunner.getShutDownFuture(), 
shutDownFuture);
}
 
+   private void 
failOnPrematureTermination(CompletableFuture 
dispatcherGatewayCompletableFuture) {
+   dispatcherGatewayCompletableFuture.whenComplete((dispatcher, 
throwable) -> {
+   if (dispatcher != null && dispatcher instanceof 
Dispatcher) {
+   CompletableFuture.anyOf(((Dispatcher) 
dispatcher).getTerminationFuture(), resourceManager.getTerminationFuture())
+   .whenComplete((ignored, t) -> {

Review comment:
   sorry, my understanding is wrong.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



