[jira] [Updated] (FLINK-12424) Supports dag (multiple-sinks query) optimization

2019-05-06 Thread godfrey he (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-12424:
---
Description: 
Currently, Flink planner will optimize the plan in {{writeToSink}} method. If 
there are more than one sink in a query, each sink-tree will be optimized 
independent and the result execution plans are also completely independent. 
Actually, there is a high probability of duplicate computing for a 
multiple-sinks query. This issue aims to resolve the above problem. 
The basic idea of the solution is as following: 
1. lazy optimization: does not optimize the plan in {{writeToSink}} method, 
just puts the plan into a collection.
2. whole plan optimization and execution: a new {{execute}} method is added in 
{{TableEnvironment}}, this method will trigger whole plan optimization and 
execute the job.

The basic idea of dag (multiple-sinks query) optimization:
1. decompose the dag into different block, the leaf block is the common sub-plan
2. optimize each block from leaf block to root block, each block only needs to 
be optimized once
e.g. 
{code:scala}
val table = tableEnv.sqlQuery("select * from (select a as a1, b as b1 from 
MyTable where a > 0) t1, (select b as b2, c as c2 from MyTable where c is not 
null) t2 where a1 = b2")
tableEnv.registerTable("TempTable", table)

val table1 = tableEnv.sqlQuery("select a1, b1 from TempTable where a1 >= 70")
tableEnv.writeToSink(table1, Sink1)

val table2 = tableEnv.sqlQuery("select a1, c2 from TempTable where a1 < 70")
tableEnv.writeToSink(table2, Sink2)
{code}


 !image-2019-05-07-13-33-02-793.png! 

the above plan will be decomposed into 3 blocks, block1 is the input of block2 
and block3. block2 and block3 will be optimized after block1 has been optimized.

  was:
Currently, Flink planner will optimize the plan in {{writeToSink}} method. If 
there are more than one sink in a query, each sink-tree will be optimized 
independent and the result execution plans are also completely independent. 
Actually, there is a high probability of duplicate computing for a 
multiple-sinks query. This issue aims to resolve the above problem. 
The basic idea of the solution is as following: 
1. lazy optimization: does not optimize the plan in {{writeToSink}} method, 
just puts the plan into a collection.
2. whole plan optimization and execution: a new {{execute}} method is added in 
{{TableEnvironment}}, this method will trigger whole plan optimization and 
execute the job.

The basic idea of dag (multiple-sinks query) optimization:
1. decompose the dag into different block, the leaf block is the common sub-plan
2. optimize each block from leaf block to root block, each block only needs to 
be optimized once
e.g. 
{code:scala}
val table = tableEnv.sqlQuery("select * from (select a as a1, b as b1 from 
MyTable where a > 0) t1, (select b as b2, c as c2 from MyTable where c is not 
null) t2 where a1 = b2")
tableEnv.registerTable("TempTable", table)

val table1 = tableEnv.sqlQuery("select a1, b1 from TempTable where a1 >= 70")
tableEnv.writeToSink(table1, Sink1)

val table2 = tableEnv.sqlQuery("select a1, c2 from TempTable where a1 < 70")
tableEnv.writeToSink(table2, Sink2)
{code}


 !image-2019-05-07-13-33-02-793.png! 

the above plan will be decomposed into 3 blocks, and block1 is the input of 
block2 and block3. block2 and block3 will be optimized after block1 has been 
optimized.


> Supports dag (multiple-sinks query) optimization
> 
>
> Key: FLINK-12424
> URL: https://issues.apache.org/jira/browse/FLINK-12424
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
> Attachments: image-2019-05-07-13-33-02-793.png
>
>
> Currently, Flink planner will optimize the plan in {{writeToSink}} method. If 
> there are more than one sink in a query, each sink-tree will be optimized 
> independent and the result execution plans are also completely independent. 
> Actually, there is a high probability of duplicate computing for a 
> multiple-sinks query. This issue aims to resolve the above problem. 
> The basic idea of the solution is as following: 
> 1. lazy optimization: does not optimize the plan in {{writeToSink}} method, 
> just puts the plan into a collection.
> 2. whole plan optimization and execution: a new {{execute}} method is added 
> in {{TableEnvironment}}, this method will trigger whole plan optimization and 
> execute the job.
> The basic idea of dag (multiple-sinks query) optimization:
> 1. decompose the dag into different block, the leaf block is the common 
> sub-plan
> 2. optimize each block from leaf block to root block, each block only needs 
> to be optimized once
> e.g. 
> {code:scala}
> val table = tableEnv.sqlQuery("select * from 

[jira] [Updated] (FLINK-12424) Supports dag (multiple-sinks query) optimization

2019-05-06 Thread godfrey he (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-12424:
---
Description: 
Currently, Flink planner will optimize the plan in {{writeToSink}} method. If 
there are more than one sink in a query, each sink-tree will be optimized 
independent and the result execution plans are also completely independent. 
Actually, there is a high probability of duplicate computing for a 
multiple-sinks query. This issue aims to resolve the above problem. 
The basic idea of the solution is as following: 
1. lazy optimization: does not optimize the plan in {{writeToSink}} method, 
just puts the plan into a collection.
2. whole plan optimization and execution: a new {{execute}} method is added in 
{{TableEnvironment}}, this method will trigger whole plan optimization and 
execute the job.

The basic idea of dag (multiple-sinks query) optimization:
1. decompose the dag into different block, the leaf block is the common sub-plan
2. optimize each block from leaf block to root block, each block only needs to 
be optimized once
e.g. 
{code:scala}
val table = tableEnv.sqlQuery("select * from (select a as a1, b as b1 from 
MyTable where a > 0) t1, (select b as b2, c as c2 from MyTable where c is not 
null) t2 where a1 = b2")
tableEnv.registerTable("TempTable", table)

val table1 = tableEnv.sqlQuery("select a1, b1 from TempTable where a1 >= 70")
tableEnv.writeToSink(table1, Sink1)

val table2 = tableEnv.sqlQuery("select a1, c2 from TempTable where a1 < 70")
tableEnv.writeToSink(table2, Sink2)
{code}


 !image-2019-05-07-13-33-02-793.png! 

the above plan will be decomposed into 3 blocks, and block1 is the input of 
block2 and block3. block2 and block3 will be optimized after block1 has been 
optimized.

  was:
Currently, Flink planner will optimize the plan in {{writeToSink}} method. If 
there are more than one sink in a query, each sink-tree will be optimized 
independent and the result execution plans are also completely independent. 
Actually, there is a high probability of duplicate computing for a 
multiple-sinks query. This issue aims to resolve the above problem. The basic 
idea of the solution is as following: 
1. lazy optimization: does not optimize the plan in {{writeToSink}} method, 
just puts the plan into a collection.
2. whole plan optimization and execution: a new {{execute}} method is added in 
{{TableEnvironment}}, this method will trigger whole plan optimization and 
execute the job.

The basic idea of dag (multiple-sinks query) optimization:
1. decompose the dag into different block, the leaf block is the common sub-plan
2. optimize each block from leaf block to root block, each block only needs to 
be optimized once
e.g. 
{code:scala}
val table = util.tableEnv.sqlQuery("select * from (select a as a1, b as b1 from 
MyTable where a > 0) t1, (select b as b2, c as c2 from MyTable where c is not 
null) t2 where a1 = b2")
util.tableEnv.registerTable("TempTable", table)

val table1 = util.tableEnv.sqlQuery("select a1, b1 from TempTable where a1 >= 
70")
util.tableEnv.writeToSink(table1, Sink1)

val table2 = util.tableEnv.sqlQuery("select a1, c2 from TempTable where a1 < 
70")
util.tableEnv.writeToSink(table2, Sink2)
{code}


 !image-2019-05-07-13-33-02-793.png! 


> Supports dag (multiple-sinks query) optimization
> 
>
> Key: FLINK-12424
> URL: https://issues.apache.org/jira/browse/FLINK-12424
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
> Attachments: image-2019-05-07-13-33-02-793.png
>
>
> Currently, Flink planner will optimize the plan in {{writeToSink}} method. If 
> there are more than one sink in a query, each sink-tree will be optimized 
> independent and the result execution plans are also completely independent. 
> Actually, there is a high probability of duplicate computing for a 
> multiple-sinks query. This issue aims to resolve the above problem. 
> The basic idea of the solution is as following: 
> 1. lazy optimization: does not optimize the plan in {{writeToSink}} method, 
> just puts the plan into a collection.
> 2. whole plan optimization and execution: a new {{execute}} method is added 
> in {{TableEnvironment}}, this method will trigger whole plan optimization and 
> execute the job.
> The basic idea of dag (multiple-sinks query) optimization:
> 1. decompose the dag into different block, the leaf block is the common 
> sub-plan
> 2. optimize each block from leaf block to root block, each block only needs 
> to be optimized once
> e.g. 
> {code:scala}
> val table = tableEnv.sqlQuery("select * from (select a as a1, b as b1 from 
> MyTable where a > 0) t1, (select b as b2, c as c2 from MyTable where c is not 
> null) t2 where 

[jira] [Updated] (FLINK-12424) Supports dag (multiple-sinks query) optimization

2019-05-06 Thread godfrey he (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-12424:
---
Description: 
Currently, Flink planner will optimize the plan in {{writeToSink}} method. If 
there are more than one sink in a query, each sink-tree will be optimized 
independent and the result execution plans are also completely independent. 
Actually, there is a high probability of duplicate computing for a 
multiple-sinks query. This issue aims to resolve the above problem. The basic 
idea of the solution is as following: 
1. lazy optimization: does not optimize the plan in {{writeToSink}} method, 
just puts the plan into a collection.
2. whole plan optimization and execution: a new {{execute}} method is added in 
{{TableEnvironment}}, this method will trigger whole plan optimization and 
execute the job.

The basic idea of dag (multiple-sinks query) optimization:
1. decompose the dag into different block, the leaf block is the common sub-plan
2. optimize each block from leaf block to root block, each block only needs to 
be optimized once
e.g. 
{code:scala}
val table = util.tableEnv.sqlQuery("select * from (select a as a1, b as b1 from 
MyTable where a > 0) t1, (select b as b2, c as c2 from MyTable where c is not 
null) t2 where a1 = b2")
util.tableEnv.registerTable("TempTable", table)

val table1 = util.tableEnv.sqlQuery("select a1, b1 from TempTable where a1 >= 
70")
util.tableEnv.writeToSink(table1, Sink1)

val table2 = util.tableEnv.sqlQuery("select a1, c2 from TempTable where a1 < 
70")
util.tableEnv.writeToSink(table2, Sink2)
{code}


 !image-2019-05-07-13-33-02-793.png! 

  was:
Currently, Flink planner will optimize the plan in {{writeToSink}} method. If 
there are more than one sink in a query, each sink-tree will be optimized 
independent and the result execution plans are also completely independent. 
Actually, there is a high probability of duplicate computing for a 
multiple-sinks query. This issue aims to resolve the above problem. The basic 
idea of the solution is as following: 
1. lazy optimization: does not optimize the plan in {{writeToSink}} method, 
just puts the plan into a collection.
2. whole plan optimization and execution: a new {{execute}} method is added in 
{{TableEnvironment}}, this method will trigger whole plan optimization and 
execute the job.

The basic idea of dag (multiple-sinks query) optimization:
1. decompose the dag into different block, the leaf block is the common sub-plan
2. optimize each block from leaf block to root block, each block only needs to 
be optimized once
e.g. 

 !image-2019-05-07-13-33-02-793.png! 


> Supports dag (multiple-sinks query) optimization
> 
>
> Key: FLINK-12424
> URL: https://issues.apache.org/jira/browse/FLINK-12424
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
> Attachments: image-2019-05-07-13-33-02-793.png
>
>
> Currently, Flink planner will optimize the plan in {{writeToSink}} method. If 
> there are more than one sink in a query, each sink-tree will be optimized 
> independent and the result execution plans are also completely independent. 
> Actually, there is a high probability of duplicate computing for a 
> multiple-sinks query. This issue aims to resolve the above problem. The basic 
> idea of the solution is as following: 
> 1. lazy optimization: does not optimize the plan in {{writeToSink}} method, 
> just puts the plan into a collection.
> 2. whole plan optimization and execution: a new {{execute}} method is added 
> in {{TableEnvironment}}, this method will trigger whole plan optimization and 
> execute the job.
> The basic idea of dag (multiple-sinks query) optimization:
> 1. decompose the dag into different block, the leaf block is the common 
> sub-plan
> 2. optimize each block from leaf block to root block, each block only needs 
> to be optimized once
> e.g. 
> {code:scala}
> val table = util.tableEnv.sqlQuery("select * from (select a as a1, b as b1 
> from MyTable where a > 0) t1, (select b as b2, c as c2 from MyTable where c 
> is not null) t2 where a1 = b2")
> util.tableEnv.registerTable("TempTable", table)
> val table1 = util.tableEnv.sqlQuery("select a1, b1 from TempTable where a1 >= 
> 70")
> util.tableEnv.writeToSink(table1, Sink1)
> val table2 = util.tableEnv.sqlQuery("select a1, c2 from TempTable where a1 < 
> 70")
> util.tableEnv.writeToSink(table2, Sink2)
> {code}
>  !image-2019-05-07-13-33-02-793.png! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12425) Implement RPCs to allow clients release result partitions in a Flink cluster.

2019-05-06 Thread Jiangjie Qin (JIRA)
Jiangjie Qin created FLINK-12425:


 Summary: Implement RPCs to allow clients release result partitions 
in a Flink cluster.
 Key: FLINK-12425
 URL: https://issues.apache.org/jira/browse/FLINK-12425
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Jiangjie Qin


This ticket implements the following:
 # An RPC between client and dispatcher to release result partitions.
 # An RPC from dispatcher to RM to release result partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12424) Supports dag (multiple-sinks query) optimization

2019-05-06 Thread godfrey he (JIRA)
godfrey he created FLINK-12424:
--

 Summary: Supports dag (multiple-sinks query) optimization
 Key: FLINK-12424
 URL: https://issues.apache.org/jira/browse/FLINK-12424
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he
 Attachments: image-2019-05-07-13-33-02-793.png

Currently, Flink planner will optimize the plan in {{writeToSink}} method. If 
there are more than one sink in a query, each sink-tree will be optimized 
independent and the result execution plans are also completely independent. 
Actually, there is a high probability of duplicate computing for a 
multiple-sinks query. This issue aims to resolve the above problem. The basic 
idea of the solution is as following: 
1. lazy optimization: does not optimize the plan in {{writeToSink}} method, 
just puts the plan into a collection.
2. whole plan optimization and execution: a new {{execute}} method is added in 
{{TableEnvironment}}, this method will trigger whole plan optimization and 
execute the job.

The basic idea of dag (multiple-sinks query) optimization:
1. decompose the dag into different block, the leaf block is the common sub-plan
2. optimize each block from leaf block to root block, each block only needs to 
be optimized once
e.g. 

 !image-2019-05-07-13-33-02-793.png! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8339: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog

2019-05-06 Thread GitBox
xuefuz commented on a change in pull request #8339: [FLINK-12240][hive] Support 
view related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8339#discussion_r281461138
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
 ##
 @@ -44,82 +44,96 @@ public static void init() throws IOException {
// =
 
// TODO: re-enable these tests once HiveCatalog support table operations
-   @Test
 
 Review comment:
   I'm wondering what's the point of removing test annotation as those tests 
are empty anyway.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8339: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog

2019-05-06 Thread GitBox
xuefuz commented on a change in pull request #8339: [FLINK-12240][hive] Support 
view related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8339#discussion_r281461138
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
 ##
 @@ -44,82 +44,96 @@ public static void init() throws IOException {
// =
 
// TODO: re-enable these tests once HiveCatalog support table operations
-   @Test
 
 Review comment:
   I'm wondering what's the point of removing test annotation as those tests 
are empty anyway.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-06 Thread GitBox
zhijiangW commented on a change in pull request #8290: [FLINK-12070] [network] 
Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#discussion_r281460791
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
 ##
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import 
org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionMemory.Writer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of the ResultSubpartition for a bounded result transferred
+ * in a blocking manner: The result is first produced, then consumed.
+ * The result can be consumed possibly multiple times.
+ *
+ * The implementation creates a temporary memory mapped file and writes all 
buffers to that
+ * memory and serves the result from that memory. The kernel backs the mapped 
memory region
+ * with physical memory and file space incrementally as new pages are filled.
+ */
+class BoundedBlockingSubpartition extends ResultSubpartition {
+
+   /** This lock guards the creation of readers and disposal of the memory 
mapped file. */
+   private final Object lock = new Object();
+
+   /** The current buffer, may be filled further over time. */
+   @Nullable
+   private BufferConsumer currentBuffer;
+
+   /** The memory that we store the data in, via a memory mapped file. */
+   private final MemoryMappedBuffers memory;
+
+   /** All created and not yet released readers. */
+   @GuardedBy("lock")
+   private final Set readers;
+
+   /** Counter for the number of data buffers (not events!) written. */
+   private int numDataBuffersWritten;
+
+   /** The counter for the number of data buffers and events. */
+   private int numBuffersAndEventsWritten;
+
+   /** Flag indicating whether the writing has finished and this is now 
available for read. */
+   private boolean isFinished;
+
+   /** Flag indicating whether the subpartition has been released. */
+   private boolean isReleased;
+
+   /**
+* Common constructor.
+*/
+   public BoundedBlockingSubpartition(
+   int index,
+   ResultPartition parent,
+   Path filePath) throws IOException {
+   this(index, parent, filePath, Integer.MAX_VALUE);
+   }
+
+   /**
+* Constructor for testing. By default regions are rolled over at 2GB 
(max size of direct buffers
+* in Java). This constructor allows tests to pass in a smaller 
threshold to test rolling over
+* without having to actually produce more than 2GB during testing.
+*/
+   @VisibleForTesting
+   BoundedBlockingSubpartition(
+   int index,
+   ResultPartition parent,
+   Path filePath,
+   int maxMMapRegionSize) throws IOException {
+
+   super(index, parent);
+
+   final FileChannel fc = 

[GitHub] [flink] xuefuz commented on a change in pull request #8339: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog

2019-05-06 Thread GitBox
xuefuz commented on a change in pull request #8339: [FLINK-12240][hive] Support 
view related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8339#discussion_r281460262
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -148,14 +161,24 @@ public static CatalogBaseTable createCatalogTable(Table 
hiveTable) {
// Partition keys
List partitionKeys = new ArrayList<>();
 
-   if (hiveTable.getPartitionKeys() != null && 
hiveTable.getPartitionKeys().isEmpty()) {
+   if (hiveTable.getPartitionKeys().isEmpty()) {
partitionKeys = hiveTable.getPartitionKeys().stream()
 
 Review comment:
   if hive table partition keys are empty, we we need to iterate thru it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8339: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog

2019-05-06 Thread GitBox
xuefuz commented on a change in pull request #8339: [FLINK-12240][hive] Support 
view related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8339#discussion_r281459369
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -118,18 +123,26 @@ public static Table createHiveTable(ObjectPath 
tablePath, CatalogBaseTable table
hiveTable.setPartitionKeys(new ArrayList<>());
}
 
-   hiveTable.setSd(sd);
 
 Review comment:
   I'm wondering why we remove this line. If this is not needed, then we should 
probably remove everything about Sd.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-06 Thread GitBox
zhijiangW commented on a change in pull request #8290: [FLINK-12070] [network] 
Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#discussion_r281459388
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
 ##
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import 
org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionMemory.Writer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of the ResultSubpartition for a bounded result transferred
+ * in a blocking manner: The result is first produced, then consumed.
+ * The result can be consumed possibly multiple times.
+ *
+ * The implementation creates a temporary memory mapped file and writes all 
buffers to that
+ * memory and serves the result from that memory. The kernel backs the mapped 
memory region
+ * with physical memory and file space incrementally as new pages are filled.
+ */
+class BoundedBlockingSubpartition extends ResultSubpartition {
+
+   /** This lock guards the creation of readers and disposal of the memory 
mapped file. */
+   private final Object lock = new Object();
+
+   /** The current buffer, may be filled further over time. */
+   @Nullable
+   private BufferConsumer currentBuffer;
+
+   /** The memory that we store the data in, via a memory mapped file. */
+   private final MemoryMappedBuffers memory;
+
+   /** All created and not yet released readers. */
+   @GuardedBy("lock")
+   private final Set readers;
+
+   /** Counter for the number of data buffers (not events!) written. */
+   private int numDataBuffersWritten;
+
+   /** The counter for the number of data buffers and events. */
+   private int numBuffersAndEventsWritten;
+
+   /** Flag indicating whether the writing has finished and this is now 
available for read. */
+   private boolean isFinished;
+
+   /** Flag indicating whether the subpartition has been released. */
+   private boolean isReleased;
+
+   /**
+* Common constructor.
+*/
+   public BoundedBlockingSubpartition(
+   int index,
+   ResultPartition parent,
+   Path filePath) throws IOException {
+   this(index, parent, filePath, Integer.MAX_VALUE);
+   }
+
+   /**
+* Constructor for testing. By default regions are rolled over at 2GB 
(max size of direct buffers
+* in Java). This constructor allows tests to pass in a smaller 
threshold to test rolling over
+* without having to actually produce more than 2GB during testing.
+*/
+   @VisibleForTesting
+   BoundedBlockingSubpartition(
+   int index,
+   ResultPartition parent,
+   Path filePath,
+   int maxMMapRegionSize) throws IOException {
+
+   super(index, parent);
+
+   final FileChannel fc = 

[GitHub] [flink] xuefuz commented on a change in pull request #8339: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog

2019-05-06 Thread GitBox
xuefuz commented on a change in pull request #8339: [FLINK-12240][hive] Support 
view related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8339#discussion_r281459369
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -118,18 +123,26 @@ public static Table createHiveTable(ObjectPath 
tablePath, CatalogBaseTable table
hiveTable.setPartitionKeys(new ArrayList<>());
}
 
-   hiveTable.setSd(sd);
 
 Review comment:
   I'm wondering why we remove this line. If this is not needed, then we should 
probably remove everything about Sd.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #8317: [FLINK-12371] [table-planner-blink] Add support for converting (NOT) IN/ (NOT) EXISTS to SemiJoin, and generating optimized logi

2019-05-06 Thread GitBox
danny0405 commented on a change in pull request #8317: [FLINK-12371] 
[table-planner-blink] Add support for converting (NOT) IN/ (NOT) EXISTS to 
SemiJoin, and generating optimized logical plan
URL: https://github.com/apache/flink/pull/8317#discussion_r281458212
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/SemiJoin.java
 ##
 @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.rel.core;
+
+import org.apache.flink.table.plan.util.FlinkRelMdUtil;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMdUtil;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT
+ * TO SUPPORT ANTI-JOIN AND NON-EQUI JOIN CONDITION.
+ */
+
+/**
+ * Relational expression that joins two relational expressions according to 
some
+ * condition, but outputs only columns from the left input, and eliminates
+ * duplicates.
+ *
+ * The effect is something like the SQL {@code IN} operator.
+ */
+public class SemiJoin extends Join {
 
 Review comment:
   No, `SemiJoin` would be deprecated in Calcite 1.20.0, we should use `Join` 
with `JoinRelType#SEMI` instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-06 Thread GitBox
zhijiangW commented on a change in pull request #8290: [FLINK-12070] [network] 
Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#discussion_r281457936
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
 ##
 @@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import 
org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionMemory.Writer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of the ResultSubpartition for a bounded result transferred
+ * in a blocking manner: The result is first produced, then consumed.
+ * The result can be consumed possibly multiple times.
+ *
+ * The implementation creates a temporary memory mapped file and writes all 
buffers to that
+ * memory and serves the result from that memory. The kernel backs the mapped 
memory region
+ * with physical memory and file space incrementally as new pages are filled.
+ */
+class BoundedBlockingSubpartition extends ResultSubpartition {
+
+   /** This lock guards the creation of readers and disposal of the memory 
mapped file. */
+   private final Object lock = new Object();
+
+   /** The current buffer, may be filled further over time. */
+   @Nullable
+   private BufferConsumer currentBuffer;
+
+   /** The memory that we store the data in, via a memory mapped file. */
+   private final MemoryMappedBuffers memory;
+
+   /** All created and not yet released readers. */
+   @GuardedBy("lock")
+   private final Set readers;
+
+   /** Counter for the number of data buffers (not events!) written. */
+   private int numDataBuffersWritten;
+
+   /** The counter for the number of data buffers and events. */
+   private int numBuffersAndEventsWritten;
+
+   /** Flag indicating whether the writing has finished and this is now 
available for read. */
+   private boolean isFinished;
+
+   /** Flag indicating whether the subpartition has been released. */
+   private boolean isReleased;
+
+   /**
+* Common constructor.
+*/
+   public BoundedBlockingSubpartition(
+   int index,
+   ResultPartition parent,
+   Path filePath) throws IOException {
+   this(index, parent, filePath, Integer.MAX_VALUE);
+   }
+
+   /**
+* Constructor for testing. By default regions are rolled over at 2GB 
(max size of direct buffers
+* in Java). This constructor allows tests to pass in a smaller 
threshold to test rolling over
+* without having to actually produce more than 2GB during testing.
+*/
+   @VisibleForTesting
+   BoundedBlockingSubpartition(
+   int index,
+   ResultPartition parent,
+   Path filePath,
+   int maxMMapRegionSize) throws IOException {
+
+   super(index, parent);
+
+   final FileChannel fc = 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-06 Thread GitBox
eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] 
[runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r281453729
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for streaming job which will schedule 
all tasks at the same time.
+ */
+public class EagerSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(false);
+
+   public EagerSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   }
+
+   @Override
+   public void startScheduling() {
+   final Set allVertices = 
getAllVerticesFromTopology();
+   allocateSlotsAndDeploy(allVertices);
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   allocateSlotsAndDeploy(verticesNeedingRestart);
+   }
+
+   @Override
+   public void onExecutionStateChange(ExecutionVertexID executionVertexId, 
ExecutionState executionState) {
+   // Will not react to these notifications.
+   }
+
+   @Override
+   public void onPartitionConsumable(ExecutionVertexID executionVertexId, 
ResultPartitionID resultPartitionId) {
+   // Will not react to these notifications.
+   }
+
+   private void allocateSlotsAndDeploy(final Set 
verticesNeedingStarted) {
+   final List 
executionVertexDeploymentOptions =
+   
createExecutionVertexDeploymentOptions(verticesNeedingStarted);
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   private Set getAllVerticesFromTopology() {
+   return StreamSupport
+   
.stream(schedulingTopology.getVertices().spliterator(), false)
+   .map(SchedulingExecutionVertex::getId)
+   .collect(Collectors.toSet());
+   }
+
+   private List 
createExecutionVertexDeploymentOptions(
+   final Iterable 
verticesNeedingStarted) {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingStarted) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   }
+   return executionVertexDeploymentOptions;
+   }
+
+   /**
+* The factory for creating {@link EagerSchedulingStrategy}.
+*/
+   public static class Factory implements SchedulingStrategyFactory {
+
+   @Override
+   public SchedulingStrategy getInstance(
 
 Review comment:
   Shall we use `create` in the interface, which is used by most of the 
`Factory` of the project?


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-06 Thread GitBox
eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] 
[runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r281454713
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Id identifying {@link ExecutionVertex}.
+ */
+public class ExecutionVertexID {
+   private final JobVertexID jobVertexId;
+
+   private final int subtaskIndex;
+
+   public ExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex) {
+   this.jobVertexId = checkNotNull(jobVertexId);
+   this.subtaskIndex = subtaskIndex;
+   checkArgument(subtaskIndex >= 0, "subtaskIndex must be greater 
than or equal to 0");
+   }
+
+   public JobVertexID getJobVertexId() {
+   return jobVertexId;
+   }
+
+   public int getSubtaskIndex() {
+   return subtaskIndex;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+
+   ExecutionVertexID that = (ExecutionVertexID) o;
+
+   return subtaskIndex == that.subtaskIndex && 
jobVertexId.equals(that.jobVertexId);
+   }
+
+   @Override
+   public int hashCode() {
+   int result = jobVertexId.hashCode();
+   result = 31 * result + subtaskIndex;
+   return result;
+   }
+
+   @Override
+   public String toString() {
+   return jobVertexId + "_" + subtaskIndex;
 
 Review comment:
   How about using `return String.format("%s (%d)", jobVertexId, subtaskIndex);`
   Since `ExecutionVertex#taskNameWithSubtask` is like `myTask (2/7)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-06 Thread GitBox
eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] 
[runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r281455320
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperations.java
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A Simple scheduler operation for testing purposes.
+ */
+public class TestingSchedulerOperations implements SchedulerOperations {
+
+   private final List> 
scheduledVertices = new ArrayList<>();
+
+   @Override
+   public void 
allocateSlotsAndDeploy(Collection 
executionVertexDeploymentOptions) {
+   scheduledVertices.add(executionVertexDeploymentOptions);
+   }
+
+   public List> 
getScheduledVertices() {
 
 Review comment:
   access can be package-private 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-06 Thread GitBox
eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] 
[runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r281455385
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A simple scheduling execution vertex for testing purposes.
+ */
+public class TestingSchedulingExecutionVertex implements 
SchedulingExecutionVertex {
+
+   private final ExecutionVertexID executionVertexId;
+
+   public TestingSchedulingExecutionVertex(JobVertexID jobVertexId, int 
subtaskIndex) {
 
 Review comment:
   access can be package-private


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-06 Thread GitBox
eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] 
[runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r281453965
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for streaming job which will schedule 
all tasks at the same time.
+ */
+public class EagerSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(false);
+
+   public EagerSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   }
+
+   @Override
+   public void startScheduling() {
+   final Set allVertices = 
getAllVerticesFromTopology();
+   allocateSlotsAndDeploy(allVertices);
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   allocateSlotsAndDeploy(verticesNeedingRestart);
+   }
+
+   @Override
+   public void onExecutionStateChange(ExecutionVertexID executionVertexId, 
ExecutionState executionState) {
+   // Will not react to these notifications.
+   }
+
+   @Override
+   public void onPartitionConsumable(ExecutionVertexID executionVertexId, 
ResultPartitionID resultPartitionId) {
+   // Will not react to these notifications.
+   }
+
+   private void allocateSlotsAndDeploy(final Set 
verticesNeedingStarted) {
+   final List 
executionVertexDeploymentOptions =
+   
createExecutionVertexDeploymentOptions(verticesNeedingStarted);
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   private Set getAllVerticesFromTopology() {
+   return StreamSupport
+   
.stream(schedulingTopology.getVertices().spliterator(), false)
+   .map(SchedulingExecutionVertex::getId)
+   .collect(Collectors.toSet());
+   }
+
+   private List 
createExecutionVertexDeploymentOptions(
+   final Iterable 
verticesNeedingStarted) {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingStarted) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   }
+   return executionVertexDeploymentOptions;
+   }
+
+   /**
+* The factory for creating {@link EagerSchedulingStrategy}.
+*/
+   public static class Factory implements SchedulingStrategyFactory {
+
+   @Override
+   public SchedulingStrategy getInstance(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+ 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-06 Thread GitBox
eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] 
[runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r281455470
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A simple scheduling topology for testing purposes.
+ */
+public class TestingSchedulingTopology implements SchedulingTopology {
+
+   private final Map 
schedulingExecutionVertices = new HashMap<>();
+
+   private final Map schedulingResultPartitions = new HashMap<>();
+
+   @Override
+   public Iterable getVertices() {
+   return 
Collections.unmodifiableCollection(schedulingExecutionVertices.values());
+   }
+
+   @Override
+   public Optional getVertex(ExecutionVertexID 
executionVertexId)  {
+   return 
Optional.ofNullable(schedulingExecutionVertices.get(executionVertexId));
+   }
+
+   @Override
+   public Optional getResultPartition(
+   IntermediateResultPartitionID 
intermediateResultPartitionId) {
+   return 
Optional.ofNullable(schedulingResultPartitions.get(intermediateResultPartitionId));
+   }
+
+   public void addSchedulingExecutionVertex(SchedulingExecutionVertex 
schedulingExecutionVertex) {
+   
schedulingExecutionVertices.put(schedulingExecutionVertex.getId(), 
schedulingExecutionVertex);
+   
addSchedulingResultPartitions(schedulingExecutionVertex.getConsumedResultPartitions());
+   
addSchedulingResultPartitions(schedulingExecutionVertex.getProducedResultPartitions());
+   }
+
+   public void addSchedulingResultPartitions(final 
Collection resultPartitions) {
 
 Review comment:
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-06 Thread GitBox
eaglewatcherwb commented on a change in pull request #8296: [FLINK-12228] 
[runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r281455438
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A simple scheduling topology for testing purposes.
+ */
+public class TestingSchedulingTopology implements SchedulingTopology {
+
+   private final Map 
schedulingExecutionVertices = new HashMap<>();
+
+   private final Map schedulingResultPartitions = new HashMap<>();
+
+   @Override
+   public Iterable getVertices() {
+   return 
Collections.unmodifiableCollection(schedulingExecutionVertices.values());
+   }
+
+   @Override
+   public Optional getVertex(ExecutionVertexID 
executionVertexId)  {
+   return 
Optional.ofNullable(schedulingExecutionVertices.get(executionVertexId));
+   }
+
+   @Override
+   public Optional getResultPartition(
+   IntermediateResultPartitionID 
intermediateResultPartitionId) {
+   return 
Optional.ofNullable(schedulingResultPartitions.get(intermediateResultPartitionId));
+   }
+
+   public void addSchedulingExecutionVertex(SchedulingExecutionVertex 
schedulingExecutionVertex) {
 
 Review comment:
   access can be package-private


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-06 Thread GitBox
zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-489901309
 
 
   @tillrohrmann thanks for the further suggestions! I agree with your overall 
ideas.
   
   ->What kind of environment/hardware issues do you have in mind that could 
cause repeated failures of reading the result data? 
   
   I mean the disk/network hardware problems on producer side which could not 
be restored in short time. So it is better to restart the producer in another 
machine. We ever encountered this corner case in production.
   
   -> Did I understand you correctly, that the `PartitionNotFoundException` is 
good enough and, thus, we don't need to introduce a new exception?
   
   The information in current `PartitionNotFoundException` is enough for 
`JobMaster` restarting the producer, but it can not cover all the cases. So I 
would like to list all the possible cases to confirm with you firstly:
   
   - a. Tcp connection fail: it might be caused by producer TM lost or network 
hardware issue. We might introduce `DataConnectionException` for this.
   
   - b. `PartitionNotFound`: `ResultPartition` is released from 
`ResultPartitionManager` which needs to restart producer immediately.
   
   - c. `ResultSupartition#createReaderView`  throw `IOException`: it might be 
caused by disk file corrupt/deleted for `BlockingResultSubpartition`. It could 
also be wrapped into existing `PartitionNotFound`.
   
   - d. `BlockingResultSubpartitionView#getNextBuffer` thrown IOException: the 
reason is the same as above c. `PartitionNotFound` might also be used here.
   
   - e. Network server exception during transferring data: it seems more 
complicated here. The reason might be caused by producer TM lost, or temporary 
network problem or server hardware environment issue, etc. The consumer as 
client might be sensitive via inactive network channel or `ErrorResponse` from 
server. We could introduce `DataTransferException` for covering all these.
   
   The above {b, c, d} might be determined to restart producer immediately and 
the current `PartitionNotFound` could be used for covering them.
   
   For the cases of {a, e}, we might introduce new exceptions to cover them and 
failover strategy might have different rules for considering them.
   
   So I think there might have two options: 
   
   - If we want to cover all {a, b, c, d, e} ATM, it might be necessary to 
define an abstract `DataConsumptionException` as parent of  above 
`PartitionNotFoundException`, `DataConnectionException` and 
`DataTransferException`. 
   
   - Or we only concern on {b, c, d} in the first step ({a, e} might be 
considered if necessary future or in other ways), then the current 
`PartitionNotFound` is enough and no need new exceptions ATM. 
   
   Both two options make sense for me, so I would like to take your final 
opinion or you have other options. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-06 Thread GitBox
zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-489901309
 
 
   @tillrohrmann thanks for the further suggestions! I agree with your overall 
ideas.
   
   ->What kind of environment/hardware issues do you have in mind that could 
cause repeated failures of reading the result data? 
   
   I mean the disk/network hardware problems on producer side which could not 
be restored in short time. So it is better to restart the producer in another 
machine. We ever encountered this corner case in production.
   
   -> Did I understand you correctly, that the `PartitionNotFoundException` is 
good enough and, thus, we don't need to introduce a new exception?
   
   The information in current `PartitionNotFoundException` is enough for 
`JobMaster` restarting the producer, but it can not cover all the cases. So I 
would like to list all the possible cases to confirm with you firstly:
   
   - a. Tcp connection fail: it might be caused by producer TM lost or network 
hardware issue. We might introduce `DataConnectionException` for this.
   
   - b. `PartitionNotFound`: `ResultPartition` is released from 
`ResultPartitionManager` which needs to restart producer immediately.
   
   - c. `ResultSupartition#createReaderView`  throw `IOException`: it might be 
caused by disk file corrupt/deleted for `BlockingResultSubpartition`. It could 
also be wrapped into existing `PartitionNotFound`.
   
   - d. `BlockingResultSubpartitionView#getNextBuffer` thrown IOException: the 
reason is the same as above c. `PartitionNotFound` might also be used here.
   
   - e. Network server exception during transferring data: it seems more 
complicated here. The reason might be caused by producer TM lost, or temporary 
network problem or server hardware environment issue, etc. The consumer as 
client might be sensitive via inactive network channel or `ErrorResponse` from 
server. We could introduce `DataTransferException` for covering all these.
   
   The above {b, c, d} might be determined to restart producer immediately and 
the current `PartitionNotFound` could be used for covering them.
   
   For the cases of a and e, we might introduce new exceptions to cover them 
and failover strategy might have different rules for considering them.
   
   So I think there are two options for considering: 
   
   - If we want to cover all {a, b, c, d, e} ATM, it might be necessary to 
define an abstract `DataConsumptionException` as parent of  above 
`PartitionNotFoundException`, `DataConnectionException` and 
`DataTransferException`. 
   
   - Or we only concern on {b, c, d} in the first step (a and e might be done 
future or in other ways), then the current `PartitionNotFound` is enough and no 
need new exceptions ATM. 
   
   Both two options make sense for me, so I would like to take your final 
opinion or you have other options. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-06 Thread GitBox
zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-489901309
 
 
   @tillrohrmann thanks for the further suggestions! I agree with your overall 
ideas.
   
   ->What kind of environment/hardware issues do you have in mind that could 
cause repeated failures of reading the result data? 
   
   I mean the disk/network hardware problems on producer side which could not 
be restored in short time. So it is better to restart the producer in another 
machine. We ever encountered this corner case in production.
   
   -> Did I understand you correctly, that the `PartitionNotFoundException` is 
good enough and, thus, we don't need to introduce a new exception?
   
   The information in current `PartitionNotFoundException` is enough for 
`JobMaster` restarting the producer, but it can not cover all the cases. So I 
would like to list all the possible cases to confirm with you firstly:
   
   - a. Tcp connection fail: it might be caused by producer TM lost or network 
hardware issue. We might introduce `DataConnectionException` for this.
   
   - b. `PartitionNotFound`: `ResultPartition` is released from 
`ResultPartitionManager` which needs to restart producer immediately.
   
   - c. `ResultSupartition#createReaderView`  throw `IOException: it might be 
caused by disk file corrupt/deleted for `BlockingResultSubpartition`. It could 
also be wrapped into existing `PartitionNotFound`.
   
   - d. `BlockingResultSubpartitionView#getNextBuffer` thrown IOException: the 
reason is the same as above c. `PartitionNotFound` might also be used here.
   
   - e. Network server exception during transferring data: it seems more 
complicated here. The reason might be caused by producer TM lost, or temporary 
network problem or server hardware environment issue, etc. The consumer as 
client might be sensitive via inactive network channel or `ErrorResponse` from 
server. We could introduce `DataTransferException` for covering all these.
   
   The above {b, c, d} might be determined to restart producer immediately and 
the current `PartitionNotFound` could be used for covering them.
   
   For the cases of a and e, we might introduce new exceptions to cover them 
and failover strategy might have different rules for considering them.
   
   So I think there are two options for considering: 
   
   - If we want to cover all {a, b, c, d, e} ATM, it might be necessary to 
define an abstract `DataConsumptionException` as parent of  above 
`PartitionNotFoundException`, `DataConnectionException` and 
`DataTransferException`. 
   
   - Or we only concern on {b, c, d} in the first step (a and e might be done 
future or in other ways), then the current `PartitionNotFound` is enough and no 
need new exceptions ATM. 
   
   Both two options make sense for me, so I would like to take your final 
opinion or you have other options. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-06 Thread GitBox
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-489901309
 
 
   @tillrohrmann thanks for the further suggestions! I agree with your overall 
ideas.
   
   ->What kind of environment/hardware issues do you have in mind that could 
cause repeated failures of reading the result data? 
   
   I mean the disk/network hardware problems on producer side which could not 
be restored in short time. So it is better to restart the producer in another 
machine. We ever encountered this corner case in production.
   
   -> Did I understand you correctly, that the PartitionNotFoundException is 
good enough and, thus, we don't need to introduce a new exception?
   
   The information in current `PartitionNotFoundException` is enough for 
`JobMaster` restarting the producer, but it can not cover all the cases. So I 
would like to list all the possible cases to confirm with you firstly:
   
   - a. Tcp connection fail: it might be caused by producer TM lost or network 
hardware issue. We might introduce `DataConnectionException` for this.
   
   - b. PartitionNotFound: `ResultPartition` is released from 
`ResultPartitionManager` which needs to restart producer immediately.
   
   - c. `ResultSupartition#createReaderView`  throw `IOException: it might be 
caused by disk file corrupt/deleted for `BlockingResultSubpartition`. It could 
also be wrapped into existing `PartitionNotFound`.
   
   - d. `BlockingResultSubpartitionView#getNextBuffer` thrown IOException: the 
reason is the same as above c. `PartitionNotFound` might also be used here.
   
   - e. Network server exception during transferring data: it seems more 
complicated here. The reason might be caused by producer TM lost, or temporary 
network problem or server hardware environment issue, etc. The consumer as 
client might be sensitive via inactive network channel or `ErrorResponse` from 
server. We could introduce `DataTransferException` for covering all these.
   
   The above {b, c, d} might be determined to restart producer immediately and 
the current `PartitionNotFound` could be used for covering them.
   
   For the cases of a and e, we might introduce new exceptions to cover them 
and failover strategy might have different rules for considering them.
   
   So I think there are two options for considering: 
   
   - If we want to cover all {a, b, c, d, e} ATM, it might be necessary to 
define an abstract `DataConsumptionException` as parent of  above 
`PartitionNotFoundException`, `DataConnectionException` and 
`DataTransferException`. 
   
   - Or we only concern on {b, c, d} in the first step (a and e might be done 
future or in other ways), then the current `PartitionNotFound` is enough and no 
need new exceptions ATM. 
   
   Both two options make sense for me, so I would like to take your final 
opinion or you have other options. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12330) Add integrated Tox for ensuring compatibility with the python2/3 version

2019-05-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12330:
---
Labels: pull-request-available  (was: )

> Add integrated Tox for ensuring compatibility with the python2/3 version
> 
>
> Key: FLINK-12330
> URL: https://issues.apache.org/jira/browse/FLINK-12330
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available
>
> Add integrated Tox for ensuring compatibility with the python2/3 version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 opened a new pull request #8355: [FLINK-12330][python]Add integrated Tox for ensuring compatibility of multi-version of python.

2019-05-06 Thread GitBox
sunjincheng121 opened a new pull request #8355: [FLINK-12330][python]Add 
integrated Tox for ensuring compatibility of multi-version of python.
URL: https://github.com/apache/flink/pull/8355
 
 
   
   ## What is the purpose of the change
   Add integrated Tox for ensuring compatibility of multi-version of python.
   
   ## Brief change log
   
   *(for example:)*
 - Add lint-python.sh for integrated Conda and Tox.
 - Add the usage of how to run the lint-python.sh in README.md.
   
   ## Verifying this change
   This change without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8355: [FLINK-12330][python]Add integrated Tox for ensuring compatibility of multi-version of python.

2019-05-06 Thread GitBox
flinkbot commented on issue #8355: [FLINK-12330][python]Add integrated Tox for 
ensuring compatibility of multi-version of python.
URL: https://github.com/apache/flink/pull/8355#issuecomment-489901389
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8317: [FLINK-12371] [table-planner-blink] Add support for converting (NOT) IN/ (NOT) EXISTS to SemiJoin, and generating optimized logi

2019-05-06 Thread GitBox
KurtYoung commented on a change in pull request #8317: [FLINK-12371] 
[table-planner-blink] Add support for converting (NOT) IN/ (NOT) EXISTS to 
SemiJoin, and generating optimized logical plan
URL: https://github.com/apache/flink/pull/8317#discussion_r281452013
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/SemiJoin.java
 ##
 @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.rel.core;
+
+import org.apache.flink.table.plan.util.FlinkRelMdUtil;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMdUtil;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT
+ * TO SUPPORT ANTI-JOIN AND NON-EQUI JOIN CONDITION.
+ */
+
+/**
+ * Relational expression that joins two relational expressions according to 
some
+ * condition, but outputs only columns from the left input, and eliminates
+ * duplicates.
+ *
+ * The effect is something like the SQL {@code IN} operator.
+ */
+public class SemiJoin extends Join {
 
 Review comment:
   cc @danny0405 , does this consistent with calcite's latest changes?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-05-06 Thread GitBox
KurtYoung commented on a change in pull request #8294: 
[FLINK-12348][table-planner-blink]Use TableConfig in api module to replace 
TableConfig in blink-planner module.
URL: https://github.com/apache/flink/pull/8294#discussion_r281447680
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigImpl.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.table.calcite.CalciteConfig;
+import org.apache.flink.table.calcite.CalciteConfig$;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * The {@link PlannerConfigImpl} holds parameters to configure the behavior of 
Blink planner.
+ */
+public class PlannerConfigImpl implements PlannerConfig {
+
+   /**
+* Defines the configuration of Calcite for Blink planner.
+*/
+   private CalciteConfig calciteConfig = CalciteConfig$.MODULE$.DEFAULT();
+
+   /**
+* Defines planner defined configuration for Blink planner.
+*/
+   private Configuration conf = GlobalConfiguration.loadConfiguration();
+
+   /**
+* Returns planner defined configuration for Blink planner.
+*/
+   public Configuration getConf() {
+   return conf;
+   }
+
+   /**
+* Sets planner defined configuration for Blink planner.
+*/
+   public void setConf(Configuration conf) {
+   this.conf = GlobalConfiguration.loadConfiguration();
+   this.conf.addAll(conf);
+   }
+
+   /**
+* Returns the configuration of Calcite for Blink planner.
+*/
+   public CalciteConfig getCalciteConfig() {
+   return calciteConfig;
+   }
+
+   public void setCalciteConfig(CalciteConfig calciteConfig) {
+   this.calciteConfig = Preconditions.checkNotNull(calciteConfig);
+   }
+
+   public static PlannerConfigImpl getDefault() {
+   return new PlannerConfigImpl();
 
 Review comment:
   I'm not sure if this is useful, and not sure what's the purpose of this API. 
Do you want to letting everyone reuse the same default instance? Or it's just 
another way to construct this class? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-05-06 Thread GitBox
KurtYoung commented on a change in pull request #8294: 
[FLINK-12348][table-planner-blink]Use TableConfig in api module to replace 
TableConfig in blink-planner module.
URL: https://github.com/apache/flink/pull/8294#discussion_r281446411
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/ConfigurationUtils.java
 ##
 @@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.flink.table.api.OptimizerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_ENFORCER;
+import static 
org.apache.flink.table.api.TableConfigOptions.SQL_EXEC_STATE_TTL_MAX_MS;
+import static 
org.apache.flink.table.api.TableConfigOptions.SQL_EXEC_STATE_TTL_MS;
+
+/**
+ * Utility class for {@link Configuration} related helper functions.
+ */
+public class ConfigurationUtils {
+
+   /**
+* Updates configuration with a specified minimum and a maximum time 
interval for how long idle state, i.e., state
+* which was not updated, will be retained.
+* State will never be cleared until it was idle for less than the 
minimum time and will never
+* be kept if it was idle for more than the maximum time.
+* When new data arrives for previously cleaned-up state, the new data 
will be handled as if it
+* was the first data. This can result in previous results being 
overwritten.
+* Set to 0 (zero) to never clean-up the state.
+*
+* @param minTime The minimum time interval for which idle state is 
retained. Set to 0 (zero) to
+* never clean-up the state.
+* @param maxTime The maximum time interval for which idle state is 
retained. May not be smaller
+* than than minTime. Set to 0 (zero) to never clean-up the state.
+* @param configuration configuration to update.
+*/
+   public static Configuration withIdleStateRetentionTime(Configuration 
configuration, Time minTime, Time maxTime) {
+   configuration.setLong(SQL_EXEC_STATE_TTL_MS, 
minTime.toMilliseconds());
+   configuration.setLong(SQL_EXEC_STATE_TTL_MAX_MS, 
maxTime.toMilliseconds());
+   return configuration;
+   }
+
+   /**
+* Gets the maximum time until state which was not updated will be 
retained.
+*
+* @param configuration the configuration object
+* @return the maximum time until state which was not updated will be 
retained.
+*/
+   public static Long getMinIdleStateRetentionTime(Configuration 
configuration) {
+   return configuration.getLong(SQL_EXEC_STATE_TTL_MS);
+   }
+
+   /**
+* Returns the maximum time until state which was not updated will be 
retained.
+*
+* @param configuration the configuration object
+* @return the maximum time until state which was not updated will be 
retained.
+*/
+   public static Long getMaxIdleStateRetentionTime(Configuration 
configuration) {
+   if (configuration.contains(SQL_EXEC_STATE_TTL_MS) && 
!configuration.contains(SQL_EXEC_STATE_TTL_MAX_MS)) {
+   configuration.setLong(SQL_EXEC_STATE_TTL_MAX_MS, 
getMinIdleStateRetentionTime(configuration) * 2);
+   }
+   return configuration.getLong(SQL_EXEC_STATE_TTL_MAX_MS);
+   }
+
+   /**
+* Returns whether a operator is disabled.
+*
+* @param configuration the configuration object
+* @return return true if the operator is disabled, else false.
+*/
+   public static boolean isOperatorDisabled(Configuration configuration, 
DisabledOperatorType operatorType) {
+   String disabledOperators = 
configuration.getString(TableConfigOptions.SQL_EXEC_DISABLED_OPERATORS);
+   String[] splittedDisabledOperators = 
disabledOperators.split(",");
+   Set disabledOperatorSet = new HashSet<>();
+   for (String splittedDisabledOperator : 
splittedDisabledOperators) 

[GitHub] [flink] KurtYoung commented on a change in pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-05-06 Thread GitBox
KurtYoung commented on a change in pull request #8294: 
[FLINK-12348][table-planner-blink]Use TableConfig in api module to replace 
TableConfig in blink-planner module.
URL: https://github.com/apache/flink/pull/8294#discussion_r281447839
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ##
 @@ -96,8 +97,9 @@ class BatchTableEnvironment(
   private def mergeParameters(): Unit = {
 if (streamEnv != null && streamEnv.getConfig != null) {
   val parameters = new Configuration()
-  if (config != null && config.getConf != null) {
-parameters.addAll(config.getConf)
+  val parametersFromTableConfig = infer(config).getConf
 
 Review comment:
   It looks unclear if you import the static method and directly use `infer` 
here. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-05-06 Thread GitBox
KurtYoung commented on a change in pull request #8294: 
[FLINK-12348][table-planner-blink]Use TableConfig in api module to replace 
TableConfig in blink-planner module.
URL: https://github.com/apache/flink/pull/8294#discussion_r281447320
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigImpl.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.table.calcite.CalciteConfig;
+import org.apache.flink.table.calcite.CalciteConfig$;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * The {@link PlannerConfigImpl} holds parameters to configure the behavior of 
Blink planner.
+ */
+public class PlannerConfigImpl implements PlannerConfig {
+
+   /**
+* Defines the configuration of Calcite for Blink planner.
+*/
+   private CalciteConfig calciteConfig = CalciteConfig$.MODULE$.DEFAULT();
+
+   /**
+* Defines planner defined configuration for Blink planner.
+*/
+   private Configuration conf = GlobalConfiguration.loadConfiguration();
+
+   /**
+* Returns planner defined configuration for Blink planner.
+*/
+   public Configuration getConf() {
+   return conf;
+   }
+
+   /**
+* Sets planner defined configuration for Blink planner.
+*/
+   public void setConf(Configuration conf) {
+   this.conf = GlobalConfiguration.loadConfiguration();
+   this.conf.addAll(conf);
+   }
+
+   /**
+* Returns the configuration of Calcite for Blink planner.
+*/
+   public CalciteConfig getCalciteConfig() {
+   return calciteConfig;
+   }
+
+   public void setCalciteConfig(CalciteConfig calciteConfig) {
+   this.calciteConfig = Preconditions.checkNotNull(calciteConfig);
+   }
+
+   public static PlannerConfigImpl getDefault() {
+   return new PlannerConfigImpl();
+   }
+
+   /**
+* Infers {@link PlannerConfigImpl} from {@link TableConfig}.
+*/
+   public static PlannerConfigImpl infer(TableConfig tableConfig) {
 
 Review comment:
   infer -> from?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-05-06 Thread GitBox
KurtYoung commented on a change in pull request #8294: 
[FLINK-12348][table-planner-blink]Use TableConfig in api module to replace 
TableConfig in blink-planner module.
URL: https://github.com/apache/flink/pull/8294#discussion_r281450922
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/DeadlockBreakupTest.scala
 ##
 @@ -38,10 +39,9 @@ class DeadlockBreakupTest extends TableTestBase {
 
   @Test
   def testSubplanReuse_SetExchangeAsBatch(): Unit = {
-util.tableEnv.getConfig.getConf.setBoolean(
-  PlannerConfigOptions.SQL_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
-util.tableEnv.getConfig.getConf.setBoolean(
-  PlannerConfigOptions.SQL_OPTIMIZER_REUSE_TABLE_SOURCE_ENABLED, true)
+val conf = infer(util.tableEnv.getConfig).getConf
 
 Review comment:
   Can PlannerConfig directly provide the ability to get/set some single 
configuration? It looks obscure through current API to set some config values. 
You need to know that the configuration object is mutable after you `getConf`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-05-06 Thread GitBox
KurtYoung commented on a change in pull request #8294: 
[FLINK-12348][table-planner-blink]Use TableConfig in api module to replace 
TableConfig in blink-planner module.
URL: https://github.com/apache/flink/pull/8294#discussion_r281447197
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigImpl.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.table.calcite.CalciteConfig;
+import org.apache.flink.table.calcite.CalciteConfig$;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * The {@link PlannerConfigImpl} holds parameters to configure the behavior of 
Blink planner.
+ */
+public class PlannerConfigImpl implements PlannerConfig {
+
+   /**
+* Defines the configuration of Calcite for Blink planner.
+*/
+   private CalciteConfig calciteConfig = CalciteConfig$.MODULE$.DEFAULT();
+
+   /**
+* Defines planner defined configuration for Blink planner.
+*/
+   private Configuration conf = GlobalConfiguration.loadConfiguration();
 
 Review comment:
   why do we load flink-conf.yaml through GlobalConfiguration?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8294: [FLINK-12348][table-planner-blink]Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-05-06 Thread GitBox
KurtYoung commented on a change in pull request #8294: 
[FLINK-12348][table-planner-blink]Use TableConfig in api module to replace 
TableConfig in blink-planner module.
URL: https://github.com/apache/flink/pull/8294#discussion_r281446233
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/ConfigurationUtils.java
 ##
 @@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
 
 Review comment:
   move to other package? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8288: [FLINK-12345] [table-planner-blink] Add support for generating optimized logical plan for stream window aggregate

2019-05-06 Thread GitBox
wuchong commented on a change in pull request #8288: [FLINK-12345] 
[table-planner-blink] Add support for generating optimized logical plan for 
stream window aggregate
URL: https://github.com/apache/flink/pull/8288#discussion_r281450454
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeSqlFunction.java
 ##
 @@ -37,16 +39,24 @@ public ProctimeSqlFunction() {
super(
"PROCTIME",
SqlKind.OTHER_FUNCTION,
-   ReturnTypes.explicit(new ProctimeRelProtoDataType()),
-   null,
-   OperandTypes.NILADIC,
-   SqlFunctionCategory.TIMEDATE);
+   
ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), 
SqlTypeTransforms.TO_NULLABLE),
 
 Review comment:
   `PROCTIME()` is a built-in function used to generate proctime attribute 
field, whose return type must be proctime time indicator.
   
   I think what you want is `ProctimeMaterializeSqlFunction` which materialize 
a proctime field into `Timestamp` field.
   
   So I think we don't need to change this file. We should use 
`FlinkSqlOperatorTable.PROCTIME_MATERIALIZE` in `RelTimeIndicatorConverter`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8288: [FLINK-12345] [table-planner-blink] Add support for generating optimized logical plan for stream window aggregate

2019-05-06 Thread GitBox
wuchong commented on a change in pull request #8288: [FLINK-12345] 
[table-planner-blink] Add support for generating optimized logical plan for 
stream window aggregate
URL: https://github.com/apache/flink/pull/8288#discussion_r281448847
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
 ##
 @@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.physical.stream
+
+import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfigOptions, 
TableException}
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
+import org.apache.flink.table.plan.util.AggregateUtil.{isRowtimeIndicatorType, 
isTimeIntervalType}
+import org.apache.flink.table.plan.util.{RelExplainUtil, WindowEmitStrategy}
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Streaming group window aggregate physical node which will be translate to 
window operator.
+  *
+  * If requirements satisfied, it will be translated into minibatch window 
operator, otherwise,
 
 Review comment:
   The minibatch-window is still in beta in Blink. I think we can support it 
after 1.9 release.
   
   What do you think about removing minibatch-window relative comments and 
codes?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11839) Introduce FlinkLogicalWindowAggregate

2019-05-06 Thread godfrey he (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he closed FLINK-11839.
--
Resolution: Duplicate

> Introduce FlinkLogicalWindowAggregate
> -
>
> Key: FLINK-11839
> URL: https://issues.apache.org/jira/browse/FLINK-11839
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
>
> {{FlinkLogicalWindowAggregate}} depends on {{Expression}} which hasn't been 
> ported into [table-planner-blink] yet.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12416) Docker build script fails on symlink creation ln -s

2019-05-06 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16834344#comment-16834344
 ] 

Yun Tang commented on FLINK-12416:
--

[~slav4ik], would you please share the full command to run the 'build.sh'?

> Docker build script fails on symlink creation ln -s
> ---
>
> Key: FLINK-12416
> URL: https://issues.apache.org/jira/browse/FLINK-12416
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Docker
>Affects Versions: 1.8.0
>Reporter: Slava D
>Priority: Major
>
> When using script 'build.sh' from 'flink-container/docker' it fails on 
> {code:java}
> + ln -s /opt/flink-1.8.0-bin-hadoop28-scala_2.12.tgz /opt/flink
> + ln -s /opt/job.jar /opt/flink/lib
> ln: /opt/flink/lib: Not a directory
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11561) Translate "Flink Architecture" page into Chinese

2019-05-06 Thread Tom Goong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16834343#comment-16834343
 ] 

Tom Goong commented on FLINK-11561:
---

[~jark]  Sorry, I thought that the flink-parent documentation should be put 
together with flink-web. Actually I am wrong

> Translate "Flink Architecture" page into Chinese
> 
>
> Key: FLINK-11561
> URL: https://issues.apache.org/jira/browse/FLINK-11561
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Project Website
>Reporter: Jark Wu
>Assignee: Tom Goong
>Priority: Major
>
> Translate "Flink Architecture" page into Chinese.
> The markdown file is located in: flink-web/flink-architecture.zh.md
> The url link is: https://flink.apache.org/zh/flink-architecture.html
> Please adjust the links in the page to Chinese pages when translating. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wuchong commented on a change in pull request #8288: [FLINK-12345] [table-planner-blink] Add support for generating optimized logical plan for stream window aggregate

2019-05-06 Thread GitBox
wuchong commented on a change in pull request #8288: [FLINK-12345] 
[table-planner-blink] Add support for generating optimized logical plan for 
stream window aggregate
URL: https://github.com/apache/flink/pull/8288#discussion_r281446085
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala
 ##
 @@ -53,6 +53,30 @@ class TableConfig {
 */
   private var maxGeneratedCodeLength: Int = 64000 // just an estimate
 
+  private val DEFAULT_FIRE_INTERVAL = Long.MinValue
+
+  /**
+* The early firing interval in milli second, early fire is the emit 
strategy
+* before watermark advanced to end of window.
+*
+* < 0 means no early fire
+* 0 means no delay (fire on every element).
+* > 0 means the fire interval
+*/
+  private var earlyFireInterval = DEFAULT_FIRE_INTERVAL
 
 Review comment:
   I think we can support it if we expose early interval through options 
in `TableConfig`.
   
   I think it's nice to have it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8288: [FLINK-12345] [table-planner-blink] Add support for generating optimized logical plan for stream window aggregate

2019-05-06 Thread GitBox
KurtYoung commented on a change in pull request #8288: [FLINK-12345] 
[table-planner-blink] Add support for generating optimized logical plan for 
stream window aggregate
URL: https://github.com/apache/flink/pull/8288#discussion_r281444733
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.flink.table.`type`.InternalTypes
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory._
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.plan.nodes.calcite._
+import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.logical._
+import org.apache.calcite.rel.{RelNode, RelShuttle}
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * Traverses a [[RelNode]] tree and converts fields with 
[[TimeIndicatorRelDataType]] type. If a
+  * time attribute is accessed for a calculation, it will be materialized. 
Forwarding is allowed in
+  * some cases, but not all.
+  */
+class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
+
+  private def timestamp(isNullable: Boolean): RelDataType = rexBuilder
+.getTypeFactory
+.asInstanceOf[FlinkTypeFactory]
+.createTypeFromInternalType(InternalTypes.TIMESTAMP, isNullable = 
isNullable)
+
+  val materializerUtils = new RexTimeIndicatorMaterializerUtils(rexBuilder)
+
+  override def visit(intersect: LogicalIntersect): RelNode =
+visitSetOp(intersect)
+
+  override def visit(union: LogicalUnion): RelNode =
+visitSetOp(union)
+
+  override def visit(aggregate: LogicalAggregate): RelNode = 
convertAggregate(aggregate)
+
+  override def visit(minus: LogicalMinus): RelNode =
+visitSetOp(minus)
+
+  override def visit(sort: LogicalSort): RelNode = {
+
 
 Review comment:
   delete blank line 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8288: [FLINK-12345] [table-planner-blink] Add support for generating optimized logical plan for stream window aggregate

2019-05-06 Thread GitBox
KurtYoung commented on a change in pull request #8288: [FLINK-12345] 
[table-planner-blink] Add support for generating optimized logical plan for 
stream window aggregate
URL: https://github.com/apache/flink/pull/8288#discussion_r281444516
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala
 ##
 @@ -53,6 +53,30 @@ class TableConfig {
 */
   private var maxGeneratedCodeLength: Int = 64000 // just an estimate
 
+  private val DEFAULT_FIRE_INTERVAL = Long.MinValue
+
+  /**
+* The early firing interval in milli second, early fire is the emit 
strategy
+* before watermark advanced to end of window.
+*
+* < 0 means no early fire
+* 0 means no delay (fire on every element).
+* > 0 means the fire interval
+*/
+  private var earlyFireInterval = DEFAULT_FIRE_INTERVAL
 
 Review comment:
   cc @wuchong here, do we want to support early fire in 1.9.0?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11561) Translate "Flink Architecture" page into Chinese

2019-05-06 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16834334#comment-16834334
 ] 

Jark Wu commented on FLINK-11561:
-

[~Tom Goong] Yes, it is in flink-web project. What do you mean "merged 
together"?

> Translate "Flink Architecture" page into Chinese
> 
>
> Key: FLINK-11561
> URL: https://issues.apache.org/jira/browse/FLINK-11561
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Project Website
>Reporter: Jark Wu
>Assignee: Tom Goong
>Priority: Major
>
> Translate "Flink Architecture" page into Chinese.
> The markdown file is located in: flink-web/flink-architecture.zh.md
> The url link is: https://flink.apache.org/zh/flink-architecture.html
> Please adjust the links in the page to Chinese pages when translating. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12423) Add Timer metric type for flink-metrics module

2019-05-06 Thread Armstrong Nova (JIRA)
Armstrong Nova created FLINK-12423:
--

 Summary: Add Timer metric type for flink-metrics module
 Key: FLINK-12423
 URL: https://issues.apache.org/jira/browse/FLINK-12423
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: Armstrong Nova
Assignee: Armstrong Nova


Hi guys, 

    Currently, Flink only support 4 registering metrics, {{Counters}}, 
{{Gauges}}, {{Histograms}} and {{Meters}}.  If we want to measure the time cost 
metric, for example P75th, P99th, P999th, right now can only use Histograms 
metric. But Histograms metric not support TimeUnit type (second, millisecond, 
microsecond, nanosecond). So it's not convenient to collaborate with outside 
metric system. 

    The Codahale/DropWizard support Time meter, so we can wrap it in Flink. 
    Thanks for your time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zjffdu commented on a change in pull request #8144: [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode

2019-05-06 Thread GitBox
zjffdu commented on a change in pull request #8144: [FLINK-12159]. Enable 
YarnMiniCluster integration test under non-secure mode
URL: https://github.com/apache/flink/pull/8144#discussion_r281442692
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ##
 @@ -869,41 +859,40 @@ public ApplicationReport startAppMaster(
FsPermission permission = new FsPermission(FsAction.ALL, 
FsAction.NONE, FsAction.NONE);
fs.setPermission(yarnFilesDir, permission); // set permission 
for path.
 
+   Path remoteYarnSiteXmlPath = null;
+   File f = new File(System.getenv("YARN_CONF_DIR"), 
Utils.YARN_SITE_FILE_NAME);
+   LOG.info("Adding Yarn configuration {} to the AM container 
local resource bucket", f.getAbsolutePath());
+   Path yarnSitePath = new Path(f.getAbsolutePath());
+   remoteYarnSiteXmlPath = setupSingleLocalResource(
+   Utils.YARN_SITE_FILE_NAME,
+   fs,
+   appId,
+   yarnSitePath,
+   localResources,
+   homeDir,
+   "");
 
 Review comment:
   That's correct, I have created 
https://issues.apache.org/jira/browse/FLINK-12422 for the followup work. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12422) Remove IN_TESTS for make test code and production code consistent

2019-05-06 Thread Jeff Zhang (JIRA)
Jeff Zhang created FLINK-12422:
--

 Summary: Remove IN_TESTS for make test code and production code 
consistent
 Key: FLINK-12422
 URL: https://issues.apache.org/jira/browse/FLINK-12422
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.8.0
Reporter: Jeff Zhang


Currently, in we use IN_TESTS to in flink-yarn which make the test code and 
production code inconsistent. This is not a good design and may miss bugs which 
happens in production environment. This ticket is to remove IN_TESTS for make 
test code and production code consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11561) Translate "Flink Architecture" page into Chinese

2019-05-06 Thread Tom Goong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16834331#comment-16834331
 ] 

Tom Goong commented on FLINK-11561:
---

this file is located in "flink-web" project.  Will they be merged together in 
the future?

> Translate "Flink Architecture" page into Chinese
> 
>
> Key: FLINK-11561
> URL: https://issues.apache.org/jira/browse/FLINK-11561
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Project Website
>Reporter: Jark Wu
>Assignee: Tom Goong
>Priority: Major
>
> Translate "Flink Architecture" page into Chinese.
> The markdown file is located in: flink-web/flink-architecture.zh.md
> The url link is: https://flink.apache.org/zh/flink-architecture.html
> Please adjust the links in the page to Chinese pages when translating. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12421) Synchronize the latest documentation changes into Chinese documents

2019-05-06 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-12421:

Issue Type: Sub-task  (was: Task)
Parent: FLINK-11529

> Synchronize the latest documentation changes into Chinese documents
> ---
>
> Key: FLINK-12421
> URL: https://issues.apache.org/jira/browse/FLINK-12421
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There are several commits to documentations have not been synchronized to 
> Chinese documents, i.e. `xx.zh.md`. This pull request will synchronize the 
> latest changes into Chinese documents.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12421) Synchronize the latest documentation changes into Chinese documents

2019-05-06 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-12421:

Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-11525

> Synchronize the latest documentation changes into Chinese documents
> ---
>
> Key: FLINK-12421
> URL: https://issues.apache.org/jira/browse/FLINK-12421
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There are several commits to documentations have not been synchronized to 
> Chinese documents, i.e. `xx.zh.md`. This pull request will synchronize the 
> latest changes into Chinese documents.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12421) Synchronize the latest documentation changes into Chinese documents

2019-05-06 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-12421:

Issue Type: Task  (was: Sub-task)
Parent: (was: FLINK-11525)

> Synchronize the latest documentation changes into Chinese documents
> ---
>
> Key: FLINK-12421
> URL: https://issues.apache.org/jira/browse/FLINK-12421
> Project: Flink
>  Issue Type: Task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> There are several commits to documentations have not been synchronized to 
> Chinese documents, i.e. `xx.zh.md`. This pull request will synchronize the 
> latest changes into Chinese documents.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-06 Thread GitBox
sunjincheng121 commented on a change in pull request #8347: 
[FLINK-12326][python] Add basic test framework for python api
URL: https://github.com/apache/flink/pull/8347#discussion_r281428840
 
 

 ##
 File path: flink-python/pyflink/testing/test_case_utils.py
 ##
 @@ -0,0 +1,64 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import unittest
+
+from pyflink.find_flink_home import _find_flink_home
+from pyflink.table import TableEnvironment, TableConfig
+
+if sys.version_info[0] >= 3:
+xrange = range
+
+
+class PyFlinkTestCase(unittest.TestCase):
+"""
+Base class for unit tests.
+"""
+
+def setUp(self):
+os.environ["FLINK_TESTING"] = "1"
+
+_find_flink_home()
+print("using %s as FLINK_HOME..." % os.environ["FLINK_HOME"])
 
 Review comment:
   I think we should use` LOGGER = logging.getLogger().` instead of print, what 
to you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-06 Thread GitBox
flinkbot edited a comment on issue #8347: [FLINK-12326][python] Add basic test 
framework for python api
URL: https://github.com/apache/flink/pull/8347#issuecomment-489486437
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @sunjincheng121 [committer]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-06 Thread GitBox
sunjincheng121 commented on a change in pull request #8347: 
[FLINK-12326][python] Add basic test framework for python api
URL: https://github.com/apache/flink/pull/8347#discussion_r281433379
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_table.py
 ##
 @@ -0,0 +1,75 @@
+# 
###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import tempfile
+
+from pyflink.table.table_source import CsvTableSource
+from pyflink.table.types import DataTypes
+from pyflink.testing import source_sink_utils
 
 Review comment:
   Remove useless import.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-06 Thread GitBox
sunjincheng121 commented on a change in pull request #8347: 
[FLINK-12326][python] Add basic test framework for python api
URL: https://github.com/apache/flink/pull/8347#discussion_r281437149
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_table.py
 ##
 @@ -0,0 +1,75 @@
+# 
###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import tempfile
+
+from pyflink.table.table_source import CsvTableSource
+from pyflink.table.types import DataTypes
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+
+
+class TableTests(PyFlinkStreamTableTestCase):
+
+def test_select(self):
+tmp_dir = tempfile.gettempdir()
+source_path = tmp_dir + '/streaming.csv'
+if os.path.isfile(source_path):
+os.remove(source_path)
+with open(source_path, 'w') as f:
+lines = '1,hi,hello\n' + '2,hi,hello\n'
+f.write(lines)
+f.close()
+
+field_names = ["a", "b", "c"]
+field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+
+t_env = self.t_env
+
+# register Orders table in table environment
+t_env.register_table_source(
+"Orders",
+CsvTableSource(source_path, field_names, field_types))
+
+t_env.register_table_sink(
+"Results",
+field_names, field_types, source_sink_utils.TestAppendSink())
+
+t_env.scan("Orders") \
+ .where("a > 0") \
+ .select("a + 1, b, c") \
+ .insert_into("Results")
+
+t_env.execute()
+
+actual = source_sink_utils.results()
+expected = ['2,hi,hello', '3,hi,hello']
+self.assert_equals(actual, expected)
+
+
+if __name__ == '__main__':
+import unittest
 
 Review comment:
   remove this line


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-06 Thread GitBox
sunjincheng121 commented on a change in pull request #8347: 
[FLINK-12326][python] Add basic test framework for python api
URL: https://github.com/apache/flink/pull/8347#discussion_r281433677
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_end_to_end.py
 ##
 @@ -1,73 +0,0 @@
-
 
 Review comment:
   I think it's better to keep one end_to_end example, for using check the env 
is work well. I'll open the PR for FLINK-12330 which will mention this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-06 Thread GitBox
sunjincheng121 commented on a change in pull request #8347: 
[FLINK-12326][python] Add basic test framework for python api
URL: https://github.com/apache/flink/pull/8347#discussion_r281437349
 
 

 ##
 File path: flink-python/pyflink/testing/test_case_utils.py
 ##
 @@ -0,0 +1,64 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import unittest
+
+from pyflink.find_flink_home import _find_flink_home
+from pyflink.table import TableEnvironment, TableConfig
+
+if sys.version_info[0] >= 3:
+xrange = range
+
+
+class PyFlinkTestCase(unittest.TestCase):
+"""
+Base class for unit tests.
+"""
+
+def setUp(self):
+os.environ["FLINK_TESTING"] = "1"
+
+_find_flink_home()
+print("using %s as FLINK_HOME..." % os.environ["FLINK_HOME"])
+
+@classmethod
+def assert_equals(cls, actual, expected):
+actual_py_list = cls.to_py_list(actual)
+actual_py_list.sort()
+expected.sort()
+assert all(x == y for x, y in zip(actual_py_list, expected))
+
+@classmethod
+def to_py_list(cls, actual):
+py_list = []
+for i in xrange(0, actual.length()):
+py_list.append(actual.apply(i))
+return py_list
+
+
+class PyFlinkStreamTableTestCase(PyFlinkTestCase):
+"""
 
 Review comment:
   I think it's better to add `PyFlinkBatchTableTestCase`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-06 Thread GitBox
sunjincheng121 commented on a change in pull request #8347: 
[FLINK-12326][python] Add basic test framework for python api
URL: https://github.com/apache/flink/pull/8347#discussion_r281433195
 
 

 ##
 File path: flink-python/pyflink/testing/source_sink_utils.py
 ##
 @@ -0,0 +1,127 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import glob
+import os
+import unittest
+from py4j.java_gateway import java_import
+
+from pyflink.find_flink_home import _find_flink_source_root
+from pyflink.java_gateway import get_gateway
+from pyflink.table import TableSink
+
+
+class TestTableSink(TableSink):
+"""
+Base class for test table sink.
+"""
+
+_inited = False
+
+def __init__(self, j_table_sink):
+super(TestTableSink, self).__init__(j_table_sink)
+
+@classmethod
+def _ensure_initialized(cls):
+if TestTableSink._inited:
+return
+
+FLINK_SOURCE_ROOT_DIR = _find_flink_source_root()
+filename_pattern = (
+"flink-table/flink-table-planner/target/"
+"flink-table-planner*-tests.jar")
+if not glob.glob(os.path.join(FLINK_SOURCE_ROOT_DIR, 
filename_pattern)):
+raise unittest.SkipTest(
+"'flink-table-planner*-tests.jar' is not available. Will skip 
the related tests.")
+
+gateway = get_gateway()
+java_import(gateway.jvm, 
"org.apache.flink.table.runtime.stream.table.TestAppendSink")
+java_import(gateway.jvm, 
"org.apache.flink.table.runtime.stream.table.TestRetractSink")
+java_import(gateway.jvm, 
"org.apache.flink.table.runtime.stream.table.TestUpsertSink")
+java_import(gateway.jvm, 
"org.apache.flink.table.runtime.stream.table.RowCollector")
+
+TestTableSink._inited = True
+
+
+class TestAppendSink(TestTableSink):
+"""
+A test append table sink.
+"""
+
+def __init__(self):
+TestTableSink._ensure_initialized()
+
+gateway = get_gateway()
+super(TestAppendSink, self).__init__(gateway.jvm.TestAppendSink())
+
+
+class TestRetractSink(TestTableSink):
+"""
+A test retract table sink.
+"""
+
+def __init__(self):
+TestTableSink._ensure_initialized()
+
+gateway = get_gateway()
+super(TestRetractSink, self).__init__(gateway.jvm.TestRetractSink())
+
+
+class TestUpsertSink(TestTableSink):
+"""
+A test upsert table sink.
+"""
+
+def __init__(self, keys, is_append_only):
+TestTableSink._ensure_initialized()
+
+gateway = get_gateway()
+j_keys = gateway.new_array(gateway.jvm.String, len(keys))
+for i in xrange(0, len(keys)):
+j_keys[i] = keys[i]
+
+super(TestUpsertSink, 
self).__init__(gateway.jvm.TestUpsertSink(j_keys, is_append_only))
+
+
+def results():
+"""
+Retrieves the results from an append table sink.
+"""
+return retract_results()
+
+
+def retract_results():
+"""
+Retrieves the results from a retract table sink.
+"""
+gateway = get_gateway()
+results = gateway.jvm.RowCollector.getAndClearValues()
+return gateway.jvm.RowCollector.retractResults(results)
+
+
+def upsert_results(keys):
+"""
+Retrieves the results from an upsert table sink.
+"""
+gateway = get_gateway()
+j_keys = gateway.new_array(gateway.jvm.int, len(keys))
+for i in xrange(0, len(keys)):
 
 Review comment:
   We should add `version check`, such as:
   ```
   if sys.version_info[0] >= 3:
   xrange = range
   ```
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-06 Thread GitBox
sunjincheng121 commented on a change in pull request #8347: 
[FLINK-12326][python] Add basic test framework for python api
URL: https://github.com/apache/flink/pull/8347#discussion_r281438679
 
 

 ##
 File path: flink-python/pyflink/find_flink_home.py
 ##
 @@ -41,5 +41,19 @@ def _find_flink_home():
 sys.exit(-1)
 
 
+def _find_flink_source_root():
+"""
+Find the flink source root directory.
+"""
+try:
+flink_source_root_dir = 
os.path.abspath(os.path.dirname(os.path.abspath(__file__)) + "/../../")
+if os.path.isdir(flink_source_root_dir + "/build-target"):
 
 Review comment:
   I think it's better to remove this check. What doyou think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong opened a new pull request #8354: [FLINK-12421][docs-zh] Synchronize the latest documentation changes (commits to 7b46f604) into Chinese documents

2019-05-06 Thread GitBox
wuchong opened a new pull request #8354: [FLINK-12421][docs-zh] Synchronize the 
latest documentation changes (commits to 7b46f604) into Chinese documents
URL: https://github.com/apache/flink/pull/8354
 
 
   
   ## What is the purpose of the change
   
   There are several commits to documentations have not been synchronized to 
Chinese documents, i.e. `xx.zh.md`. This pull request will synchronize the 
latest changes into Chinese documents.
   
   
   ## Brief change log
   
 - Synchronize updates to `xx.zh.md`
 - Fix the `Overview` is not translated in sidebar
   
   
   ## Verifying this change
   
   N/A
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12421) Synchronize the latest documentation changes into Chinese documents

2019-05-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12421:
---
Labels: pull-request-available  (was: )

> Synchronize the latest documentation changes into Chinese documents
> ---
>
> Key: FLINK-12421
> URL: https://issues.apache.org/jira/browse/FLINK-12421
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Minor
>  Labels: pull-request-available
>
> There are several commits to documentations have not been synchronized to 
> Chinese documents, i.e. `xx.zh.md`. This pull request will synchronize the 
> latest changes into Chinese documents.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on issue #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-06 Thread GitBox
sunjincheng121 commented on issue #8347: [FLINK-12326][python] Add basic test 
framework for python api
URL: https://github.com/apache/flink/pull/8347#issuecomment-489879981
 
 
   @flinkbot approve-until architecture


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-06 Thread GitBox
sunjincheng121 commented on a change in pull request #8347: 
[FLINK-12326][python] Add basic test framework for python api
URL: https://github.com/apache/flink/pull/8347#discussion_r281432761
 
 

 ##
 File path: flink-python/pyflink/testing/source_sink_utils.py
 ##
 @@ -0,0 +1,127 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import glob
+import os
+import unittest
+from py4j.java_gateway import java_import
+
+from pyflink.find_flink_home import _find_flink_source_root
+from pyflink.java_gateway import get_gateway
+from pyflink.table import TableSink
+
+
+class TestTableSink(TableSink):
+"""
+Base class for test table sink.
+"""
+
+_inited = False
+
+def __init__(self, j_table_sink):
+super(TestTableSink, self).__init__(j_table_sink)
+
+@classmethod
+def _ensure_initialized(cls):
+if TestTableSink._inited:
+return
+
+FLINK_SOURCE_ROOT_DIR = _find_flink_source_root()
+filename_pattern = (
+"flink-table/flink-table-planner/target/"
+"flink-table-planner*-tests.jar")
+if not glob.glob(os.path.join(FLINK_SOURCE_ROOT_DIR, 
filename_pattern)):
+raise unittest.SkipTest(
+"'flink-table-planner*-tests.jar' is not available. Will skip 
the related tests.")
+
+gateway = get_gateway()
+java_import(gateway.jvm, 
"org.apache.flink.table.runtime.stream.table.TestAppendSink")
+java_import(gateway.jvm, 
"org.apache.flink.table.runtime.stream.table.TestRetractSink")
+java_import(gateway.jvm, 
"org.apache.flink.table.runtime.stream.table.TestUpsertSink")
+java_import(gateway.jvm, 
"org.apache.flink.table.runtime.stream.table.RowCollector")
+
+TestTableSink._inited = True
+
+
+class TestAppendSink(TestTableSink):
+"""
+A test append table sink.
+"""
+
+def __init__(self):
+TestTableSink._ensure_initialized()
+
+gateway = get_gateway()
+super(TestAppendSink, self).__init__(gateway.jvm.TestAppendSink())
+
+
+class TestRetractSink(TestTableSink):
+"""
+A test retract table sink.
+"""
+
+def __init__(self):
+TestTableSink._ensure_initialized()
+
+gateway = get_gateway()
+super(TestRetractSink, self).__init__(gateway.jvm.TestRetractSink())
+
+
+class TestUpsertSink(TestTableSink):
+"""
+A test upsert table sink.
+"""
+
+def __init__(self, keys, is_append_only):
+TestTableSink._ensure_initialized()
+
+gateway = get_gateway()
+j_keys = gateway.new_array(gateway.jvm.String, len(keys))
+for i in xrange(0, len(keys)):
+j_keys[i] = keys[i]
+
+super(TestUpsertSink, 
self).__init__(gateway.jvm.TestUpsertSink(j_keys, is_append_only))
+
+
+def results():
+"""
+Retrieves the results from an append table sink.
+"""
+return retract_results()
+
+
+def retract_results():
+"""
+Retrieves the results from a retract table sink.
+"""
+gateway = get_gateway()
+results = gateway.jvm.RowCollector.getAndClearValues()
 
 Review comment:
   Can  we remove `results`.?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8354: [FLINK-12421][docs-zh] Synchronize the latest documentation changes (commits to 7b46f604) into Chinese documents

2019-05-06 Thread GitBox
flinkbot commented on issue #8354: [FLINK-12421][docs-zh] Synchronize the 
latest documentation changes (commits to 7b46f604) into Chinese documents
URL: https://github.com/apache/flink/pull/8354#issuecomment-489880058
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12421) Synchronize the latest documentation changes into Chinese documents

2019-05-06 Thread Jark Wu (JIRA)
Jark Wu created FLINK-12421:
---

 Summary: Synchronize the latest documentation changes into Chinese 
documents
 Key: FLINK-12421
 URL: https://issues.apache.org/jira/browse/FLINK-12421
 Project: Flink
  Issue Type: Improvement
  Components: chinese-translation, Documentation
Reporter: Jark Wu
Assignee: Jark Wu


There are several commits to documentations have not been synchronized to 
Chinese documents, i.e. `xx.zh.md`. This pull request will synchronize the 
latest changes into Chinese documents.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11634) Translate "State Backends" page into Chinese

2019-05-06 Thread Tom Goong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Goong reassigned FLINK-11634:
-

Assignee: Tom Goong

> Translate "State Backends" page into Chinese
> 
>
> Key: FLINK-11634
> URL: https://issues.apache.org/jira/browse/FLINK-11634
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu(klion26)
>Assignee: Tom Goong
>Priority: Major
>
> doc locates in flink/docs/dev/stream/state/state_backens.md



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11637) Translate "Checkpoints" page into Chinese

2019-05-06 Thread Tom Goong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Goong reassigned FLINK-11637:
-

Assignee: Tom Goong

> Translate "Checkpoints" page into Chinese
> -
>
> Key: FLINK-11637
> URL: https://issues.apache.org/jira/browse/FLINK-11637
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu(klion26)
>Assignee: Tom Goong
>Priority: Major
>
> doc locates in flink/docs/ops/state/checkpoints.md



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11561) Translate "Flink Architecture" page into Chinese

2019-05-06 Thread Tom Goong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Goong reassigned FLINK-11561:
-

Assignee: Tom Goong

> Translate "Flink Architecture" page into Chinese
> 
>
> Key: FLINK-11561
> URL: https://issues.apache.org/jira/browse/FLINK-11561
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Project Website
>Reporter: Jark Wu
>Assignee: Tom Goong
>Priority: Major
>
> Translate "Flink Architecture" page into Chinese.
> The markdown file is located in: flink-web/flink-architecture.zh.md
> The url link is: https://flink.apache.org/zh/flink-architecture.html
> Please adjust the links in the page to Chinese pages when translating. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11754) Translate the "Roadmap" page into Chinese

2019-05-06 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-11754.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in flink-web: 7f603e6b557bf14a933ec741282fc25a4c26daf5

> Translate the "Roadmap" page into Chinese
> -
>
> Key: FLINK-11754
> URL: https://issues.apache.org/jira/browse/FLINK-11754
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Project Website
>Reporter: Jark Wu
>Assignee: hanfei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> The markdown file will be located in: flink-web/roadmap.zh.md
> The url link is: https://flink.apache.org/zh/roadmap.html
> Please start to work after the prior PR 
> https://github.com/apache/flink-web/pull/178 is merged. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12420) Add support of cache/invalidateCache for TableAPI

2019-05-06 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-12420:
---
Component/s: Table SQL / API

> Add support of cache/invalidateCache for TableAPI
> -
>
> Key: FLINK-12420
> URL: https://issues.apache.org/jira/browse/FLINK-12420
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> Add cache/invalidateCache api and its implementations for Table



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12420) Add support of cache/invalidateCache for TableAPI

2019-05-06 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-12420:
--

 Summary: Add support of cache/invalidateCache for TableAPI
 Key: FLINK-12420
 URL: https://issues.apache.org/jira/browse/FLINK-12420
 Project: Flink
  Issue Type: Sub-task
Reporter: Ruidong Li
Assignee: Ruidong Li


Add cache/invalidateCache api and its implementations for Table



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wuchong commented on issue #8321: [FLINK-12351][DataStream] Fix AsyncWaitOperator to deep copy StreamElement when object reuse is enabled

2019-05-06 Thread GitBox
wuchong commented on issue #8321: [FLINK-12351][DataStream] Fix 
AsyncWaitOperator to deep copy StreamElement when object reuse is enabled
URL: https://github.com/apache/flink/pull/8321#issuecomment-489873826
 
 
   Hi @tillrohrmann @pnowojski , it would be nice if you have time to review it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12419) Add support for consuming BLOCKING_PERSISTENT ResultPartition

2019-05-06 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-12419:
--

 Summary: Add support for consuming BLOCKING_PERSISTENT 
ResultPartition
 Key: FLINK-12419
 URL: https://issues.apache.org/jira/browse/FLINK-12419
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Ruidong Li
Assignee: Ruidong Li


Add support for deploying Jobs which can consume BLOCKING_PERSISTENT 
ResultPartition generated by previous Jobs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12412) Allow ListTypeInfo used for java.util.List and MapTypeInfo used for java.util.Map

2019-05-06 Thread YangFei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16834303#comment-16834303
 ] 

YangFei commented on FLINK-12412:
-

This sounds interesting. 

> Allow ListTypeInfo used for java.util.List and MapTypeInfo used for 
> java.util.Map
> -
>
> Key: FLINK-12412
> URL: https://issues.apache.org/jira/browse/FLINK-12412
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Type Serialization System
>Affects Versions: 1.9.0
>Reporter: YangFei
>Assignee: YangFei
>Priority: Major
>  Labels: pull-request-available, starer
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
>  
> {code:java}
> // code placeholder
> public static class UserBehavior { 
>   public long userId;
>   public long itemId;  
>   public int categoryId; 
>   public long timestamp;  
>   public List comments; 
> }
> public static void main(String[] args) throws Exception { 
>   PojoTypeInfo pojoType = (PojoTypeInfo) 
> TypeExtractor.createTypeInfo(UserBehavior.class); 
> }
> {code}
>  
> The filed comments in UserBehavior will be extracted by TypeExtractor to 
> GenericType .
> I think it can be extracted to ListTypeInfo .
> This would be a big improvement as in many cases classes including List or Map



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12366) Clean up Catalog APIs to make them more consistent and coherent

2019-05-06 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12366.

Resolution: Fixed

merged in 1.9.0:  099106d8778ae39f9d99afa71a3590ae7ae916d7

> Clean up Catalog APIs to make them more consistent and coherent 
> 
>
> Key: FLINK-12366
> URL: https://issues.apache.org/jira/browse/FLINK-12366
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Some of the APIs seem inconsistent with others in terms of exception thrown 
> and error handling. This is to clean them up to maintain consistency and 
> coherence.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12266) add NOTICE file for dependencies that are newly introduced in flink-connector-hive

2019-05-06 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12266.

Resolution: Fixed

fixed as part of FLINK-12238

> add NOTICE file for dependencies that are newly introduced in 
> flink-connector-hive
> --
>
> Key: FLINK-12266
> URL: https://issues.apache.org/jira/browse/FLINK-12266
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> Add a NOTICE file to `flink-connector-hive` module, because it bundles hive 
> dependency. An example 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch2/src/main/resources/META-INF/NOTICE.
> See more: https://cwiki.apache.org/confluence/display/FLINK/Licensing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12239) Support table related operations in GenericHiveMetastoreCatalog

2019-05-06 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12239.

Resolution: Fixed

Merged in 1.9.0: cf20197f1c8c51eab028da7c477dd6c7ad96db2f

> Support table related operations in GenericHiveMetastoreCatalog
> ---
>
> Key: FLINK-12239
> URL: https://issues.apache.org/jira/browse/FLINK-12239
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Support table related operations in GenericHiveMetastoreCatalog, which 
> implements ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12232) Support database related operations in HiveCatalog

2019-05-06 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16834284#comment-16834284
 ] 

Bowen Li commented on FLINK-12232:
--

merged in 1.9.0: a3cf3f1fc69e9ed17433cbc522c7086cead7790e

> Support database related operations in HiveCatalog
> --
>
> Key: FLINK-12232
> URL: https://issues.apache.org/jira/browse/FLINK-12232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Support database related operations in HiveCatalog, which implements 
> ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12232) Support database related operations in HiveCatalog

2019-05-06 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12232.

Resolution: Fixed

> Support database related operations in HiveCatalog
> --
>
> Key: FLINK-12232
> URL: https://issues.apache.org/jira/browse/FLINK-12232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Support database related operations in HiveCatalog, which implements 
> ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 closed pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog

2019-05-06 Thread GitBox
bowenli86 closed pull request #8337: [FLINK-12232][hive] Create HiveCatalog and 
support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog

2019-05-06 Thread GitBox
bowenli86 commented on issue #8337: [FLINK-12232][hive] Create HiveCatalog and 
support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337#issuecomment-489827735
 
 
   Merged


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8339: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog

2019-05-06 Thread GitBox
bowenli86 commented on issue #8339: [FLINK-12240][hive] Support view related 
operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8339#issuecomment-489827434
 
 
   cc @xuefuz @KurtYoung @JingsongLi 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-06 Thread GitBox
bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#issuecomment-489827407
 
 
   cc @xuefuz @KurtYoung @JingsongLi 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12233) Support table related operations in HiveCatalog

2019-05-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12233:
---
Labels: pull-request-available  (was: )

> Support table related operations in HiveCatalog
> ---
>
> Key: FLINK-12233
> URL: https://issues.apache.org/jira/browse/FLINK-12233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> Support table related operations in HiveCatalog, which implements 
> ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on issue #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-06 Thread GitBox
sunjincheng121 commented on issue #8311: [FLINK-10976][table] Add Aggregate 
operator to Table API
URL: https://github.com/apache/flink/pull/8311#issuecomment-489823438
 
 
   LGTM. +1 to merged
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-06 Thread GitBox
flinkbot edited a comment on issue #8311: [FLINK-10976][table] Add Aggregate 
operator to Table API
URL: https://github.com/apache/flink/pull/8311#issuecomment-487553255
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @rmetzger [PMC], @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @sunjincheng121 [committer]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @sunjincheng121 [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 opened a new pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-06 Thread GitBox
bowenli86 opened a new pull request #8353: [FLINK-12233][hive] Support table 
related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353
 
 
   ## What is the purpose of the change
   
   This PR introduced HiveCatalogTable and implemented table related catalog 
APIs in HiveCatalog.
   
   ## Brief change log
   
   - Created a basic `HiveCatalogTable`, and we can enrich this class later
   - Implemented table related catalog APIs in `HiveCatalog`
   - Extracted common logic between `HiveCatalog` and 
`GenericHiveMetastoreCatalog` into `HiveCatalogBase`
   - Moved a few util classes from package 
`org.apache.flink.table.catalog.hive` to 
`org.apache.flink.table.catalog.hive.util`
   
   
   ## Verifying this change
   
   Reuse tests in CatalogTestBase.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   
   
   This PR depends on https://github.com/apache/flink/pull/8339


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-06 Thread GitBox
flinkbot commented on issue #8353: [FLINK-12233][hive] Support table related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#issuecomment-489825249
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12418) Add input/output format and SerDeLib information for Hive tables in HiveCatalogUtil#createHiveTable

2019-05-06 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12418:


 Summary: Add input/output format and SerDeLib information for Hive 
tables in HiveCatalogUtil#createHiveTable
 Key: FLINK-12418
 URL: https://issues.apache.org/jira/browse/FLINK-12418
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Bowen Li
 Fix For: 1.9.0


Currently when we convert a Flink's HiveCatalogTable to Hive's table  in 
HiveCatalogUtil#createHiveTable, we don't set detailed ininput/output format 
and SerDeLib in the Hive table.

This JIRA is for adding more of those information in Hive table.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] robstoll opened a new pull request #8352: [hotfix][docs] mention kryo in serialization and link to page

2019-05-06 Thread GitBox
robstoll opened a new pull request #8352: [hotfix][docs] mention kryo in 
serialization and link to page
URL: https://github.com/apache/flink/pull/8352
 
 
   ## What is the purpose of the change
   improve documentation
   
   ## Brief change log
   I am new to flink so maybe I misunderstood something. As far as I am get 
Avro is only used if the POJO comes from a source which was deserialised with 
Avro. Otherwise Kryo is used. Thus I changed the text, mentioning Kryo as well 
and added a link to the `Serialization of POJO types` section  (I guess the 
link to the other page makes sense in all cases).
   
   ## Verifying this change
   - This change is a trivial rework / code cleanup without any test coverage.
   
   ## Documentation
 - Does this pull request introduce a new feature? => no
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8352: [hotfix][docs] mention kryo in serialization and link to page

2019-05-06 Thread GitBox
flinkbot commented on issue #8352: [hotfix][docs] mention kryo in serialization 
and link to page
URL: https://github.com/apache/flink/pull/8352#issuecomment-489793723
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12417) Merge ReadableCatalog and ReadableWritableCatalog

2019-05-06 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12417:
-
Description: 
As discussed with [~dawidwys], the original purpose to separate ReadableCatalog 
and ReadableWritableCatalog is to isolate access to metadata. However, we 
believe access control and authorization is orthogonal to design of catalog 
APIs and should be of a different effort.

Thus, we propose to merge ReadableCatalog and ReadableWritableCatalog to 
simplify the design.

cc [~twalthr] [~xuefuz]

  was:
As discussed with [~dawidwys], the original purpose to separate ReadableCatalog 
and ReadableWritableCatalog is to isolate access to metadata. However, we 
believe access control and authorization is orthogonal to catalog APIs and 
should be a different effort.

Thus, we propose to merge 


> Merge ReadableCatalog and ReadableWritableCatalog
> -
>
> Key: FLINK-12417
> URL: https://issues.apache.org/jira/browse/FLINK-12417
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> As discussed with [~dawidwys], the original purpose to separate 
> ReadableCatalog and ReadableWritableCatalog is to isolate access to metadata. 
> However, we believe access control and authorization is orthogonal to design 
> of catalog APIs and should be of a different effort.
> Thus, we propose to merge ReadableCatalog and ReadableWritableCatalog to 
> simplify the design.
> cc [~twalthr] [~xuefuz]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12417) Merge ReadableCatalog and ReadableWritableCatalog

2019-05-06 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12417:
-
Description: 
As discussed with [~dawidwys], the original purpose to separate ReadableCatalog 
and ReadableWritableCatalog is to isolate access to metadata. However, we 
believe access control and authorization is orthogonal to catalog APIs and 
should be a different effort.

Thus, we propose to merge 

> Merge ReadableCatalog and ReadableWritableCatalog
> -
>
> Key: FLINK-12417
> URL: https://issues.apache.org/jira/browse/FLINK-12417
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> As discussed with [~dawidwys], the original purpose to separate 
> ReadableCatalog and ReadableWritableCatalog is to isolate access to metadata. 
> However, we believe access control and authorization is orthogonal to catalog 
> APIs and should be a different effort.
> Thus, we propose to merge 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12399) FilterableTableSource does not use filters on job run

2019-05-06 Thread Josh Bradt (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16834072#comment-16834072
 ] 

Josh Bradt commented on FLINK-12399:


[~walterddr]: Thanks, that workaround does solve my problem for now!

> FilterableTableSource does not use filters on job run
> -
>
> Key: FLINK-12399
> URL: https://issues.apache.org/jira/browse/FLINK-12399
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.8.0
>Reporter: Josh Bradt
>Assignee: Rong Rong
>Priority: Major
> Attachments: flink-filter-bug.tar.gz
>
>
> As discussed [on the mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html],
>  there appears to be a bug where a job that uses a custom 
> FilterableTableSource does not keep the filters that were pushed down into 
> the table source. More specifically, the table source does receive filters 
> via applyPredicates, and a new table source with those filters is returned, 
> but the final job graph appears to use the original table source, which does 
> not contain any filters.
> I attached a minimal example program to this ticket. The custom table source 
> is as follows: 
> {code:java}
> public class CustomTableSource implements BatchTableSource, 
> FilterableTableSource {
> private static final Logger LOG = 
> LoggerFactory.getLogger(CustomTableSource.class);
> private final Filter[] filters;
> private final FilterConverter converter = new FilterConverter();
> public CustomTableSource() {
> this(null);
> }
> private CustomTableSource(Filter[] filters) {
> this.filters = filters;
> }
> @Override
> public DataSet getDataSet(ExecutionEnvironment execEnv) {
> if (filters == null) {
>LOG.info(" No filters defined ");
> } else {
> LOG.info(" Found filters ");
> for (Filter filter : filters) {
> LOG.info("FILTER: {}", filter);
> }
> }
> return execEnv.fromCollection(allModels());
> }
> @Override
> public TableSource applyPredicate(List predicates) {
> LOG.info("Applying predicates");
> List acceptedFilters = new ArrayList<>();
> for (final Expression predicate : predicates) {
> converter.convert(predicate).ifPresent(acceptedFilters::add);
> }
> return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
> }
> @Override
> public boolean isFilterPushedDown() {
> return filters != null;
> }
> @Override
> public TypeInformation getReturnType() {
> return TypeInformation.of(Model.class);
> }
> @Override
> public TableSchema getTableSchema() {
> return TableSchema.fromTypeInfo(getReturnType());
> }
> private List allModels() {
> List models = new ArrayList<>();
> models.add(new Model(1, 2, 3, 4));
> models.add(new Model(10, 11, 12, 13));
> models.add(new Model(20, 21, 22, 23));
> return models;
> }
> }
> {code}
>  
> When run, it logs
> {noformat}
> 15:24:54,888 INFO  com.klaviyo.filterbug.CustomTableSource
>- Applying predicates
> 15:24:54,901 INFO  com.klaviyo.filterbug.CustomTableSource
>- Applying predicates
> 15:24:54,910 INFO  com.klaviyo.filterbug.CustomTableSource
>- Applying predicates
> 15:24:54,977 INFO  com.klaviyo.filterbug.CustomTableSource
>-  No filters defined {noformat}
> which appears to indicate that although filters are getting pushed down, the 
> final job does not use them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-06 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r281277094
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A simple scheduling topology for testing purposes.
+ */
+public class TestingSchedulingTopology implements SchedulingTopology {
+
+   private final Map 
schedulingExecutionVertices = new HashMap<>();
+
+   private final Map schedulingResultPartitions = new HashMap<>();
+
+   @Override
+   public Iterable getVertices() {
+   return 
Collections.unmodifiableCollection(schedulingExecutionVertices.values());
+   }
+
+   @Override
+   public Optional getVertex(ExecutionVertexID 
executionVertexId)  {
+   return 
Optional.ofNullable(schedulingExecutionVertices.get(executionVertexId));
+   }
+
+   @Override
+   public Optional getResultPartition(
+   IntermediateResultPartitionID 
intermediateResultPartitionId) {
+   return 
Optional.ofNullable(schedulingResultPartitions.get(intermediateResultPartitionId));
+   }
+
+   public void addSchedulingExecutionVertex(SchedulingExecutionVertex 
schedulingExecutionVertex) {
+   
schedulingExecutionVertices.put(schedulingExecutionVertex.getId(), 
schedulingExecutionVertex);
+   
addSchedulingResultPartitions(schedulingExecutionVertex.getConsumedResultPartitions());
+   
addSchedulingResultPartitions(schedulingExecutionVertex.getProducedResultPartitions());
+   }
+
+   public void addSchedulingResultPartitions(final 
Collection resultPartitions) {
 
 Review comment:
   I think this should be private.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit merged pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-06 Thread GitBox
asfgit merged pull request #8329: [FLINK-12239][hive] Support table related 
operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface

2019-05-06 Thread GitBox
tillrohrmann commented on a change in pull request #8233: [FLINK-12227] 
[runtime] introduce SchedulingStrategy interface
URL: https://github.com/apache/flink/pull/8233#discussion_r281270711
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
 ##
 @@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+
+/**
+ * Representation of {@link IntermediateResultPartition}.
+ */
+public interface SchedulingResultPartition {
+
+   /**
+* Gets id of the result partition.
+*
+* @return id of the result partition
+*/
+   IntermediateResultPartitionID getId();
+
+   /**
+* Gets id of the intermediate result.
+*
+* @return id of the intermediate result
+*/
+   IntermediateDataSetID getResultId();
 
 Review comment:
   At some point, it would be good to make this id part of the 
`IntermediateResultPartitionID`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface

2019-05-06 Thread GitBox
tillrohrmann commented on a change in pull request #8233: [FLINK-12227] 
[runtime] introduce SchedulingStrategy interface
URL: https://github.com/apache/flink/pull/8233#discussion_r281270077
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java
 ##
 @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Id identifying {@link ExecutionVertex}.
+ */
+public class ExecutionVertexID {
+   private final JobVertexID jobVertexId;
+
+   private final int subtaskIndex;
+
+   public ExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex) {
+   this.jobVertexId = checkNotNull(jobVertexId);
+   this.subtaskIndex = subtaskIndex;
+   checkArgument(subtaskIndex >= 0, "subtaskIndex must be greater 
than or equal to 0");
+   }
 
 Review comment:
   Getters for the compositional parts could be helpful for the future.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface

2019-05-06 Thread GitBox
tillrohrmann commented on a change in pull request #8233: [FLINK-12227] 
[runtime] introduce SchedulingStrategy interface
URL: https://github.com/apache/flink/pull/8233#discussion_r281271032
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java
 ##
 @@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+/**
+ * Factory interface for {@link SchedulingStrategy}.
+ */
+public interface SchedulingStrategyFactory {
+
+   SchedulingStrategy getInstance(
 
 Review comment:
   maybe `createInstance`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


  1   2   3   4   >