[GitHub] [flink] wangyang0918 commented on issue #8624: [FLINK-10932]Initial flink-kubernetes module with empty implementation

2019-06-16 Thread GitBox
wangyang0918 commented on issue #8624: [FLINK-10932]Initial flink-kubernetes 
module with empty implementation
URL: https://github.com/apache/flink/pull/8624#issuecomment-502544267
 
 
   Hi, @tianchen92 
   I think you should remove the `dependencyManagement` section and follow the steps below.
   1. Add `exclusions` for `jackson-core`, `jackson-annotations`, and `jackson-databind` to the `kubernetes-client` dependency.
   2. Add direct dependencies on those excluded artifacts with the specific versions you need.
   3. Configure the shade plugin.
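
   For illustration, a minimal sketch of steps 1 and 2 in the module pom (the fabric8 `kubernetes-client` coordinates are the usual ones, the version properties are placeholders, adjust them to the actual module):
   ```xml
   <!-- Sketch only: version properties are placeholders, not the final values. -->
   <dependency>
     <groupId>io.fabric8</groupId>
     <artifactId>kubernetes-client</artifactId>
     <version>${kubernetes.client.version}</version>
     <exclusions>
       <exclusion>
         <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-core</artifactId>
       </exclusion>
       <exclusion>
         <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-annotations</artifactId>
       </exclusion>
       <exclusion>
         <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-databind</artifactId>
       </exclusion>
     </exclusions>
   </dependency>

   <!-- Step 2: pin the excluded artifacts explicitly. -->
   <dependency>
     <groupId>com.fasterxml.jackson.core</groupId>
     <artifactId>jackson-databind</artifactId>
     <version>${jackson.version}</version>
   </dependency>
   ```
   Step 3 would then relocate the Jackson packages in the module's maven-shade-plugin configuration.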


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots

2019-06-16 Thread GitBox
gaoyunhaii commented on a change in pull request #8704: [FLINK-12812][runtime] 
Set resource profiles for task slots
URL: https://github.com/apache/flink/pull/8704#discussion_r294143254
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -522,4 +534,68 @@ public void testOnContainerCompleted() throws Exception {
});
}};
}
+
+   /**
+* Test that RM and TM calculate same slot resource profile.
 
 Review comment:
   Tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8704: [FLINK-12812][runtime] Set resource profiles for task slots

2019-06-16 Thread GitBox
gaoyunhaii commented on a change in pull request #8704: [FLINK-12812][runtime] 
Set resource profiles for task slots
URL: https://github.com/apache/flink/pull/8704#discussion_r294143299
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
 ##
 @@ -163,4 +163,11 @@
boolean updatePartitionInfo(
ExecutionAttemptID consumerID,
PartitionInfo partitionInfo) throws IOException, 
InterruptedException;
+
+   /**
+* Get total memory size for network buffers in bytes.
 
 Review comment:
   Gets


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12867) Add insert overwrite grammar as HIVE dialect

2019-06-16 Thread Danny Chan (JIRA)
Danny Chan created FLINK-12867:
--

 Summary: Add insert overwrite grammar as HIVE dialect
 Key: FLINK-12867
 URL: https://issues.apache.org/jira/browse/FLINK-12867
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.9.0
Reporter: Danny Chan
Assignee: Danny Chan
 Fix For: 1.9.0


Support grammar like:

 
{code:java}
insert overwrite tbl1 partition(a=1) select a from tbl2;{code}
The overwrite scope can be either the whole table or a single partition.
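
For illustration (a sketch only, assuming the same dialect as the example above), the two scopes would look like:
{code:java}
-- overwrite a single partition
insert overwrite tbl1 partition(a=1) select a from tbl2;
-- overwrite the whole table
insert overwrite tbl1 select a from tbl2;
{code}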

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wuchong commented on issue #8738: [FLINK-12845][sql-client] Execute multiple statements in command line…

2019-06-16 Thread GitBox
wuchong commented on issue #8738: [FLINK-12845][sql-client] Execute multiple 
statements in command line…
URL: https://github.com/apache/flink/pull/8738#issuecomment-502543453
 
 
   Hi @docete, could you add some tests to verify the new feature? We should always add tests when introducing new features. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport

2019-06-16 Thread Yun Gao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865333#comment-16865333
 ] 

Yun Gao commented on FLINK-12863:
-

Hi all, I have created a JIRA for the state inconsistency between RM and TM: 
[FLINK-12865 | https://issues.apache.org/jira/browse/FLINK-12865]. I agree that these two problems share similarities. For the inconsistency between RM and TM, we previously solved it with the version method, but so far I have not found a case where the fencing token is not available. I think more thought is required to compare the two approaches.
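
For reference, a minimal sketch (purely illustrative; the class and method names are assumptions, not the actual Flink classes) of the fencing-token check proposed in the quoted description below:
{code:java}
// Illustrative sketch only; not the actual Flink TaskExecutor/JobMaster code.
final class AllocatedSlotReportSketch {
    final long fencingToken;   // token of the slot-offer round this report reflects
    final java.util.Set<String> allocatedSlotIds;

    AllocatedSlotReportSketch(long fencingToken, java.util.Set<String> allocatedSlotIds) {
        this.fencingToken = fencingToken;
        this.allocatedSlotIds = allocatedSlotIds;
    }
}

final class TaskExecutorSlotSyncSketch {
    private long currentFencingToken;

    /** Bumped whenever a new batch of slots is offered to the JobMaster. */
    void onSlotsOffered() {
        currentFencingToken++;
    }

    /** Processes only reports that belong to the latest offer round. */
    void onAllocatedSlotReport(AllocatedSlotReportSketch report) {
        if (report.fencingToken != currentFencingToken) {
            return; // outdated report; wait for the next heartbeat
        }
        // ... reconcile local slot state; free active slots that the JobMaster
        // no longer lists in report.allocatedSlotIds ...
    }
}
{code}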

> Race condition between slot offerings and AllocatedSlotReport
> -
>
> Key: FLINK-12863
> URL: https://issues.apache.org/jira/browse/FLINK-12863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by 
> the {{TaskExecutor}} to synchronize its internal view on slot allocations 
> with the view of the {{JobMaster}}. It seems that there is a race condition 
> between offering slots and receiving the report because the 
> {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a 
> separate thread. 
> Due to that it can happen that we generate an {{AllocatedSlotReport}} just 
> before getting new slots offered. Since the report is sent from a different 
> thread, it can then happen that the response to the slot offerings is sent 
> earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an 
> outdated slot report on the {{TaskExecutor}} causing active slots to be 
> released.
> In order to solve the problem I propose to add a fencing token to the 
> {{AllocatedSlotReport}} which is being updated whenever we offer new slots to 
> the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the 
> {{TaskExecutor}} we compare the current slot report fencing token with the 
> received one and only process the report if they are equal. Otherwise we wait 
> for the next heartbeat to send us an up to date {{AllocatedSlotReport}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12865) State inconsistency between RM and TM on the slot status

2019-06-16 Thread Yun Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-12865:

Description: 
There may be state inconsistency between the TM and the RM due to race conditions and message loss:
 # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but sends the heartbeat in another thread. There may be cases where a slot on the TM is initially FREE and the SlotReport reads the FREE state, then the RM requests the slot and marks it as allocated, and the SlotReport finally overrides the allocated status on the RM side wrongly.
 # When the RM requests a slot, the TM receives the request but the acknowledge message gets lost. The RM will then think this slot is free.

 Both problems may cause the RM to mark an ALLOCATED slot as FREE. Currently this may cause additional retries until the state is synchronized after the next heartbeat; in the future it would also lead to inaccurate resource statistics for fine-grained resource management.

  was:
There may be state inconsistency between the TM and the RM due to race conditions and message loss:
 # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but sends the heartbeat in another thread. There may be cases where a slot on the TM is initially FREE and the SlotReport reads the FREE state, then the RM requests the slot and marks it as allocated, and the SlotReport finally overrides the allocated status on the RM side wrongly.
 # When the RM requests a slot, the TM receives the request but the acknowledge message gets lost. The RM will then think this slot is free and assign it to another request. 

 


> State inconsistency between RM and TM on the slot status
> 
>
> Key: FLINK-12865
> URL: https://issues.apache.org/jira/browse/FLINK-12865
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> There may be state inconsistency between the TM and the RM due to race conditions and message loss:
>  # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but sends the heartbeat in another thread. There may be cases where a slot on the TM is initially FREE and the SlotReport reads the FREE state, then the RM requests the slot and marks it as allocated, and the SlotReport finally overrides the allocated status on the RM side wrongly.
>  # When the RM requests a slot, the TM receives the request but the acknowledge message gets lost. The RM will then think this slot is free.
>  Both problems may cause the RM to mark an ALLOCATED slot as FREE. Currently this may cause additional retries until the state is synchronized after the next heartbeat; in the future it would also lead to inaccurate resource statistics for fine-grained resource management.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12866) Connectors module failed on Travis checking

2019-06-16 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865328#comment-16865328
 ] 

TisonKun commented on FLINK-12866:
--

From its corresponding compile stage ([https://travis-ci.org/apache/flink/jobs/546546643]) I can see the cached dir created

{{Creating cache build directory /home/travis/flink_cache/37821/flink}}

and the clean stage wasn't executed. Could some other command have accidentally deleted the dir?

cc [~Zentol]

> Connectors module failed on Travis checking
> ---
>
> Key: FLINK-12866
> URL: https://issues.apache.org/jira/browse/FLINK-12866
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.9.0
>Reporter: Biao Liu
>Priority: Minor
>
> Here is the failure information.
> Cached flink dir /home/travis/flink_cache/37821/flink does not exist. Exiting 
> build.
> The command "./tools/travis_controller.sh connectors" exited with 1.
> Full log is here, https://travis-ci.org/apache/flink/jobs/546546647



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] ifndef-SleePy edited a comment on issue #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan…

2019-06-16 Thread GitBox
ifndef-SleePy edited a comment on issue #8721: [FLINK-12823][datastream] 
PartitionTransformation supports DataExchan…
URL: https://github.com/apache/flink/pull/8721#issuecomment-502540027
 
 
   The Python module is fine, but the connectors module failed this time on the Travis check. It seems to be a Travis-related issue. I have filed a JIRA issue for this: 
https://issues.apache.org/jira/browse/FLINK-12866
   Will trigger the check again by closing and reopening the PR; hope it works.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ifndef-SleePy edited a comment on issue #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan…

2019-06-16 Thread GitBox
ifndef-SleePy edited a comment on issue #8721: [FLINK-12823][datastream] 
PartitionTransformation supports DataExchan…
URL: https://github.com/apache/flink/pull/8721#issuecomment-502540027
 
 
   The Python module is fine, but the connectors module failed this time on the Travis check. It seems to be a Travis-related issue. I have filed a JIRA issue for this: 
https://issues.apache.org/jira/browse/FLINK-12866
   Will trigger the check again by closing and reopening the PR; hope it works.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ifndef-SleePy commented on issue #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan…

2019-06-16 Thread GitBox
ifndef-SleePy commented on issue #8721: [FLINK-12823][datastream] 
PartitionTransformation supports DataExchan…
URL: https://github.com/apache/flink/pull/8721#issuecomment-502540027
 
 
   The Python module is fine, but the connectors module failed this time on the Travis check. It seems to be a Travis-related issue. I have filed a JIRA issue for this: 
https://issues.apache.org/jira/browse/FLINK-12866
   Will trigger the check again by closing and reopening the PR; hope it works.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ifndef-SleePy opened a new pull request #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan…

2019-06-16 Thread GitBox
ifndef-SleePy opened a new pull request #8721: [FLINK-12823][datastream] 
PartitionTransformation supports DataExchan…
URL: https://github.com/apache/flink/pull/8721
 
 
   …geMode property
   
   ## What is the purpose of the change
   
   * Since `StreamTransformation` will also support the batch runner in 1.9, the result partition type of a `StreamTransformation` should not be hard-coded to `PIPELINED_BOUNDED`
   * We need to provide a way for upper-level APIs built on `StreamTransformation` to configure the result partition type of an edge
   
   ## Brief change log
   
   * Expose a property `DataExchangeMode` of `PartitionTransformation` as an 
internal API of `StreamTransformation`
   * Pass the `DataExchangeMode` to `StreamEdge`
   * `StreamingJobGraphGenerator` chooses the appropriate result partition type based on the `DataExchangeMode` of the `StreamEdge` (see the sketch below)
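
   A minimal, purely illustrative sketch (simplified types; these are not the actual Flink classes) of carrying an exchange mode from a partition transformation to a stream edge:
   ```java
   // Illustrative sketch; the enum and class shapes here are assumptions.
   enum DataExchangeModeSketch { PIPELINED, BATCH }

   class PartitionTransformationSketch {
       private final DataExchangeModeSketch exchangeMode;

       PartitionTransformationSketch(DataExchangeModeSketch exchangeMode) {
           this.exchangeMode = exchangeMode;
       }

       DataExchangeModeSketch getExchangeMode() {
           return exchangeMode;
       }
   }

   class StreamEdgeSketch {
       private final DataExchangeModeSketch exchangeMode;

       StreamEdgeSketch(PartitionTransformationSketch partition) {
           // The edge inherits the exchange mode; a job graph generator would
           // later choose the result partition type (pipelined vs. blocking)
           // based on this value.
           this.exchangeMode = partition.getExchangeMode();
       }

       DataExchangeModeSketch getExchangeMode() {
           return exchangeMode;
       }
   }
   ```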
   
   ## Verifying this change
   
   * Add a unit test in `StreamingJobGraphGeneratorTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ifndef-SleePy closed pull request #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan…

2019-06-16 Thread GitBox
ifndef-SleePy closed pull request #8721: [FLINK-12823][datastream] 
PartitionTransformation supports DataExchan…
URL: https://github.com/apache/flink/pull/8721
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry 
primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294139723
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ##
 @@ -19,20 +19,24 @@
 package org.apache.flink.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.table.{JHashSet, JSet}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 
+import java.util
+
 /**
   * Abstract class which define the interfaces required to convert a 
[[TableSource]] to
   * a Calcite Table
   */
 class TableSourceTable[T](
 val tableSource: TableSource[T],
-val isStreaming: Boolean,
-val statistic: FlinkStatistic)
 
 Review comment:
   1. Yes, maybe we need something like `setTableStats`, but that is out of the scope of this issue.
   2. If the TableSource is changed, shouldn't we always create a new `TableSourceTable` and call `getTableStats()` again? How do we know the stats have not changed? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12866) Connectors module failed on Travis checking

2019-06-16 Thread Biao Liu (JIRA)
Biao Liu created FLINK-12866:


 Summary: Connectors module failed on Travis checking
 Key: FLINK-12866
 URL: https://issues.apache.org/jira/browse/FLINK-12866
 Project: Flink
  Issue Type: Test
  Components: Tests
Affects Versions: 1.9.0
Reporter: Biao Liu


Here is the failure information.
Cached flink dir /home/travis/flink_cache/37821/flink does not exist. Exiting 
build.
The command "./tools/travis_controller.sh connectors" exited with 1.

Full log is here, https://travis-ci.org/apache/flink/jobs/546546647



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString

2019-06-16 Thread GitBox
JingsongLi commented on a change in pull request #8689: 
[FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString
URL: https://github.com/apache/flink/pull/8689#discussion_r294132999
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
 ##
 @@ -97,32 +105,12 @@ public static BinaryString blankString(int length) {
return fromBytes(spaces);
}
 
-   /**
-* Returns the number of bytes for a code point with the first byte as 
`b`.
-* @param b The first byte of a code point
-*/
-   private static int numBytesForFirstByte(final byte b) {
-   if (b >= 0) {
-   // 1 byte, 7 bits: 0xxx
-   return 1;
-   } else if ((b >> 5) == -2 && (b & 0x1e) != 0) {
-   // 2 bytes, 11 bits: 110x 10xx
-   return 2;
-   } else if ((b >> 4) == -2) {
-   // 3 bytes, 16 bits: 1110 10xx 10xx
-   return 3;
-   } else if ((b >> 3) == -2) {
-   // 4 bytes, 21 bits: 0xxx 10xx 10xx 10xx
-   return 4;
-   } else {
-   // throw new IllegalArgumentException();
-   // Skip the first byte disallowed in UTF-8
-   return 1;
-   }
-   }
+   // 
--
+   // Utility open methods on BinaryString
 
 Review comment:
   Yes


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12865) State inconsistency between RM and TM on the slot status

2019-06-16 Thread Yun Gao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865296#comment-16865296
 ] 

Yun Gao commented on FLINK-12865:
-

I think adding a version to each slot may solve this problem.
 # Add a SYNCING status on the RM side. SYNCING means the request has been sent to the TM but the result of the request is not yet known. A slot in SYNCING status cannot be allocated to others.
 # The RM and the TM maintain a version for each slot, starting from 0.
 # Whenever the RM requests a slot, it increases the version by 1 and sends the request to the TM. The TM will only do the allocation when the RM's version > the TM's version.
 # The TM also attaches the version to the heartbeat, and the RM only accepts the slot status when the TM's version >= the RM's version.
 # If the SYNCING status lasts too long, the request will be resent.

The version method is a simplified form of full vector-clock based state management. In the full vector-clock design, the version would be a vector (RM's version, TM's version). Whenever the RM modifies the slot's status (requestSlot) or the TM modifies the slot's status (freeSlot), it needs to first increase the corresponding component and then send the sync messages, and a message can only be accepted when its vector version >= the receiver's vector version.

However, since the TM only modifies a slot's status when freeing it, we can ignore the TM-side component: doing so can at worst cause a freed slot to be temporarily marked as allocated, which does not cause errors, and the free status is eventually propagated to the RM with the heartbeat message.
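
For reference, a minimal sketch (illustrative only; the class and method names are assumptions, not the actual Flink code) of the TM-side version check described above:
{code:java}
// Illustrative sketch of the per-slot version check, not actual Flink classes.
final class SlotVersionTable {
    private final java.util.Map<String, Long> versions = new java.util.HashMap<>();

    /** Called when the RM requests a slot; the request carries the RM's version. */
    synchronized boolean tryAllocate(String slotId, long rmVersion) {
        long tmVersion = versions.getOrDefault(slotId, 0L);
        if (rmVersion > tmVersion) {
            versions.put(slotId, rmVersion); // adopt the newer version
            return true;                     // perform the allocation
        }
        return false;                        // stale request, ignore it
    }

    /** Version attached to the slot status reported in the heartbeat. */
    synchronized long currentVersion(String slotId) {
        return versions.getOrDefault(slotId, 0L);
    }
}
{code}
On the RM side, the reported slot status would then only be accepted when the heartbeat's version for that slot is >= the RM's recorded version.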

> State inconsistency between RM and TM on the slot status
> 
>
> Key: FLINK-12865
> URL: https://issues.apache.org/jira/browse/FLINK-12865
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> There may be state inconsistency between the TM and the RM due to race conditions and message loss:
>  # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but sends the heartbeat in another thread. There may be cases where a slot on the TM is initially FREE and the SlotReport reads the FREE state, then the RM requests the slot and marks it as allocated, and the SlotReport finally overrides the allocated status on the RM side wrongly.
>  # When the RM requests a slot, the TM receives the request but the acknowledge message gets lost. The RM will then think this slot is free and assign it to another request. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12865) State inconsistency between RM and TM on the slot status

2019-06-16 Thread Yun Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-12865:

Description: 
There may be state inconsistency between the TM and the RM due to race conditions and message loss:
 # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but sends the heartbeat in another thread. There may be cases where a slot on the TM is initially FREE and the SlotReport reads the FREE state, then the RM requests the slot and marks it as allocated, and the SlotReport finally overrides the allocated status on the RM side wrongly.
 # When the RM requests a slot, the TM receives the request but the acknowledge message gets lost. The RM will then think this slot is free and assign it to another request. 

 

  was:
There may be state inconsistency between the TM and the RM due to race conditions and message loss:
 # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but sends the heartbeat in another thread. There may be cases where a slot on the TM is initially FREE and the SlotReport reads the FREE state, then the RM requests the slot and marks it as allocated, and the SlotReport finally overrides the allocated status on the RM side wrongly.
 # When the RM requests a slot, the TM receives the request but the acknowledge message gets lost. The RM will then think this slot is free and assign it to another request.

Adding a version to each slot may solve this problem.
 # Add a SYNCING status on the RM side. SYNCING means the request has been sent to the TM but the result of the request is not yet known. A slot in SYNCING status cannot be allocated to others.
 # The RM and the TM maintain a version for each slot, starting from 0.
 # Whenever the RM requests a slot, it increases the version by 1 and sends the request to the TM. The TM will only do the allocation when the RM's version > the TM's version.
 # The TM also attaches the version to the heartbeat, and the RM only accepts the slot status when the TM's version >= the RM's version.

The version method is a simplified form of full vector-clock based state management. In the full vector-clock design, the version would be a vector (RM's version, TM's version). Whenever the RM modifies the slot's status (requestSlot) or the TM modifies the slot's status (freeSlot), it needs to first increase the corresponding component and then send the sync messages, and a message can only be accepted when its vector version >= the receiver's vector version.

However, since the TM only modifies a slot's status when freeing it, we can ignore the TM-side component: doing so can at worst cause a freed slot to be temporarily marked as allocated, which does not cause errors, and the free status is eventually propagated to the RM with the heartbeat message.

 

 


> State inconsistency between RM and TM on the slot status
> 
>
> Key: FLINK-12865
> URL: https://issues.apache.org/jira/browse/FLINK-12865
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> There may be state inconsistency between the TM and the RM due to race conditions and message loss:
>  # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but sends the heartbeat in another thread. There may be cases where a slot on the TM is initially FREE and the SlotReport reads the FREE state, then the RM requests the slot and marks it as allocated, and the SlotReport finally overrides the allocated status on the RM side wrongly.
>  # When the RM requests a slot, the TM receives the request but the acknowledge message gets lost. The RM will then think this slot is free and assign it to another request. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput

2019-06-16 Thread GitBox
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] 
Implement the runtime handling of BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8731#discussion_r293741881
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ##
 @@ -225,6 +226,21 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws Exception {
}
}
 
+   /**
+* Starting from the second operator, go forward through the operator 
chain to notify
+* each operator that its input has ended.
+*
+* @throws Exception if some exception happens in the endInput function 
of an operator.
+*/
+   public void endOperatorInputs() throws Exception {
+   for (int i = allOperators.length - 2; i >= 0; i--) {
 
 Review comment:
   > I think you could simplify the code if you iterated through all operators 
in OperatorChain (not starting from the second one), check each one of them if 
there are instanceof BoundedOneInput and end them if they are.
   >
   >This would work for:
   >
   
   > - OneInputStreamTask - because all of them are/can be BoundedOneInput
   > - SourceStreamTask - because head will not be BoundedOneInput
   > - TwoInputStreamTask - because head will not be BoundedOneInput
   
   It works for now. However, if chaining a non-head two-input operator is allowed in the future, it will have problems. Besides, this simplification is somewhat difficult to understand. I suggest not simplifying it like this. What do you think?
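
   For reference, a minimal sketch (illustrative only; the interface and field names are simplified assumptions, not the actual `OperatorChain` code) of the simplification discussed above:
   ```java
   // Illustrative sketch; interfaces and fields are assumptions.
   interface BoundedOneInput {
       void endInput() throws Exception;
   }

   class OperatorChainSketch {
       private final Object[] allOperators; // head operator assumed to be the last element

       OperatorChainSketch(Object[] allOperators) {
           this.allOperators = allOperators;
       }

       /** Ends the input of every chained operator that is bounded one-input. */
       void endAllBoundedInputs() throws Exception {
           // Iterate over all operators, head included; operators that are not
           // BoundedOneInput (e.g. sources, two-input heads) are simply skipped.
           for (int i = allOperators.length - 1; i >= 0; i--) {
               if (allOperators[i] instanceof BoundedOneInput) {
                   ((BoundedOneInput) allOperators[i]).endInput();
               }
           }
       }
   }
   ```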


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] 
Carry primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294131403
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ##
 @@ -19,20 +19,24 @@
 package org.apache.flink.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.table.{JHashSet, JSet}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 
+import java.util
+
 /**
   * Abstract class which define the interfaces required to convert a 
[[TableSource]] to
   * a Calcite Table
   */
 class TableSourceTable[T](
 val tableSource: TableSource[T],
-val isStreaming: Boolean,
-val statistic: FlinkStatistic)
 
 Review comment:
   1. `TableSource` should be decoupled from `Catalog`, or a method like `setTableStats` should be added to the `TableSource` interface.
   2. The `TableSource` may be changed, e.g. by projection push-down into the table source.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on issue #8626: [FLINK-12742] Add insert into partition grammar as hive dialect

2019-06-16 Thread GitBox
KurtYoung commented on issue #8626: [FLINK-12742] Add insert into partition 
grammar as hive dialect
URL: https://github.com/apache/flink/pull/8626#issuecomment-502527618
 
 
   merged with 4320d8372f93e8fa4d82da06e5e0a0ba310195a2


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung closed pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect

2019-06-16 Thread GitBox
KurtYoung closed pull request #8626: [FLINK-12742] Add insert into partition 
grammar as hive dialect
URL: https://github.com/apache/flink/pull/8626
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12742) Add insert into partition grammar as hive dialect

2019-06-16 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-12742.
--
Resolution: Implemented

merged in 1.9.0: 4320d8372f93e8fa4d82da06e5e0a0ba310195a2

> Add insert into partition grammar as hive dialect
> -
>
> Key: FLINK-12742
> URL: https://issues.apache.org/jira/browse/FLINK-12742
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Danny Chan
>Assignee: Danny Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese

2019-06-16 Thread GitBox
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate 
the "Distributed Runtime Environment" page in to Chinese
URL: https://github.com/apache/flink/pull/8750#discussion_r294129377
 
 

 ##
 File path: docs/concepts/runtime.zh.md
 ##
 @@ -26,102 +26,74 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## Tasks and Operator Chains
+## 任务和算子链
 
-For distributed execution, Flink *chains* operator subtasks together into 
*tasks*. Each task is executed by one thread.
-Chaining operators together into tasks is a useful optimization: it reduces 
the overhead of thread-to-thread
-handover and buffering, and increases overall throughput while decreasing 
latency.
-The chaining behavior can be configured; see the [chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups) for details.
+分布式计算中,Flink 将算子(operator)的 subtask *链接(chain)*成 task。每个 task 由一个线程执行。把算子链接成 
tasks 是一个很好的优化:它减少了线程间切换和缓冲的开销,并在降低延迟的同时提高了整体吞吐量。链接操作的配置详情可参考:[chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups)
 
-The sample dataflow in the figure below is executed with five subtasks, and 
hence with five parallel threads.
+下图的 dataflow 由五个 subtasks 执行,因此具有五个并行线程。
 
 
 
 {% top %}
 
-## Job Managers, Task Managers, Clients
+## Job Managers、Task Managers、客户端(Clients)
 
-The Flink runtime consists of two types of processes:
+Flink 运行时包含两类进程:
 
-  - The **JobManagers** (also called *masters*) coordinate the distributed 
execution. They schedule tasks, coordinate
-checkpoints, coordinate recovery on failures, etc.
+  - **JobManagers** (也称为 *masters*)协调分布式计算。它们负责调度任务、协调 checkpoints、协调故障恢复等。
 
-There is always at least one Job Manager. A high-availability setup will 
have multiple JobManagers, one of
-which one is always the *leader*, and the others are *standby*.
+至少需要一个 JobManager。高可用部署下会有多个 JobManagers,其中一个作为 *leader*,其余处于 *standby* 状态。
 
-  - The **TaskManagers** (also called *workers*) execute the *tasks* (or more 
specifically, the subtasks) of a dataflow,
-and buffer and exchange the data *streams*.
+  - **TaskManagers**(也称为 *workers*)执行 dataflow 中的 *tasks*(准确来说是 subtasks 
),并且缓存和交换数据 *streams*。
 
-There must always be at least one TaskManager.
+至少需要一个 TaskManager。
 
-The JobManagers and TaskManagers can be started in various ways: directly on 
the machines as a [standalone cluster](../ops/deployment/cluster_setup.html), in
-containers, or managed by resource frameworks like 
[YARN](../ops/deployment/yarn_setup.html) or 
[Mesos](../ops/deployment/mesos.html).
-TaskManagers connect to JobManagers, announcing themselves as available, and 
are assigned work.
+JobManagers 和 TaskManagers 可由多种方式启动:可以直接在机器上启动(该集群称为 [standalone 
cluster](../ops/deployment/cluster_setup.html)),也可以在容器或资源管理框架,如 
[YARN](../ops/deployment/yarn_setup.html) 或 
[Mesos](../ops/deployment/mesos.html),中启动。TaskManagers 连接到 
JobManagers,通知后者自己可用,之后 TaskManagers 便可以被分配工作了。
 
-The **client** is not part of the runtime and program execution, but is used 
to prepare and send a dataflow to the JobManager.
-After that, the client can disconnect, or stay connected to receive progress 
reports. The client runs either as part of the
-Java/Scala program that triggers the execution, or in the command line process 
`./bin/flink run ...`.
+**客户端**不是运行时(runtime)和作业执行时的一部分,但它是被用来准备和发送 dataflow 到 JobManager 
的。在那之后,客户端可以断开连接,也可以保持连接来接收进度报告。客户端既可以作为触发执行的 Java / Scala 
程序的一部分,也可以在命令行进程中运行`./bin/flink run ...`。
 
 
 
 {% top %}
 
-## Task Slots and Resources
+## Task Slots 和资源
 
-Each worker (TaskManager) is a *JVM process*, and may execute one or more 
subtasks in separate threads.
-To control how many tasks a worker accepts, a worker has so called **task 
slots** (at least one).
+每个 worker(TaskManager)都是一个 *JVM 进程*,并且可以在不同的线程中执行一个或多个 subtasks。为了控制 worker 接收 
task 的数量,worker 拥有所谓的  **task slots** (至少一个)。
 
-Each *task slot* represents a fixed subset of resources of the TaskManager. A 
TaskManager with three slots, for example,
-will dedicate 1/3 of its managed memory to each slot. Slotting the resources 
means that a subtask will not
-compete with subtasks from other jobs for managed memory, but instead has a 
certain amount of reserved
-managed memory. Note that no CPU isolation happens here; currently slots only 
separate the managed memory of tasks.
+每个 *task slots*代表 TaskManager 的一份固定资源子集。例如,具有三个 slots 的 TaskManager 
会将其管理的内存资源分成三等份给每个 slot。 划分资源意味着 subtask 之间不会竞争资源,但是也意味着它们只拥有固定的资源。注意这里并没有 CPU 
隔离,当前 slots 之间只是划分任务的内存资源。
 
 Review comment:
   ```suggestion
   每个 *task slots* 代表 TaskManager 的一份固定资源子集。例如,具有三个 slots 的 TaskManager 
会将其管理的内存资源分成三等份给每个 slot。 划分资源意味着 subtask 之间不会竞争资源,但是也意味着它们只拥有固定的资源。注意这里并没有 CPU 
隔离,当前 slots 之间只是划分任务的内存资源。
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese

2019-06-16 Thread GitBox
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate 
the "Distributed Runtime Environment" page in to Chinese
URL: https://github.com/apache/flink/pull/8750#discussion_r294121063
 
 

 ##
 File path: docs/concepts/runtime.zh.md
 ##
 @@ -26,102 +26,74 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## Tasks and Operator Chains
+## 任务和算子链
 
-For distributed execution, Flink *chains* operator subtasks together into 
*tasks*. Each task is executed by one thread.
-Chaining operators together into tasks is a useful optimization: it reduces 
the overhead of thread-to-thread
-handover and buffering, and increases overall throughput while decreasing 
latency.
-The chaining behavior can be configured; see the [chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups) for details.
+分布式计算中,Flink 将算子(operator)的 subtask *链接(chain)*成 task。每个 task 由一个线程执行。把算子链接成 
tasks 是一个很好的优化:它减少了线程间切换和缓冲的开销,并在降低延迟的同时提高了整体吞吐量。链接操作的配置详情可参考:[chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups)
 
-The sample dataflow in the figure below is executed with five subtasks, and 
hence with five parallel threads.
+下图的 dataflow 由五个 subtasks 执行,因此具有五个并行线程。
 
 
 
 Review comment:
   Do we need to change the url of image?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese

2019-06-16 Thread GitBox
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate 
the "Distributed Runtime Environment" page in to Chinese
URL: https://github.com/apache/flink/pull/8750#discussion_r294120960
 
 

 ##
 File path: docs/concepts/runtime.zh.md
 ##
 @@ -26,102 +26,74 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## Tasks and Operator Chains
+## 任务和算子链
 
-For distributed execution, Flink *chains* operator subtasks together into 
*tasks*. Each task is executed by one thread.
-Chaining operators together into tasks is a useful optimization: it reduces 
the overhead of thread-to-thread
-handover and buffering, and increases overall throughput while decreasing 
latency.
-The chaining behavior can be configured; see the [chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups) for details.
+分布式计算中,Flink 将算子(operator)的 subtask *链接(chain)*成 task。每个 task 由一个线程执行。把算子链接成 
tasks 是一个很好的优化:它减少了线程间切换和缓冲的开销,并在降低延迟的同时提高了整体吞吐量。链接操作的配置详情可参考:[chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups)
 
 Review comment:
   Maybe `把算子链接成 tasks 是一个很好的优化:它减少了线程间切换和缓冲的开销,并在降低延迟的同时提高了整体吞吐量` can change 
into `把算子链接成 tasks 能够减少线程间切换和缓冲的开销,在降低延迟的同时提高了整体吞吐量`
   Do we need to update 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese

2019-06-16 Thread GitBox
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate 
the "Distributed Runtime Environment" page in to Chinese
URL: https://github.com/apache/flink/pull/8750#discussion_r294122301
 
 

 ##
 File path: docs/concepts/runtime.zh.md
 ##
 @@ -26,102 +26,74 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## Tasks and Operator Chains
+## 任务和算子链
 
-For distributed execution, Flink *chains* operator subtasks together into 
*tasks*. Each task is executed by one thread.
-Chaining operators together into tasks is a useful optimization: it reduces 
the overhead of thread-to-thread
-handover and buffering, and increases overall throughput while decreasing 
latency.
-The chaining behavior can be configured; see the [chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups) for details.
+分布式计算中,Flink 将算子(operator)的 subtask *链接(chain)*成 task。每个 task 由一个线程执行。把算子链接成 
tasks 是一个很好的优化:它减少了线程间切换和缓冲的开销,并在降低延迟的同时提高了整体吞吐量。链接操作的配置详情可参考:[chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups)
 
-The sample dataflow in the figure below is executed with five subtasks, and 
hence with five parallel threads.
+下图的 dataflow 由五个 subtasks 执行,因此具有五个并行线程。
 
 
 
 {% top %}
 
-## Job Managers, Task Managers, Clients
+## Job Managers、Task Managers、客户端(Clients)
 
-The Flink runtime consists of two types of processes:
+Flink 运行时包含两类进程:
 
-  - The **JobManagers** (also called *masters*) coordinate the distributed 
execution. They schedule tasks, coordinate
-checkpoints, coordinate recovery on failures, etc.
+  - **JobManagers** (也称为 *masters*)协调分布式计算。它们负责调度任务、协调 checkpoints、协调故障恢复等。
 
-There is always at least one Job Manager. A high-availability setup will 
have multiple JobManagers, one of
-which one is always the *leader*, and the others are *standby*.
+至少需要一个 JobManager。高可用部署下会有多个 JobManagers,其中一个作为 *leader*,其余处于 *standby* 状态。
 
-  - The **TaskManagers** (also called *workers*) execute the *tasks* (or more 
specifically, the subtasks) of a dataflow,
-and buffer and exchange the data *streams*.
+  - **TaskManagers**(也称为 *workers*)执行 dataflow 中的 *tasks*(准确来说是 subtasks 
),并且缓存和交换数据 *streams*。
 
-There must always be at least one TaskManager.
+至少需要一个 TaskManager。
 
-The JobManagers and TaskManagers can be started in various ways: directly on 
the machines as a [standalone cluster](../ops/deployment/cluster_setup.html), in
-containers, or managed by resource frameworks like 
[YARN](../ops/deployment/yarn_setup.html) or 
[Mesos](../ops/deployment/mesos.html).
-TaskManagers connect to JobManagers, announcing themselves as available, and 
are assigned work.
+JobManagers 和 TaskManagers 可由多种方式启动:可以直接在机器上启动(该集群称为 [standalone 
cluster](../ops/deployment/cluster_setup.html)),也可以在容器或资源管理框架,如 
[YARN](../ops/deployment/yarn_setup.html) 或 
[Mesos](../ops/deployment/mesos.html),中启动。TaskManagers 连接到 
JobManagers,通知后者自己可用,之后 TaskManagers 便可以被分配工作了。
 
 Review comment:
   ```suggestion
   JobManagers 和 TaskManagers 有多种启动方式:直接在机器上启动(该集群称为 [standalone 
cluster](../ops/deployment/cluster_setup.html)),在容器或资源管理框架,如 
[YARN](../ops/deployment/yarn_setup.html) 或 
[Mesos](../ops/deployment/mesos.html),中启动。TaskManagers 连接到 
JobManagers,通知后者自己可用,接手分配的工作。
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput

2019-06-16 Thread GitBox
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] 
Implement the runtime handling of BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8731#discussion_r294130413
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 ##
 @@ -174,6 +179,18 @@ private void initializeNumRecordsIn() {
}
}
 
+   private boolean checkFinished() throws Exception {
+   boolean isFinished = input.isFinished();
+   if (isFinished) {
+   if (streamOperator instanceof BoundedOneInput) {
 
 Review comment:
   If we let `xInputProcessor` see non-head operators, we can use the following structure. (`OperatorChain` is still invisible to `xInputProcessor`.)
   
   ```
   == xInputProcessor.java ==
   
   private boolean checkFinished() throws Exception {
   ..
   // invoke endInput(...) of the head operator
   ..
  // invoke endInput(...) of other operators
  streamTask.endNonHeadOperatorInputs()
   }
   
   == xStreamTask.java ==
   
    public void endNonHeadOperatorInputs() throws Exception {
   operatorChain.endNonHeadOperatorInputs();
   }
   
   == OperatorChain.java ==
   
    public void endNonHeadOperatorInputs() throws Exception {
  ..
   }
   
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese

2019-06-16 Thread GitBox
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate 
the "Distributed Runtime Environment" page in to Chinese
URL: https://github.com/apache/flink/pull/8750#discussion_r294121434
 
 

 ##
 File path: docs/concepts/runtime.zh.md
 ##
 @@ -26,102 +26,74 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## Tasks and Operator Chains
+## 任务和算子链
 
-For distributed execution, Flink *chains* operator subtasks together into 
*tasks*. Each task is executed by one thread.
-Chaining operators together into tasks is a useful optimization: it reduces 
the overhead of thread-to-thread
-handover and buffering, and increases overall throughput while decreasing 
latency.
-The chaining behavior can be configured; see the [chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups) for details.
+分布式计算中,Flink 将算子(operator)的 subtask *链接(chain)*成 task。每个 task 由一个线程执行。把算子链接成 
tasks 是一个很好的优化:它减少了线程间切换和缓冲的开销,并在降低延迟的同时提高了整体吞吐量。链接操作的配置详情可参考:[chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups)
 
-The sample dataflow in the figure below is executed with five subtasks, and 
hence with five parallel threads.
+下图的 dataflow 由五个 subtasks 执行,因此具有五个并行线程。
 
 
 
 {% top %}
 
-## Job Managers, Task Managers, Clients
+## Job Managers、Task Managers、客户端(Clients)
 
-The Flink runtime consists of two types of processes:
+Flink 运行时包含两类进程:
 
-  - The **JobManagers** (also called *masters*) coordinate the distributed 
execution. They schedule tasks, coordinate
-checkpoints, coordinate recovery on failures, etc.
+  - **JobManagers** (也称为 *masters*)协调分布式计算。它们负责调度任务、协调 checkpoints、协调故障恢复等。
 
-There is always at least one Job Manager. A high-availability setup will 
have multiple JobManagers, one of
-which one is always the *leader*, and the others are *standby*.
+至少需要一个 JobManager。高可用部署下会有多个 JobManagers,其中一个作为 *leader*,其余处于 *standby* 状态。
 
 Review comment:
   ```suggestion
   每个 Job 至少会有一个 JobManager。高可用部署下会有多个 JobManagers,其中一个作为 *leader*,其余处于 
*standby* 状态。
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese

2019-06-16 Thread GitBox
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate 
the "Distributed Runtime Environment" page in to Chinese
URL: https://github.com/apache/flink/pull/8750#discussion_r294129009
 
 

 ##
 File path: docs/concepts/runtime.zh.md
 ##
 @@ -26,102 +26,74 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## Tasks and Operator Chains
+## 任务和算子链
 
-For distributed execution, Flink *chains* operator subtasks together into 
*tasks*. Each task is executed by one thread.
-Chaining operators together into tasks is a useful optimization: it reduces 
the overhead of thread-to-thread
-handover and buffering, and increases overall throughput while decreasing 
latency.
-The chaining behavior can be configured; see the [chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups) for details.
+分布式计算中,Flink 将算子(operator)的 subtask *链接(chain)*成 task。每个 task 由一个线程执行。把算子链接成 
tasks 是一个很好的优化:它减少了线程间切换和缓冲的开销,并在降低延迟的同时提高了整体吞吐量。链接操作的配置详情可参考:[chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups)
 
-The sample dataflow in the figure below is executed with five subtasks, and 
hence with five parallel threads.
+下图的 dataflow 由五个 subtasks 执行,因此具有五个并行线程。
 
 
 
 {% top %}
 
-## Job Managers, Task Managers, Clients
+## Job Managers、Task Managers、客户端(Clients)
 
-The Flink runtime consists of two types of processes:
+Flink 运行时包含两类进程:
 
-  - The **JobManagers** (also called *masters*) coordinate the distributed 
execution. They schedule tasks, coordinate
-checkpoints, coordinate recovery on failures, etc.
+  - **JobManagers** (也称为 *masters*)协调分布式计算。它们负责调度任务、协调 checkpoints、协调故障恢复等。
 
-There is always at least one Job Manager. A high-availability setup will 
have multiple JobManagers, one of
-which one is always the *leader*, and the others are *standby*.
+至少需要一个 JobManager。高可用部署下会有多个 JobManagers,其中一个作为 *leader*,其余处于 *standby* 状态。
 
-  - The **TaskManagers** (also called *workers*) execute the *tasks* (or more 
specifically, the subtasks) of a dataflow,
-and buffer and exchange the data *streams*.
+  - **TaskManagers**(也称为 *workers*)执行 dataflow 中的 *tasks*(准确来说是 subtasks 
),并且缓存和交换数据 *streams*。
 
-There must always be at least one TaskManager.
+至少需要一个 TaskManager。
 
-The JobManagers and TaskManagers can be started in various ways: directly on 
the machines as a [standalone cluster](../ops/deployment/cluster_setup.html), in
-containers, or managed by resource frameworks like 
[YARN](../ops/deployment/yarn_setup.html) or 
[Mesos](../ops/deployment/mesos.html).
-TaskManagers connect to JobManagers, announcing themselves as available, and 
are assigned work.
+JobManagers 和 TaskManagers 可由多种方式启动:可以直接在机器上启动(该集群称为 [standalone 
cluster](../ops/deployment/cluster_setup.html)),也可以在容器或资源管理框架,如 
[YARN](../ops/deployment/yarn_setup.html) 或 
[Mesos](../ops/deployment/mesos.html),中启动。TaskManagers 连接到 
JobManagers,通知后者自己可用,之后 TaskManagers 便可以被分配工作了。
 
-The **client** is not part of the runtime and program execution, but is used 
to prepare and send a dataflow to the JobManager.
-After that, the client can disconnect, or stay connected to receive progress 
reports. The client runs either as part of the
-Java/Scala program that triggers the execution, or in the command line process 
`./bin/flink run ...`.
+**客户端**不是运行时(runtime)和作业执行时的一部分,但它是被用来准备和发送 dataflow 到 JobManager 
的。在那之后,客户端可以断开连接,也可以保持连接来接收进度报告。客户端既可以作为触发执行的 Java / Scala 
程序的一部分,也可以在命令行进程中运行`./bin/flink run ...`。
 
 Review comment:
   `但它是被用来准备和发送 dataflow 到 JobManager 的` --> `但用它来准备和提交 dataflow 到 JobManager 
`? 
   `客户端可以断开连接,也可以保持连接来接收进度报告` --> `提交完成之后,客户端可以断开连接,也可以保持连接来接收进度报告`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese

2019-06-16 Thread GitBox
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate 
the "Distributed Runtime Environment" page in to Chinese
URL: https://github.com/apache/flink/pull/8750#discussion_r294129829
 
 

 ##
 File path: docs/concepts/runtime.zh.md
 ##
 @@ -26,102 +26,74 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## Tasks and Operator Chains
+## 任务和算子链
 
-For distributed execution, Flink *chains* operator subtasks together into 
*tasks*. Each task is executed by one thread.
-Chaining operators together into tasks is a useful optimization: it reduces 
the overhead of thread-to-thread
-handover and buffering, and increases overall throughput while decreasing 
latency.
-The chaining behavior can be configured; see the [chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups) for details.
+分布式计算中,Flink 将算子(operator)的 subtask *链接(chain)*成 task。每个 task 由一个线程执行。把算子链接成 
tasks 是一个很好的优化:它减少了线程间切换和缓冲的开销,并在降低延迟的同时提高了整体吞吐量。链接操作的配置详情可参考:[chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups)
 
-The sample dataflow in the figure below is executed with five subtasks, and 
hence with five parallel threads.
+下图的 dataflow 由五个 subtasks 执行,因此具有五个并行线程。
 
 
 
 {% top %}
 
-## Job Managers, Task Managers, Clients
+## Job Managers、Task Managers、客户端(Clients)
 
-The Flink runtime consists of two types of processes:
+Flink 运行时包含两类进程:
 
-  - The **JobManagers** (also called *masters*) coordinate the distributed 
execution. They schedule tasks, coordinate
-checkpoints, coordinate recovery on failures, etc.
+  - **JobManagers** (也称为 *masters*)协调分布式计算。它们负责调度任务、协调 checkpoints、协调故障恢复等。
 
-There is always at least one Job Manager. A high-availability setup will 
have multiple JobManagers, one of
-which one is always the *leader*, and the others are *standby*.
+至少需要一个 JobManager。高可用部署下会有多个 JobManagers,其中一个作为 *leader*,其余处于 *standby* 状态。
 
-  - The **TaskManagers** (also called *workers*) execute the *tasks* (or more 
specifically, the subtasks) of a dataflow,
-and buffer and exchange the data *streams*.
+  - **TaskManagers**(也称为 *workers*)执行 dataflow 中的 *tasks*(准确来说是 subtasks 
),并且缓存和交换数据 *streams*。
 
-There must always be at least one TaskManager.
+至少需要一个 TaskManager。
 
-The JobManagers and TaskManagers can be started in various ways: directly on 
the machines as a [standalone cluster](../ops/deployment/cluster_setup.html), in
-containers, or managed by resource frameworks like 
[YARN](../ops/deployment/yarn_setup.html) or 
[Mesos](../ops/deployment/mesos.html).
-TaskManagers connect to JobManagers, announcing themselves as available, and 
are assigned work.
+JobManagers 和 TaskManagers 可由多种方式启动:可以直接在机器上启动(该集群称为 [standalone 
cluster](../ops/deployment/cluster_setup.html)),也可以在容器或资源管理框架,如 
[YARN](../ops/deployment/yarn_setup.html) 或 
[Mesos](../ops/deployment/mesos.html),中启动。TaskManagers 连接到 
JobManagers,通知后者自己可用,之后 TaskManagers 便可以被分配工作了。
 
-The **client** is not part of the runtime and program execution, but is used 
to prepare and send a dataflow to the JobManager.
-After that, the client can disconnect, or stay connected to receive progress 
reports. The client runs either as part of the
-Java/Scala program that triggers the execution, or in the command line process 
`./bin/flink run ...`.
+**客户端**不是运行时(runtime)和作业执行时的一部分,但它是被用来准备和发送 dataflow 到 JobManager 
的。在那之后,客户端可以断开连接,也可以保持连接来接收进度报告。客户端既可以作为触发执行的 Java / Scala 
程序的一部分,也可以在命令行进程中运行`./bin/flink run ...`。
 
 
 
 {% top %}
 
-## Task Slots and Resources
+## Task Slots 和资源
 
-Each worker (TaskManager) is a *JVM process*, and may execute one or more 
subtasks in separate threads.
-To control how many tasks a worker accepts, a worker has so called **task 
slots** (at least one).
+每个 worker(TaskManager)都是一个 *JVM 进程*,并且可以在不同的线程中执行一个或多个 subtasks。为了控制 worker 接收 
task 的数量,worker 拥有所谓的  **task slots** (至少一个)。
 
-Each *task slot* represents a fixed subset of resources of the TaskManager. A 
TaskManager with three slots, for example,
-will dedicate 1/3 of its managed memory to each slot. Slotting the resources 
means that a subtask will not
-compete with subtasks from other jobs for managed memory, but instead has a 
certain amount of reserved
-managed memory. Note that no CPU isolation happens here; currently slots only 
separate the managed memory of tasks.
+每个 *task slots*代表 TaskManager 的一份固定资源子集。例如,具有三个 slots 的 TaskManager 
会将其管理的内存资源分成三等份给每个 slot。 划分资源意味着 subtask 之间不会竞争资源,但是也意味着它们只拥有固定的资源。注意这里并没有 CPU 
隔离,当前 slots 之间只是划分任务的内存资源。
 
-By adjusting the number of task slots, users can define how subtasks are 
isolated from each other.
-Having one slot per TaskManager means each task group runs in a separate JVM 
(which can be started in a
-separate container, for example). Having multiple slots
-means more subtasks share the same JVM. Tasks in the same JVM share TCP 
connections (via multiplexing) and
-heartbeat messages. They may also share data sets and data structures, thus 
reducing 

[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese

2019-06-16 Thread GitBox
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate 
the "Distributed Runtime Environment" page in to Chinese
URL: https://github.com/apache/flink/pull/8750#discussion_r294129669
 
 

 ##
 File path: docs/concepts/runtime.zh.md
 ##
 @@ -26,102 +26,74 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## Tasks and Operator Chains
+## 任务和算子链
 
-For distributed execution, Flink *chains* operator subtasks together into 
*tasks*. Each task is executed by one thread.
-Chaining operators together into tasks is a useful optimization: it reduces 
the overhead of thread-to-thread
-handover and buffering, and increases overall throughput while decreasing 
latency.
-The chaining behavior can be configured; see the [chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups) for details.
+分布式计算中,Flink 将算子(operator)的 subtask *链接(chain)*成 task。每个 task 由一个线程执行。把算子链接成 
tasks 是一个很好的优化:它减少了线程间切换和缓冲的开销,并在降低延迟的同时提高了整体吞吐量。链接操作的配置详情可参考:[chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups)
 
-The sample dataflow in the figure below is executed with five subtasks, and 
hence with five parallel threads.
+下图的 dataflow 由五个 subtasks 执行,因此具有五个并行线程。
 
 
 
 {% top %}
 
-## Job Managers, Task Managers, Clients
+## Job Managers、Task Managers、客户端(Clients)
 
-The Flink runtime consists of two types of processes:
+Flink 运行时包含两类进程:
 
-  - The **JobManagers** (also called *masters*) coordinate the distributed 
execution. They schedule tasks, coordinate
-checkpoints, coordinate recovery on failures, etc.
+  - **JobManagers** (也称为 *masters*)协调分布式计算。它们负责调度任务、协调 checkpoints、协调故障恢复等。
 
-There is always at least one Job Manager. A high-availability setup will 
have multiple JobManagers, one of
-which one is always the *leader*, and the others are *standby*.
+至少需要一个 JobManager。高可用部署下会有多个 JobManagers,其中一个作为 *leader*,其余处于 *standby* 状态。
 
-  - The **TaskManagers** (also called *workers*) execute the *tasks* (or more 
specifically, the subtasks) of a dataflow,
-and buffer and exchange the data *streams*.
+  - **TaskManagers**(也称为 *workers*)执行 dataflow 中的 *tasks*(准确来说是 subtasks 
),并且缓存和交换数据 *streams*。
 
-There must always be at least one TaskManager.
+至少需要一个 TaskManager。
 
-The JobManagers and TaskManagers can be started in various ways: directly on 
the machines as a [standalone cluster](../ops/deployment/cluster_setup.html), in
-containers, or managed by resource frameworks like 
[YARN](../ops/deployment/yarn_setup.html) or 
[Mesos](../ops/deployment/mesos.html).
-TaskManagers connect to JobManagers, announcing themselves as available, and 
are assigned work.
+JobManagers 和 TaskManagers 可由多种方式启动:可以直接在机器上启动(该集群称为 [standalone 
cluster](../ops/deployment/cluster_setup.html)),也可以在容器或资源管理框架,如 
[YARN](../ops/deployment/yarn_setup.html) 或 
[Mesos](../ops/deployment/mesos.html),中启动。TaskManagers 连接到 
JobManagers,通知后者自己可用,之后 TaskManagers 便可以被分配工作了。
 
-The **client** is not part of the runtime and program execution, but is used 
to prepare and send a dataflow to the JobManager.
-After that, the client can disconnect, or stay connected to receive progress 
reports. The client runs either as part of the
-Java/Scala program that triggers the execution, or in the command line process 
`./bin/flink run ...`.
+**客户端**不是运行时(runtime)和作业执行时的一部分,但它是被用来准备和发送 dataflow 到 JobManager 
的。在那之后,客户端可以断开连接,也可以保持连接来接收进度报告。客户端既可以作为触发执行的 Java / Scala 
程序的一部分,也可以在命令行进程中运行`./bin/flink run ...`。
 
 
 
 {% top %}
 
-## Task Slots and Resources
+## Task Slots 和资源
 
-Each worker (TaskManager) is a *JVM process*, and may execute one or more 
subtasks in separate threads.
-To control how many tasks a worker accepts, a worker has so called **task 
slots** (at least one).
+每个 worker(TaskManager)都是一个 *JVM 进程*,并且可以在不同的线程中执行一个或多个 subtasks。为了控制 worker 接收 
task 的数量,worker 拥有所谓的  **task slots** (至少一个)。
 
-Each *task slot* represents a fixed subset of resources of the TaskManager. A 
TaskManager with three slots, for example,
-will dedicate 1/3 of its managed memory to each slot. Slotting the resources 
means that a subtask will not
-compete with subtasks from other jobs for managed memory, but instead has a 
certain amount of reserved
-managed memory. Note that no CPU isolation happens here; currently slots only 
separate the managed memory of tasks.
+每个 *task slots*代表 TaskManager 的一份固定资源子集。例如,具有三个 slots 的 TaskManager 
会将其管理的内存资源分成三等份给每个 slot。 划分资源意味着 subtask 之间不会竞争资源,但是也意味着它们只拥有固定的资源。注意这里并没有 CPU 
隔离,当前 slots 之间只是划分任务的内存资源。
 
-By adjusting the number of task slots, users can define how subtasks are 
isolated from each other.
-Having one slot per TaskManager means each task group runs in a separate JVM 
(which can be started in a
-separate container, for example). Having multiple slots
-means more subtasks share the same JVM. Tasks in the same JVM share TCP 
connections (via multiplexing) and
-heartbeat messages. They may also share data sets and data structures, thus 
reducing 

[GitHub] [flink] klion26 commented on a change in pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page in to Chinese

2019-06-16 Thread GitBox
klion26 commented on a change in pull request #8750: [FLINK-11606] Translate 
the "Distributed Runtime Environment" page in to Chinese
URL: https://github.com/apache/flink/pull/8750#discussion_r294121310
 
 

 ##
 File path: docs/concepts/runtime.zh.md
 ##
 @@ -26,102 +26,74 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## Tasks and Operator Chains
+## 任务和算子链
 
-For distributed execution, Flink *chains* operator subtasks together into 
*tasks*. Each task is executed by one thread.
-Chaining operators together into tasks is a useful optimization: it reduces 
the overhead of thread-to-thread
-handover and buffering, and increases overall throughput while decreasing 
latency.
-The chaining behavior can be configured; see the [chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups) for details.
+分布式计算中,Flink 将算子(operator)的 subtask *链接(chain)*成 task。每个 task 由一个线程执行。把算子链接成 
tasks 是一个很好的优化:它减少了线程间切换和缓冲的开销,并在降低延迟的同时提高了整体吞吐量。链接操作的配置详情可参考:[chaining 
docs](../dev/stream/operators/#task-chaining-and-resource-groups)
 
-The sample dataflow in the figure below is executed with five subtasks, and 
hence with five parallel threads.
+下图的 dataflow 由五个 subtasks 执行,因此具有五个并行线程。
 
 
 
 {% top %}
 
-## Job Managers, Task Managers, Clients
+## Job Managers、Task Managers、客户端(Clients)
 
 Review comment:
   Do we need to translate `Job Managers` and `Task Managers` here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8707: [FLINK-12815] [table-planner-blink] Supports CatalogManager in blink planner

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8707: [FLINK-12815] 
[table-planner-blink] Supports CatalogManager in blink planner
URL: https://github.com/apache/flink/pull/8707#discussion_r294130078
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 ##
 @@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.DataStreamQueryOperation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.plan.schema.TableSourceTable;
+import org.apache.flink.table.plan.stats.FlinkStatistic;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static java.lang.String.format;
+
+/**
+ * A mapping between Flink catalog's database and Calcite's schema.
+ * Tables are registered as tables in the schema.
+ */
+class DatabaseCalciteSchema implements Schema {
+   private final String databaseName;
+   private final String catalogName;
+   private final Catalog catalog;
+
+   public DatabaseCalciteSchema(String databaseName, String catalogName, 
Catalog catalog) {
+   this.databaseName = databaseName;
+   this.catalogName = catalogName;
+   this.catalog = catalog;
+   }
+
+   @Override
+   public Table getTable(String tableName) {
+   ObjectPath tablePath = new ObjectPath(databaseName, tableName);
+
+   try {
+   if (!catalog.tableExists(tablePath)) {
+   return null;
+   }
+
+   CatalogBaseTable table = catalog.getTable(tablePath);
+
+   // TODO supports GenericCatalogView
+   if (table instanceof QueryOperationCatalogView) {
+   QueryOperationCatalogView view = 
(QueryOperationCatalogView) table;
+   QueryOperation operation = 
view.getQueryOperation();
+   if (operation instanceof 
DataStreamQueryOperation) {
+   List qualifiedName = 
Arrays.asList(catalogName, databaseName, tableName);
+   ((DataStreamQueryOperation) 
operation).setQualifiedName(qualifiedName);
+   }
+   return 
QueryOperationCatalogViewTable.createCalciteTable(view);
+   } else if (table instanceof ConnectorCatalogTable) {
+   ConnectorCatalogTable connectorTable = 
(ConnectorCatalogTable) table;
+   return connectorTable.getTableSource()
+   .map(tableSource -> new 
TableSourceTable<>(
+   tableSource,
+   !connectorTable.isBatch(),
+   FlinkStatistic.UNKNOWN())
+   ).orElseThrow(() -> new 
TableException("Cannot query a sink only table."));
+   } else if (table instanceof FlinkTempCatalogTable) {
+   return ((FlinkTempCatalogTable) 
table).getAbstractTable();
+   } else {
+   throw new TableException("Unsupported table 
type: " + table);
+   }
+ 

[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput

2019-06-16 Thread GitBox
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] 
Implement the runtime handling of BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8731#discussion_r294128294
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ##
 @@ -225,6 +226,21 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws Exception {
}
}
 
+   /**
+* Starting from the second operator, go forward through the operator 
chain to notify
+* each operator that its input has ended.
+*
+* @throws Exception if some exception happens in the endInput function 
of an operator.
+*/
+   public void endOperatorInputs() throws Exception {
+   for (int i = allOperators.length - 2; i >= 0; i--) {
 
 Review comment:
   Maybe we can use the following structure?
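   (For context only: the snippet below is not necessarily the structure being hinted at
   above. It is just a sketch of what the quoted traversal does, assuming `allOperators`
   stores the chain in reverse order with the head operator at the last index, and that
   non-head operators implementing `BoundedOneInput` should be notified.)

   ```java
   // Sketch only: walk every operator except the head and signal end of input.
   public void endOperatorInputs() throws Exception {
       for (int i = allOperators.length - 2; i >= 0; i--) {
           StreamOperator<?> operator = allOperators[i];
           if (operator instanceof BoundedOneInput) {
               // notify the operator that its (single) input has ended
               ((BoundedOneInput) operator).endInput();
           }
       }
   }
   ```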


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12406) Report BLOCKING_PERSISTENT result partition meta back to client

2019-06-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12406:
---
Labels: pull-request-available  (was: )

> Report BLOCKING_PERSISTENT result partition meta back to client
> ---
>
> Key: FLINK-12406
> URL: https://issues.apache.org/jira/browse/FLINK-12406
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet, Runtime / Coordination
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are 
> generated, and the locations of these result partitions should be reported back 
> to the client via {{JobExecutionResult}}; they will later be used for Table 
> {{cache()}} and {{invalidateCache()}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client

2019-06-16 Thread GitBox
flinkbot commented on issue #8756: [FLINK-12406] [Runtime] Report 
BLOCKING_PERSISTENT result partition meta back to client
URL: https://github.com/apache/flink/pull/8756#issuecomment-502525621
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Xpray opened a new pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client

2019-06-16 Thread GitBox
Xpray opened a new pull request #8756: [FLINK-12406] [Runtime] Report 
BLOCKING_PERSISTENT result partition meta back to client
URL: https://github.com/apache/flink/pull/8756
 
 
   ## What is the purpose of the change
   This PR follows FLINK-12405. It reports all `BLOCKING_PERSISTENT` result partition 
locations back to the `ExecutionEnvironment`, so that the `TableEnvironment` can 
access this data in a later PR [FLINK-12420].
   
   
   ## Brief change log
 - *Add a new class `IntermediateResultDescriptor`, which stores the location 
of a `BLOCKING_PERSISTENT` `ResultPartition`*
 - *Add a new method `getResultPartitionDescriptors()` in 
`AccessExecutionGraph`*
 - *Add a new field in `JobExecutionResult`, `JobResult`, 
`ArchivedExecutionGraph` and `ExecutionEnvironment`, which keeps a mapping from 
`IntermediateDataSetID` to its `ResultPartition` locations*
 - *When a job finishes, the metadata will flow in this path: 
`ExecutionGraph` -> `ArchivedExecutionGraph` -> `JobExecutionResult` -> 
`JobResult` -> `ExecutionEnvironment`*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
org.apache.flink.test.operators.ExecutionEnvironmentITCase#testAccessingBlockingPersistentResultPartition
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes

[FLIP-36](https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink)
[Implementations of 
FLIP-36](https://docs.google.com/document/d/1qY45m3_r2NmXujcWcjJMJ_avxvnXjGy9tDn99qIBM1Y/edit#)
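   For illustration only, the reported locations might be consumed on the client 
roughly as below; the accessor name `getPersistedResultPartitions()` and the exact 
value type are assumptions made for this sketch and are not taken from the PR:

   ```java
   // Hypothetical sketch: only the flow (execute, then read the reported locations)
   // follows the change log above; the getter name is an assumption.
   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
   // ... build a job whose results go to BLOCKING_PERSISTENT result partitions ...
   env.execute("produce cached intermediate result");

   Map<IntermediateDataSetID, Collection<IntermediateResultDescriptor>> persisted =
           env.getPersistedResultPartitions();   // hypothetical accessor
   persisted.forEach((dataSetId, locations) ->
           System.out.println(dataSetId + " is persisted at " + locations));
   ```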


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12865) State inconsistency between RM and TM on the slot status

2019-06-16 Thread Yun Gao (JIRA)
Yun Gao created FLINK-12865:
---

 Summary: State inconsistency between RM and TM on the slot status
 Key: FLINK-12865
 URL: https://issues.apache.org/jira/browse/FLINK-12865
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Yun Gao
Assignee: Yun Gao


There may be state inconsistency between the TM and the RM due to race conditions and 
message loss:
 # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but 
sends the heartbeat in another thread. It can happen that the slot on the TM is FREE 
initially and the SlotReport reads the FREE state, then the RM requests the slot and 
marks it as allocated, and the stale SlotReport finally overrides the allocated 
status on the RM side.
 # When the RM requests a slot, the TM receives the request but the acknowledge message 
gets lost. The RM then thinks the slot is free and assigns it to another request.

Adding a version to each slot may solve this problem (a sketch follows below):
 # Add a SYNCING status on the RM side. SYNCING means the request has been sent to the 
TM but the result of the request is not yet known. A slot in SYNCING status cannot be 
allocated to others.
 # The RM and the TM maintain a version for each slot, starting from 0.
 # Whenever the RM requests a slot, it increases the version by 1 and sends the request 
to the TM. The TM only performs the allocation when the RM's version > the TM's version.
 # The TM also attaches the version to the heartbeat, and the RM only accepts the 
slot status when the TM's version >= the RM's version.

The version method is a simplified form of full vector-clock based state management. 
In the full vector-clock design, the version would be a vector (RM's version, TM's 
version). Whenever the RM modifies the slot's status (requestSlot) or the TM modifies 
the slot's status (freeSlot), it needs to first increase the corresponding component 
and send the sync messages, and a message can only be accepted when the local vector 
version >= the message's vector version.

However, since the TM only modifies the slot status when freeing slots, ignoring the 
TM's component can at worst cause a freed slot to be temporarily marked as allocated. 
This does not cause an error, and the free status is eventually propagated to the RM 
with a heartbeat message.
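A minimal sketch of the RM-side version rules above (illustrative names only, not 
actual Flink classes):

{code:java}
/** Illustrative sketch of the version-based guard described above; not actual Flink code. */
final class RmSlotView {

    private long version;      // version attached to the RM's last request for this slot
    private boolean occupied;  // RM's view: allocated or SYNCING (not free to hand out)

    /** Before sending a slot request to the TM: bump the version and enter SYNCING. */
    long nextRequestVersion() {
        occupied = true;       // SYNCING: the slot must not be given to another request
        return ++version;      // the TM only allocates if this is > its own version
    }

    /** A slot status from the TM heartbeat is applied only if it is not stale. */
    void onHeartbeatSlotStatus(long tmVersion, boolean tmReportsFree) {
        if (tmVersion >= version && tmReportsFree) {
            occupied = false;  // safe: the report reflects (at least) our last request
        }
    }

    boolean isFree() {
        return !occupied;
    }
}
{code}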

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[GitHub] [flink] godfreyhe commented on a change in pull request #8707: [FLINK-12815] [table-planner-blink] Supports CatalogManager in blink planner

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8707: [FLINK-12815] 
[table-planner-blink] Supports CatalogManager in blink planner
URL: https://github.com/apache/flink/pull/8707#discussion_r294127605
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
 ##
 @@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Table;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Bridge between the {@link CatalogManager} and the {@link Schema}. This way 
we can query Flink's specific catalogs
+ * from Calcite.
+ *
+ * The mapping for {@link Catalog}s is modeled as a strict two-level 
reference structure for Flink in Calcite,
+ * the full path of objects is of format 
[catalog_name].[db_name].[meta-object_name].
+ *
+ * It also supports {@link ExternalCatalog}s. An external catalog maps 1:1 
to the Calcite's schema.
+ */
+@Internal
+public class CatalogManagerCalciteSchema implements Schema {
+
+   private final CatalogManager catalogManager;
+   private boolean isBatch;
+
 
 Review comment:
   We unified `BatchTableSourceTable` and `StreamTableSourceTable` into 
`TableSourceTable`, and the batch `TableSourceTable` and the stream `TableSourceTable` 
have different behaviors when inferring the table row type.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on issue #8686: [FLINK-11869] Make buffer size in checkpoint stream factory configurable

2019-06-16 Thread GitBox
Myasuka commented on issue #8686: [FLINK-11869] Make buffer size in checkpoint 
stream factory configurable
URL: https://github.com/apache/flink/pull/8686#issuecomment-502523369
 
 
   CC @tzulitai 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8707: [FLINK-12815] [table-planner-blink] Supports CatalogManager in blink planner

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8707: [FLINK-12815] 
[table-planner-blink] Supports CatalogManager in blink planner
URL: https://github.com/apache/flink/pull/8707#discussion_r294126984
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink's catalog and Calcite's schema. This enables to 
look up and access objects(tables, views,
+ * functions, types) in SQL queries without registering them in advance. 
Databases are registered as sub-schemas
+ * in the schema.
+ */
+@Internal
+public class CatalogCalciteSchema implements Schema {
+
+   private final String catalogName;
+   private final Catalog catalog;
+
+   public CatalogCalciteSchema(String catalogName, Catalog catalog) {
+   this.catalogName = catalogName;
+   this.catalog = catalog;
+   }
+
+   /**
+* Look up a sub-schema (database) by the given sub-schema name.
+*
+* @param schemaName name of sub-schema to look up
+* @return the sub-schema with a given database name, or null
+*/
+   @Override
+   public Schema getSubSchema(String schemaName) {
+
 
 Review comment:
   Do you mean we need an interface which defines methods like 
`getDatabase(String database)` and `getTable(String table)`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8707: [FLINK-12815] [table-planner-blink] Supports CatalogManager in blink planner

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8707: [FLINK-12815] 
[table-planner-blink] Supports CatalogManager in blink planner
URL: https://github.com/apache/flink/pull/8707#discussion_r294126984
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink's catalog and Calcite's schema. This enables to 
look up and access objects(tables, views,
+ * functions, types) in SQL queries without registering them in advance. 
Databases are registered as sub-schemas
+ * in the schema.
+ */
+@Internal
+public class CatalogCalciteSchema implements Schema {
+
+   private final String catalogName;
+   private final Catalog catalog;
+
+   public CatalogCalciteSchema(String catalogName, Catalog catalog) {
+   this.catalogName = catalogName;
+   this.catalog = catalog;
+   }
+
+   /**
+* Look up a sub-schema (database) by the given sub-schema name.
+*
+* @param schemaName name of sub-schema to look up
+* @return the sub-schema with a given database name, or null
+*/
+   @Override
+   public Schema getSubSchema(String schemaName) {
+
 
 Review comment:
   Do you mean we need an interface which defines methods like 
`getDatabase(String database)` or `getTable(String table)`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry 
primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294126584
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
 ##
 @@ -147,13 +161,15 @@ abstract class TableTestUtil(test: TableTestBase) {
 * @param types field types
 * @param fields field names
 * @param statistic statistic of current table
+* @param uniqueKeys unique keys of current table
 * @return returns the registered [[Table]].
 */
   def addTableSource(
   name: String,
   types: Array[TypeInformation[_]],
   fields: Array[String],
-  statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table
+  statistic: TableStats,
 
 Review comment:
   If more information needs to be added to `FlinkStatistic`, I think it should 
also be included in `TableStats`. Regarding `relModifiedMonotonicity`, 
it is only used internally in the intermediate table source 
(`IntermediateRelTable`), which keeps `FlinkStatistic` as a constructor 
parameter.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry 
primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294126242
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ##
 @@ -19,20 +19,24 @@
 package org.apache.flink.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.table.{JHashSet, JSet}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 
+import java.util
+
 /**
   * Abstract class which define the interfaces required to convert a 
[[TableSource]] to
   * a Calcite Table
   */
 class TableSourceTable[T](
 val tableSource: TableSource[T],
-val isStreaming: Boolean,
-val statistic: FlinkStatistic)
 
 Review comment:
   1. The statistics from the catalog should be restored via 
`TableSource#getTableStats()`.
   2. I think if the TableSource is not changed, then we don't need to 
construct a new `TableSourceTable`. We can reuse the original 
`TableSourceTable` and avoid calling `TableSource#getTableStats` again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12857) move FilterableTableSource into flink-table-common

2019-06-16 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-12857.
--
   Resolution: Implemented
Fix Version/s: 1.9.0

merged in 1.9.0: 9f8f89923e96286275ba3fb259ec450baaa4f7c0

> move FilterableTableSource into flink-table-common
> --
>
> Key: FLINK-12857
> URL: https://issues.apache.org/jira/browse/FLINK-12857
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Move FilterableTableSource into flink-table-common, so that both flink-planner 
> and blink-planner can use this interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport

2019-06-16 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865278#comment-16865278
 ] 

shuai.xu edited comment on FLINK-12863 at 6/17/19 3:27 AM:
---

Hi [~till.rohrmann], as [~xiaogang.shi] said, we found the same race condition 
between the RM and the TM and added a version to each slot to solve it. I think 
adding a fencing token to the AllocatedSlotReport may have some defects. Consider 
how you would update the fencing token: when offering slots succeeds, or before 
offering slots? If it is updated when offering slots succeeds, it may happen that 
the JM uses the new fencing token while the TM considers the slot offering failed, 
so the TM may not update the token and the JM has no chance to use the old token 
any more. If the TM updates the token before offering slots, it may happen that 
the JM does not receive the offering, so the JM does not update the token. I think 
using a version may be more suitable, as we can compare two versions and the 
bigger version is always the correct one.


was (Author: tiemsn):
Hi [~till.rohrmann], as [~xiaogang.shi] said, we found the same race condition 
between the RM and the TM and added a version to each slot to solve it. I think 
adding a fencing token to the AllocatedSlotReport can solve it. But how would you 
update the fencing token? When offering slots succeeds, or before offering slots? 
If it is updated when offering slots succeeds, it may happen that the JM uses the 
new fencing token while the TM considers the slot offering failed, so the TM may 
not update the token and the JM has no chance to use the old token any more. If 
the TM updates the token before offering slots, it may happen that the JM does not 
receive the offering, so the JM does not update the token. I think using a version 
may be more suitable, as we can compare two versions and the bigger version is 
always the correct one.

> Race condition between slot offerings and AllocatedSlotReport
> -
>
> Key: FLINK-12863
> URL: https://issues.apache.org/jira/browse/FLINK-12863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by 
> the {{TaskExecutor}} to synchronize its internal view on slot allocations 
> with the view of the {{JobMaster}}. It seems that there is a race condition 
> between offering slots and receiving the report because the 
> {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a 
> separate thread. 
> Due to that it can happen that we generate an {{AllocatedSlotReport}} just 
> before getting new slots offered. Since the report is sent from a different 
> thread, it can then happen that the response to the slot offerings is sent 
> earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an 
> outdated slot report on the {{TaskExecutor}} causing active slots to be 
> released.
> In order to solve the problem I propose to add a fencing token to the 
> {{AllocatedSlotReport}} which is being updated whenever we offer new slots to 
> the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the 
> {{TaskExecutor}} we compare the current slot report fencing token with the 
> received one and only process the report if they are equal. Otherwise we wait 
> for the next heartbeat to send us an up to date {{AllocatedSlotReport}}.
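A minimal sketch of the receive-side check proposed above (names are assumptions, 
not the actual TaskExecutor code):

{code:java}
import java.util.UUID;

/** Illustrative TaskExecutor-side sketch of the fencing check; field names are assumptions. */
final class SlotReportFencing {

    private volatile UUID latestOfferToken;   // refreshed whenever slots are offered to the JM

    void onSlotsOffered() {
        latestOfferToken = UUID.randomUUID(); // a new token accompanies the new offers
    }

    /** Only process a received AllocatedSlotReport if it matches the latest offers. */
    boolean shouldProcess(UUID reportToken) {
        // otherwise drop the report and wait for the next heartbeat's up-to-date one
        return reportToken != null && reportToken.equals(latestOfferToken);
    }
}
{code}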



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] KurtYoung merged pull request #8748: [FLINK-12857] [table] move FilterableTableSource into flink-table-common

2019-06-16 Thread GitBox
KurtYoung merged pull request #8748: [FLINK-12857] [table] move 
FilterableTableSource into flink-table-common
URL: https://github.com/apache/flink/pull/8748
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString

2019-06-16 Thread GitBox
KurtYoung commented on a change in pull request #8689: 
[FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString
URL: https://github.com/apache/flink/pull/8689#discussion_r294125722
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
 ##
 @@ -97,32 +105,12 @@ public static BinaryString blankString(int length) {
return fromBytes(spaces);
}
 
-   /**
-* Returns the number of bytes for a code point with the first byte as 
`b`.
-* @param b The first byte of a code point
-*/
-   private static int numBytesForFirstByte(final byte b) {
-   if (b >= 0) {
-   // 1 byte, 7 bits: 0xxx
-   return 1;
-   } else if ((b >> 5) == -2 && (b & 0x1e) != 0) {
-   // 2 bytes, 11 bits: 110x 10xx
-   return 2;
-   } else if ((b >> 4) == -2) {
-   // 3 bytes, 16 bits: 1110 10xx 10xx
-   return 3;
-   } else if ((b >> 3) == -2) {
-   // 4 bytes, 21 bits: 0xxx 10xx 10xx 10xx
-   return 4;
-   } else {
-   // throw new IllegalArgumentException();
-   // Skip the first byte disallowed in UTF-8
-   return 1;
-   }
-   }
+   // 
--
+   // Utility open methods on BinaryString
 
 Review comment:
   What's `Open Interfaces`? Do you mean `Public interfaces`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] 
Carry primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294124528
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ##
 @@ -19,20 +19,24 @@
 package org.apache.flink.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.table.{JHashSet, JSet}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 
+import java.util
+
 /**
   * Abstract class which define the interfaces required to convert a 
[[TableSource]] to
   * a Calcite Table
   */
 class TableSourceTable[T](
 val tableSource: TableSource[T],
-val isStreaming: Boolean,
-val statistic: FlinkStatistic)
 
 Review comment:
   Another scenario: rules (like `PushProjectIntoTableSourceScanRule`) do not 
change the statistics, so the new TableSource created by the rule could reuse 
the original TableSource and avoid calling the `TableSource#getTableStats` method, 
which is costly.
   So the `def copy(statistic: FlinkStatistic): FlinkTable` method defined in 
`FlinkTable` should not be deleted either.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport

2019-06-16 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865278#comment-16865278
 ] 

shuai.xu commented on FLINK-12863:
--

Hi [~till.rohrmann], as [~xiaogang.shi] said, we found the same race condition 
between the RM and the TM and added a version to each slot to solve it. I think 
adding a fencing token to the AllocatedSlotReport can solve it. But how would you 
update the fencing token? When offering slots succeeds, or before offering slots? 
If it is updated when offering slots succeeds, it may happen that the JM uses the 
new fencing token while the TM considers the slot offering failed, so the TM may 
not update the token and the JM has no chance to use the old token any more. If 
the TM updates the token before offering slots, it may happen that the JM does not 
receive the offering, so the JM does not update the token. I think using a version 
may be more suitable, as we can compare two versions and the bigger version is 
always the correct one.

> Race condition between slot offerings and AllocatedSlotReport
> -
>
> Key: FLINK-12863
> URL: https://issues.apache.org/jira/browse/FLINK-12863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by 
> the {{TaskExecutor}} to synchronize its internal view on slot allocations 
> with the view of the {{JobMaster}}. It seems that there is a race condition 
> between offering slots and receiving the report because the 
> {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a 
> separate thread. 
> Due to that it can happen that we generate an {{AllocatedSlotReport}} just 
> before getting new slots offered. Since the report is sent from a different 
> thread, it can then happen that the response to the slot offerings is sent 
> earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an 
> outdated slot report on the {{TaskExecutor}} causing active slots to be 
> released.
> In order to solve the problem I propose to add a fencing token to the 
> {{AllocatedSlotReport}} which is being updated whenever we offer new slots to 
> the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the 
> {{TaskExecutor}} we compare the current slot report fencing token with the 
> received one and only process the report if they are equal. Otherwise we wait 
> for the next heartbeat to send us an up to date {{AllocatedSlotReport}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] 
Carry primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294120943
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
 ##
 @@ -147,13 +161,15 @@ abstract class TableTestUtil(test: TableTestBase) {
 * @param types field types
 * @param fields field names
 * @param statistic statistic of current table
+* @param uniqueKeys unique keys of current table
 * @return returns the registered [[Table]].
 */
   def addTableSource(
   name: String,
   types: Array[TypeInformation[_]],
   fields: Array[String],
-  statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table
+  statistic: TableStats,
 
 Review comment:
   We may add more info to `FlinkStatistic` in the future; using `FlinkStatistic` 
instead of the individual fields makes sure this method and the related test cases 
stay stable. `relModifiedMonotonicity` is also a member of `FlinkStatistic` and is 
not defined in this method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry 
primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294125156
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -343,13 +412,66 @@ public Builder field(String name, TypeInformation 
typeInfo) {
return field(name, fromLegacyInfoToDataType(typeInfo));
}
 
+   /**
+* Add a primary key with the given field names.
+* There can only be one PRIMARY KEY for a given table
+* See the {@link TableSchema} class javadoc for more 
definition about primary key.
+*/
+   public Builder primaryKey(String... fields) {
+   Preconditions.checkArgument(
+   fields != null && fields.length > 0,
+   "The primary key fields shouldn't be null or 
empty.");
+   Preconditions.checkArgument(
+   primaryKey == null,
+   "A primary key " + primaryKey +
+   " have been defined, can not define 
another primary key " +
+   Arrays.toString(fields));
+   for (String field : fields) {
+   if (!fieldNames.contains(field)) {
+   throw new IllegalArgumentException("The 
field '" + field +
+   "' is not existed in the 
schema.");
+   }
+   }
+   primaryKey = Arrays.asList(fields);
+   return this;
+   }
+
+   /**
+* Add an unique key with the given field names.
+* There can be more than one UNIQUE KEY for a given table.
+* See the {@link TableSchema} class javadoc for more 
definition about unique key.
+*/
+   public Builder uniqueKey(String... fields) {
+   Preconditions.checkArgument(
+   fields != null && fields.length > 0,
+   "The unique key fields shouldn't be null or 
empty.");
+   for (String field : fields) {
+   if (!fieldNames.contains(field)) {
+   throw new IllegalArgumentException("The 
field '" + field +
+   "' is not existed in the 
schema.");
+   }
+   }
+   if (uniqueKeys == null) {
+   uniqueKeys = new ArrayList<>();
+   }
+   uniqueKeys.add(Arrays.asList(fields));
 
 Review comment:
   Sure, I will add that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry 
primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294125122
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -343,13 +412,66 @@ public Builder field(String name, TypeInformation 
typeInfo) {
return field(name, fromLegacyInfoToDataType(typeInfo));
}
 
+   /**
+* Add a primary key with the given field names.
+* There can only be one PRIMARY KEY for a given table
+* See the {@link TableSchema} class javadoc for more 
definition about primary key.
+*/
+   public Builder primaryKey(String... fields) {
+   Preconditions.checkArgument(
+   fields != null && fields.length > 0,
+   "The primary key fields shouldn't be null or 
empty.");
+   Preconditions.checkArgument(
+   primaryKey == null,
+   "A primary key " + primaryKey +
+   " have been defined, can not define 
another primary key " +
+   Arrays.toString(fields));
+   for (String field : fields) {
+   if (!fieldNames.contains(field)) {
+   throw new IllegalArgumentException("The 
field '" + field +
+   "' is not existed in the 
schema.");
+   }
+   }
 
 Review comment:
   Yes. I think so.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
wuchong commented on a change in pull request #8736: [FLINK-12846][table] Carry 
primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294125081
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -43,7 +44,28 @@
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
- * A table schema that represents a table's structure with field names and 
data types.
+ * A table schema that represents a table's structure with field names and 
data types and some
+ * constraint information (e.g. primary key, unique key).
+ *
+ * Concepts about primary key and unique key:
 
 Review comment:
   The difference between a primary key and a unique key is that there is only one 
primary key while there can be more than one unique key, and a primary key 
doesn't need to be declared in the unique key list again.
   
   I will add this to the class Javadoc.
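   For illustration, a builder call using the methods from this diff might look like 
the following sketch (the field names and types are made up; `primaryKey`/`uniqueKey` 
are the builder methods added in this change):

   ```java
   import org.apache.flink.api.common.typeinfo.Types;
   import org.apache.flink.table.api.TableSchema;

   TableSchema schema = TableSchema.builder()
       .field("order_id", Types.LONG)
       .field("user_id", Types.LONG)
       .field("order_no", Types.STRING)
       // exactly one primary key per table ...
       .primaryKey("order_id")
       // ... but any number of unique keys; the primary key does not
       // have to be repeated in the unique key list
       .uniqueKey("order_no")
       .uniqueKey("user_id", "order_no")
       .build();
   ```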


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-06-16 Thread Yun Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-12852:

Component/s: Runtime / Network

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required and max sizes of the local buffer pool are not the 
> same, so over-allocation is possible; when assignExclusiveSegments is called 
> there is no available memory left.
>  
> The detailed scenario is as follows: the parallelism of the upstream 
> vertex and the downstream vertex is 1000 and 500 respectively. There are 200 TMs 
> and each TM has 10696 buffers in total and 10 slots. For a TM that runs 
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
> a local buffer pool of \{required = 500, max = 2 * 500 + 8 = 1008}; they produce 
> data quickly and each occupies about 990 buffers. Then the downstream task 
> starts and tries to assign exclusive buffers for 1500 - 9 = 1491 
> InputChannels. It requires 2981 buffers but only 1786 are left. Since not all 
> downstream tasks can start, the job is eventually blocked, no buffer can 
> be released, and the deadlock occurs.
>  
> Although increasing the network memory solves the problem, I think the 
> deadlock may not be acceptable. Fine-grained resource management 
> [Flink-12761|https://issues.apache.org/jira/browse/FLINK-12761] can solve 
> this problem, but AFAIK in 1.9 it will not include the network memory in 
> the ResourceProfile. I think the possible solution currently may be one of:
>  # Make the required and max equal for the local buffer pool.
>  # Add a maximum number of retries for allocating exclusive buffers. When the 
> maximum number of retries is exceeded, the task will fail and throw an exception 
> that tells users to increase the network memory.
> I think the second one may be better.
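
For what it's worth, a rough sketch of what option 2 could look like (the method and field names below are illustrative, not the actual NetworkBufferPool code):

{code:java}
// Illustrative sketch of option 2 only; tryRequestMemorySegments(), retryDelayMs and
// maxRetries are hypothetical names, not the existing NetworkBufferPool API.
List<MemorySegment> requestExclusiveSegmentsWithRetry(int numberOfSegments)
        throws IOException, InterruptedException {
    for (int attempt = 0; attempt < maxRetries; attempt++) {
        List<MemorySegment> segments = tryRequestMemorySegments(numberOfSegments);
        if (segments != null) {
            return segments;
        }
        Thread.sleep(retryDelayMs);
    }
    throw new IOException("Could not allocate " + numberOfSegments + " exclusive buffers after "
            + maxRetries + " retries. Please increase the amount of network memory "
            + "(taskmanager.network.memory.min/max/fraction).");
}
{code}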



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] 
Carry primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294120943
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
 ##
 @@ -147,13 +161,15 @@ abstract class TableTestUtil(test: TableTestBase) {
 * @param types field types
 * @param fields field names
 * @param statistic statistic of current table
+* @param uniqueKeys unique keys of current table
 * @return returns the registered [[Table]].
 */
   def addTableSource(
   name: String,
   types: Array[TypeInformation[_]],
   fields: Array[String],
-  statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table
+  statistic: TableStats,
 
 Review comment:
   We may add more info to `FlinkStatistic` in the future; using `FlinkStatistic` 
instead of individual fields keeps this method and the related test cases 
stable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] 
Carry primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294124528
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ##
 @@ -19,20 +19,24 @@
 package org.apache.flink.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.table.{JHashSet, JSet}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 
+import java.util
+
 /**
   * Abstract class which define the interfaces required to convert a 
[[TableSource]] to
   * a Calcite Table
   */
 class TableSourceTable[T](
 val tableSource: TableSource[T],
-val isStreaming: Boolean,
-val statistic: FlinkStatistic)
 
 Review comment:
   Another scenario: rules (like `PushProjectIntoTableSourceScanRule`) do not 
change statistics, so the new TableSource created by the rule could reuse 
the statistics of the original TableSource and avoid calling the expensive 
`TableSource#getTableStats` method.
   So the `def copy(statistic: FlinkStatistic): FlinkTable` method defined in 
`FlinkTable` should not be deleted either.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on issue #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API

2019-06-16 Thread GitBox
WeiZhong94 commented on issue #8681: [FLINK-12585][python] Align 
Stream/BatchTableEnvironment with JAVA Table API
URL: https://github.com/apache/flink/pull/8681#issuecomment-502518727
 
 
   @dianfu Thanks for your review again! I have addressed your comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport

2019-06-16 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865269#comment-16865269
 ] 

Xiaogang Shi commented on FLINK-12863:
--

Btw, I want to note that the race condition may not necessarily be caused by 
{{HeartbeatManagerSenderImpl}} sending heartbeats in a separate thread. Fixing 
that can solve the problem in the JM, but not the one in the RM.

Even when the RM sends heartbeat requests in the main thread, right after a slot 
request, the heartbeat responses may be handled first by the RM. This is because 
the RM uses ask to send both heartbeat and slot requests. Temporary {{PromiseActor}}s 
will be created to receive the responses from the TM. Since there is no guarantee on 
the execution order of actors, the {{PromiseActor}} which receives its response 
first may be executed later.



> Race condition between slot offerings and AllocatedSlotReport
> -
>
> Key: FLINK-12863
> URL: https://issues.apache.org/jira/browse/FLINK-12863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by 
> the {{TaskExecutor}} to synchronize its internal view on slot allocations 
> with the view of the {{JobMaster}}. It seems that there is a race condition 
> between offering slots and receiving the report because the 
> {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a 
> separate thread. 
> Due to that it can happen that we generate an {{AllocatedSlotReport}} just 
> before getting new slots offered. Since the report is sent from a different 
> thread, it can then happen that the response to the slot offerings is sent 
> earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an 
> outdated slot report on the {{TaskExecutor}} causing active slots to be 
> released.
> In order to solve the problem I propose to add a fencing token to the 
> {{AllocatedSlotReport}} which is being updated whenever we offer new slots to 
> the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the 
> {{TaskExecutor}} we compare the current slot report fencing token with the 
> received one and only process the report if they are equal. Otherwise we wait 
> for the next heartbeat to send us an up to date {{AllocatedSlotReport}}.
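
For illustration only, a minimal sketch of the proposed check on the {{TaskExecutor}} side (the fencing-token field and accessor are hypothetical; they do not exist yet):

{code:java}
// Hypothetical sketch of the proposed fencing check; currentSlotReportFencingToken,
// getFencingToken() and syncSlotAllocationsWith() are illustrative names only.
void handleAllocatedSlotReport(AllocatedSlotReport report) {
    if (!currentSlotReportFencingToken.equals(report.getFencingToken())) {
        // Outdated report: ignore it and wait for the next heartbeat.
        return;
    }
    syncSlotAllocationsWith(report);
}
{code}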



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] 
Carry primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294120943
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
 ##
 @@ -147,13 +161,15 @@ abstract class TableTestUtil(test: TableTestBase) {
 * @param types field types
 * @param fields field names
 * @param statistic statistic of current table
+* @param uniqueKeys unique keys of current table
 * @return returns the registered [[Table]].
 */
   def addTableSource(
   name: String,
   types: Array[TypeInformation[_]],
   fields: Array[String],
-  statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table
+  statistic: TableStats,
 
 Review comment:
   We may add more info to `FlinkStatistic`; using `FlinkStatistic` instead of 
individual fields keeps this method and the related test cases stable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] 
Carry primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294118121
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -343,13 +412,66 @@ public Builder field(String name, TypeInformation 
typeInfo) {
return field(name, fromLegacyInfoToDataType(typeInfo));
}
 
+   /**
+* Add a primary key with the given field names.
+* There can only be one PRIMARY KEY for a given table
+* See the {@link TableSchema} class javadoc for more 
definition about primary key.
+*/
+   public Builder primaryKey(String... fields) {
+   Preconditions.checkArgument(
+   fields != null && fields.length > 0,
+   "The primary key fields shouldn't be null or 
empty.");
+   Preconditions.checkArgument(
+   primaryKey == null,
+   "A primary key " + primaryKey +
+   " have been defined, can not define 
another primary key " +
+   Arrays.toString(fields));
+   for (String field : fields) {
+   if (!fieldNames.contains(field)) {
+   throw new IllegalArgumentException("The 
field '" + field +
+   "' is not existed in the 
schema.");
+   }
+   }
 
 Review comment:
   Does that mean we must build fieldNames first?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] 
Carry primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294117111
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -43,7 +44,28 @@
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
- * A table schema that represents a table's structure with field names and 
data types.
+ * A table schema that represents a table's structure with field names and 
data types and some
+ * constraint information (e.g. primary key, unique key).
+ *
+ * Concepts about primary key and unique key:
 
 Review comment:
   Do we distinguish a primary key from a unique key? In the current Javadoc 
they appear to be the same.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] 
Carry primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294121385
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/MetadataTestUtil.scala
 ##
 @@ -121,6 +123,22 @@ object MetadataTestUtil {
 getDataStreamTable(schema, new FlinkStatistic(tableStats, uniqueKeys))
   }
 
+  private def createStudentTableSource(): TableSourceTable[BaseRow] = {
+val schema = TableSchema.builder()
+  .field("id", DataTypes.BIGINT())
+  .field("name", DataTypes.STRING())
+  .field("score", DataTypes.DOUBLE())
+  .field("age", DataTypes.INT())
+  .field("height", DataTypes.DOUBLE())
+  .field("sex", DataTypes.STRING())
+  .field("class", DataTypes.INT())
+  .uniqueKey("id")
+  .build()
+val tableSource = new TestTableSource(true, schema)
+new TableSourceTable[BaseRow](tableSource, false)
+  }
+
+
 
 Review comment:
   Delete the redundant blank line.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] 
Carry primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294118265
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##
 @@ -343,13 +412,66 @@ public Builder field(String name, TypeInformation 
typeInfo) {
return field(name, fromLegacyInfoToDataType(typeInfo));
}
 
+   /**
+* Add a primary key with the given field names.
+* There can only be one PRIMARY KEY for a given table
+* See the {@link TableSchema} class javadoc for more 
definition about primary key.
+*/
+   public Builder primaryKey(String... fields) {
+   Preconditions.checkArgument(
+   fields != null && fields.length > 0,
+   "The primary key fields shouldn't be null or 
empty.");
+   Preconditions.checkArgument(
+   primaryKey == null,
+   "A primary key " + primaryKey +
+   " have been defined, can not define 
another primary key " +
+   Arrays.toString(fields));
+   for (String field : fields) {
+   if (!fieldNames.contains(field)) {
+   throw new IllegalArgumentException("The 
field '" + field +
+   "' is not existed in the 
schema.");
+   }
+   }
+   primaryKey = Arrays.asList(fields);
+   return this;
+   }
+
+   /**
+* Add an unique key with the given field names.
+* There can be more than one UNIQUE KEY for a given table.
+* See the {@link TableSchema} class javadoc for more 
definition about unique key.
+*/
+   public Builder uniqueKey(String... fields) {
+   Preconditions.checkArgument(
+   fields != null && fields.length > 0,
+   "The unique key fields shouldn't be null or 
empty.");
+   for (String field : fields) {
+   if (!fieldNames.contains(field)) {
+   throw new IllegalArgumentException("The 
field '" + field +
+   "' is not existed in the 
schema.");
+   }
+   }
+   if (uniqueKeys == null) {
+   uniqueKeys = new ArrayList<>();
+   }
+   uniqueKeys.add(Arrays.asList(fields));
 
 Review comment:
   Should we de-duplicate the unique keys here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] Carry primary key and unique key information in TableSchema

2019-06-16 Thread GitBox
godfreyhe commented on a change in pull request #8736: [FLINK-12846][table] 
Carry primary key and unique key information in TableSchema
URL: https://github.com/apache/flink/pull/8736#discussion_r294119747
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ##
 @@ -19,20 +19,24 @@
 package org.apache.flink.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.table.{JHashSet, JSet}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 
+import java.util
+
 /**
   * Abstract class which define the interfaces required to convert a 
[[TableSource]] to
   * a Calcite Table
   */
 class TableSourceTable[T](
 val tableSource: TableSource[T],
-val isStreaming: Boolean,
-val statistic: FlinkStatistic)
 
 Review comment:
   If the `statistic` is deleted, how do we store the statistic that comes from the catalog?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12653) Keyed state backend fails to restore during rescaling

2019-06-16 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865268#comment-16865268
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-12653:
-

Thanks for the detailed investigation [~mxm]. This does indeed look like a bug.
The fact that this works with RocksDB, but not with the heap backend also makes 
sense now, since heap tries to eagerly deserialize all state on restore.

One thing that we have been considering recently while designing FLIP-43 
(Savepoint Connector / State Processing API) is to merge all registered state 
info (state name, state type information / serializer snapshot, owning 
operator, etc.) and write it as part of the savepoint metadata file. The 
information should be merged such that there is a global view of all registered 
states across all subtasks of a given operator.
With this, upon rescaling, all restored subtasks should receive complete 
metadata via the {{KeyedStateHandle}} class.

Do you think this will fix this problem, and in general is a reasonable 
approach?
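
To make the idea a bit more concrete, a purely illustrative sketch of what such merged meta data could look like (this class does not exist in Flink):

{code:java}
import java.util.Map;

import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;

// Purely illustrative; not an existing Flink class. The idea is one global view of all
// registered states of an operator, stored once in the savepoint metadata file.
class MergedOperatorStateMeta {
    String operatorId;
    // state name -> serializer snapshot / type information captured at checkpoint time
    Map<String, StateMetaInfoSnapshot> registeredKeyedStates;
    Map<String, StateMetaInfoSnapshot> registeredOperatorStates;
}
{code}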

> Keyed state backend fails to restore during rescaling
> -
>
> Key: FLINK-12653
> URL: https://issues.apache.org/jira/browse/FLINK-12653
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
> Environment: Beam 2.12.0 or any other Beam version
> Flink >= 1.6
> Heap/Filesystem state backend (RocksDB works fine)
>Reporter: Maximilian Michels
>Priority: Critical
>
> The Flink Runner includes a test which verifies checkpoints/savepoints work 
> correctly with Beam on Flink. When adding additional tests for 
> scaleup/scaledown [1], I came across a bug with restoring the keyed state 
> backend. After a fair amount of debugging Beam code and checking any 
> potential issues with serializers, I think this could be a Flink issue.
> Steps to reproduce: 
> 1. {{git clone https://github.com/mxm/beam}}
> 2. {{cd beam && git checkout savepoint-problem}}
> 3. {{./gradlew :runners:flink:1.6:test --tests 
> "**.FlinkSavepointTest.testSavepointRestoreLegacy"}}
> Error:
> {noformat}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for DoFnOperator_76375152c4a81d5df72cf49e32c4ecb9_(4/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
>   ... 5 more
> Caused by: java.lang.RuntimeException: Invalid namespace string: ''
>   at 
> org.apache.beam.runners.core.StateNamespaces.fromString(StateNamespaces.java:245)
>   at 
> org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:246)
>   at 
> org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:221)
>   at 
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
>   at 
> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:169)
>   at 
> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
>   at 
> org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:513)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:474)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:431)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:370)
>   

[jira] [Updated] (FLINK-12176) Unify JobGraph creation in CliFrontend

2019-06-16 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-12176:
-
Attachment: patch.diff

> Unify JobGraph creation in CliFrontend
> --
>
> Key: FLINK-12176
> URL: https://issues.apache.org/jira/browse/FLINK-12176
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Affects Versions: 1.9.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Attachments: patch.diff
>
>
> Currently, we create the {{JobGraph}} by the following process:
> * if the cluster starts in job mode, we create the {{JobGraph}} via 
> {{PackagedProgramUtils#createJobGraph}} and deploy a job cluster
> * if the cluster starts in session mode, we create the {{JobGraph}} and submit it 
> via {{CliFrontend#executeProgram}}, which is internally the same as above but 
> uses {{ContextEnvironment}} instead of {{OptimizerPlanEnvironment}}.
> {{ContextEnvironment}} not only creates the job graph but also submits it. 
> However, the processes of {{JobGraph}} creation in job mode and session mode 
> are similar. That means we can unify the process by always creating the 
> {{JobGraph}} via {{PackagedProgramUtils#createJobGraph}}. And,
> * in job mode, deploy a job cluster with the {{JobGraph}}
> * in session mode, submit the {{JobGraph}} to the session cluster
> From a higher view, this gives a common view of job submission in both 
> job and session mode and opens up opportunities to refactor legacy client code.
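
For illustration, a rough sketch of the unified flow (simplified; error handling and detached/attached handling are omitted, and the surrounding variables are assumed to be in scope):

{code:java}
// Rough sketch of the proposed unification; only the general shape is intended here.
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, defaultParallelism);

if (jobMode) {
    // job mode: deploy a dedicated job cluster with the JobGraph
    clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, detached);
} else {
    // session mode: submit the JobGraph to the already running session cluster
    clusterClient.submitJob(jobGraph, program.getUserCodeClassLoader());
}
{code}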



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12176) Unify JobGraph creation in CliFrontend

2019-06-16 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865259#comment-16865259
 ] 

TisonKun commented on FLINK-12176:
--

[~till.rohrmann] After a closer look at {{ExecutionEnvironments}}, I wonder what 
exactly you meant when you said "{{OptimizerPlanEnvironment}} does not support 
eager execution calls". In order to see the error, I applied a patch that switches 
from {{ContextEnvironment}} to {{OptimizerPlanEnvironment}}, but none of the tests 
reported an error (from which I had expected to learn what is wrong with this 
change).

 [^patch.diff] 

> Unify JobGraph creation in CliFrontend
> --
>
> Key: FLINK-12176
> URL: https://issues.apache.org/jira/browse/FLINK-12176
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Affects Versions: 1.9.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Attachments: patch.diff
>
>
> Currently, we create the {{JobGraph}} by the following process:
> * if the cluster starts in job mode, we create the {{JobGraph}} via 
> {{PackagedProgramUtils#createJobGraph}} and deploy a job cluster
> * if the cluster starts in session mode, we create the {{JobGraph}} and submit it 
> via {{CliFrontend#executeProgram}}, which is internally the same as above but 
> uses {{ContextEnvironment}} instead of {{OptimizerPlanEnvironment}}.
> {{ContextEnvironment}} not only creates the job graph but also submits it. 
> However, the processes of {{JobGraph}} creation in job mode and session mode 
> are similar. That means we can unify the process by always creating the 
> {{JobGraph}} via {{PackagedProgramUtils#createJobGraph}}. And,
> * in job mode, deploy a job cluster with the {{JobGraph}}
> * in session mode, submit the {{JobGraph}} to the session cluster
> From a higher view, this gives a common view of job submission in both 
> job and session mode and opens up opportunities to refactor legacy client code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12273) The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, otherwise it cannot be recovered according to checkpoint after failure.

2019-06-16 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-12273:


Assignee: vinoyang

> The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, 
> otherwise it cannot be recovered according to checkpoint after failure.
> ---
>
> Key: FLINK-12273
> URL: https://issues.apache.org/jira/browse/FLINK-12273
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / Checkpointing
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0
>Reporter: Mr.Nineteen
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 2.0.0, 1.6.5, 1.8.2
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, 
> otherwise it cannot be recovered according to checkpoint after failure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wuchong commented on issue #8712: [FLINK-12819][sql-client] Reuse TableEnvironment between different SQ…

2019-06-16 Thread GitBox
wuchong commented on issue #8712: [FLINK-12819][sql-client] Reuse 
TableEnvironment between different SQ…
URL: https://github.com/apache/flink/pull/8712#issuecomment-502513427
 
 
   Sorry @docete ,  you are right, transformations will be cleared after 
execution.
   
   I will review it again. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-06-16 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12122:

Fix Version/s: (was: 1.8.1)
   1.8.2

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.2
>
> Attachments: image-2019-05-21-12-28-29-538.png, 
> image-2019-05-21-13-02-50-251.png
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of spreading them out evenly over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> wrt the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-06-16 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865244#comment-16865244
 ] 

sunjincheng commented on FLINK-12122:
-

Due to the release of 1.8.1, I change the fix version from 1.8.1 to 1.8.2

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.2
>
> Attachments: image-2019-05-21-12-28-29-538.png, 
> image-2019-05-21-13-02-50-251.png
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of spreading them out evenly over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> wrt the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints

2019-06-16 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-8513:
---
Fix Version/s: (was: 1.8.1)
   1.8.2

> Add documentation for connecting to non-AWS S3 endpoints
> 
>
> Key: FLINK-8513
> URL: https://issues.apache.org/jira/browse/FLINK-8513
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Documentation
>Reporter: chris snow
>Assignee: Seth Wiesman
>Priority: Trivial
> Fix For: 1.9.0, 1.8.2
>
>
> It would be useful if the documentation provided information on connecting to 
> non-AWS S3 endpoints when using presto.  For example:
>  
> 
> You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
> {{flink-conf.yaml}}:
> {code:java}
> s3.access-key: your-access-key 
> s3.secret-key: your-secret-key{code}
> If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
> Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
> endpoint in Flink's {{flink-conf.yaml}}:
> {code:java}
> s3.endpoint: your-endpoint-hostname{code}
> 
>  
> Source: 
> [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints

2019-06-16 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865243#comment-16865243
 ] 

sunjincheng commented on FLINK-8513:


Due to the release of 1.8.1, I change the fix version from 1.8.1 to 1.8.2

> Add documentation for connecting to non-AWS S3 endpoints
> 
>
> Key: FLINK-8513
> URL: https://issues.apache.org/jira/browse/FLINK-8513
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Documentation
>Reporter: chris snow
>Assignee: Seth Wiesman
>Priority: Trivial
> Fix For: 1.9.0, 1.8.2
>
>
> It would be useful if the documentation provided information on connecting to 
> non-AWS S3 endpoints when using presto.  For example:
>  
> 
> You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
> {{flink-conf.yaml}}:
> {code:java}
> s3.access-key: your-access-key 
> s3.secret-key: your-secret-key{code}
> If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
> Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
> endpoint in Flink's {{flink-conf.yaml}}:
> {code:java}
> s3.endpoint: your-endpoint-hostname{code}
> 
>  
> Source: 
> [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12273) The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, otherwise it cannot be recovered according to checkpoint after failure.

2019-06-16 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12273:

Fix Version/s: (was: 1.8.1)
   1.8.2

> The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, 
> otherwise it cannot be recovered according to checkpoint after failure.
> ---
>
> Key: FLINK-12273
> URL: https://issues.apache.org/jira/browse/FLINK-12273
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / Checkpointing
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0
>Reporter: Mr.Nineteen
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 2.0.0, 1.6.5, 1.8.2
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, 
> otherwise it cannot be recovered according to checkpoint after failure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12273) The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, otherwise it cannot be recovered according to checkpoint after failure.

2019-06-16 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865242#comment-16865242
 ] 

sunjincheng commented on FLINK-12273:
-

Due to the release of 1.8.1, I change the fix version from 1.8.1 to 1.8.2

> The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, 
> otherwise it cannot be recovered according to checkpoint after failure.
> ---
>
> Key: FLINK-12273
> URL: https://issues.apache.org/jira/browse/FLINK-12273
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / Checkpointing
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0
>Reporter: Mr.Nineteen
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 2.0.0, 1.6.5, 1.8.2
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The default value of CheckpointRetentionPolicy should be RETAIN_ON_FAILURE, 
> otherwise it cannot be recovered according to checkpoint after failure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12578) Use secure URLs for Maven repositories

2019-06-16 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12578:

Fix Version/s: (was: 1.8.1)
   1.8.2

> Use secure URLs for Maven repositories
> --
>
> Key: FLINK-12578
> URL: https://issues.apache.org/jira/browse/FLINK-12578
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.6.4, 1.7.2, 1.8.0, 1.9.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 1.8.2
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, some of repository URLs in Maven pom.xml are http scheme. Ideally 
> they should have been https scheme.
> Below is the list of repositories which use http scheme in pom files for now:
>  * Confluent
>  * HWX
>  * MapR



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12578) Use secure URLs for Maven repositories

2019-06-16 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865241#comment-16865241
 ] 

sunjincheng commented on FLINK-12578:
-

Due to the release of 1.8.1, I change the fix version from 1.8.1 to 1.8.2

> Use secure URLs for Maven repositories
> --
>
> Key: FLINK-12578
> URL: https://issues.apache.org/jira/browse/FLINK-12578
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.6.4, 1.7.2, 1.8.0, 1.9.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 1.8.2
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, some of repository URLs in Maven pom.xml are http scheme. Ideally 
> they should have been https scheme.
> Below is the list of repositories which use http scheme in pom files for now:
>  * Confluent
>  * HWX
>  * MapR



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11958) flink on windows yarn deploy failed

2019-06-16 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865240#comment-16865240
 ] 

sunjincheng commented on FLINK-11958:
-

 Due to the release of 1.8.1, I change the fix version from 1.8.1 to 1.8.2.

> flink on windows yarn deploy failed
> ---
>
> Key: FLINK-11958
> URL: https://issues.apache.org/jira/browse/FLINK-11958
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.7.2
>Reporter: Matrix42
>Assignee: Matrix42
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.8.2
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Flink Version : 1.7.2
> Hadoop Version:2.7.5
> Yarn log:
> Application application_1551710861615_0002 failed 1 times due to AM Container 
> for appattempt_1551710861615_0002_01 exited with exitCode: 1
> For more detailed output, check application tracking 
> page:http://DESKTOP-919H80J:8088/cluster/app/application_1551710861615_0002Then,
>  click on links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_1551710861615_0002_01_01
> Exit code: 1
> Stack trace: ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)
> at org.apache.hadoop.util.Shell.run(Shell.java:482)
> at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
> at 
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Shell output: 移动了 1 个文件。 (Moved 1 file.)
> Container exited with a non-zero exit code 1
> Failing this attempt. Failing the application.
>  
> jobmanager.err:
> '$JAVA_HOME' 不是内部或外部命令,也不是可运行的程序或批处理文件。
> english: (Not internal or external commands, nor runnable programs or batch 
> files)
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport

2019-06-16 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865239#comment-16865239
 ] 

Xiaogang Shi commented on FLINK-12863:
--

I think a similar problem happens in the heartbeats between the RM and the TM.

When an RM receives a slot request from the JM, it will find an available slot, mark 
it as pending, and send a slot request to the TM. In cases where the slot 
request follows a heartbeat request, the RM will receive the heartbeat 
response first and will remove the pending slot. The RM may then reuse the slot when it 
receives a new slot request from the JM, leading to duplicate slot allocations.

A solution proposed by [~yungao.gy] is to use version numbers. Each slot is 
equipped with a version number, which is increased once a new pending request 
is generated. These version numbers are then attached to the heartbeats sent to 
the TM. Once a heartbeat response is received, we do not remove those 
pending slot requests whose version numbers are greater than the version carried 
by the heartbeat.

I think that solution can also work here. What do you think? 
[~yungao.gy][~till.rohrmann]
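
A rough sketch of how the version check could look on the RM side (all names below are illustrative; none of this exists in the SlotManager today):

{code:java}
// Illustrative only: pendingSlotRequests, getVersion() and clearPendingSlotRequest()
// are hypothetical names, not the current SlotManager implementation.
void onFreeSlotReportedByHeartbeat(SlotID slotId, long versionInHeartbeatRequest) {
    PendingSlotRequest pending = pendingSlotRequests.get(slotId);
    if (pending != null && pending.getVersion() > versionInHeartbeatRequest) {
        // The pending request was created after this heartbeat request was sent,
        // so the TM's "free" report cannot reflect it yet: keep the request pending.
        return;
    }
    clearPendingSlotRequest(slotId);
}
{code}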

> Race condition between slot offerings and AllocatedSlotReport
> -
>
> Key: FLINK-12863
> URL: https://issues.apache.org/jira/browse/FLINK-12863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by 
> the {{TaskExecutor}} to synchronize its internal view on slot allocations 
> with the view of the {{JobMaster}}. It seems that there is a race condition 
> between offering slots and receiving the report because the 
> {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a 
> separate thread. 
> Due to that it can happen that we generate an {{AllocatedSlotReport}} just 
> before getting new slots offered. Since the report is sent from a different 
> thread, it can then happen that the response to the slot offerings is sent 
> earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an 
> outdated slot report on the {{TaskExecutor}} causing active slots to be 
> released.
> In order to solve the problem I propose to add a fencing token to the 
> {{AllocatedSlotReport}} which is being updated whenever we offer new slots to 
> the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the 
> {{TaskExecutor}} we compare the current slot report fencing token with the 
> received one and only process the report if they are equal. Otherwise we wait 
> for the next heartbeat to send us an up to date {{AllocatedSlotReport}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString

2019-06-16 Thread GitBox
JingsongLi commented on a change in pull request #8689: 
[FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString
URL: https://github.com/apache/flink/pull/8689#discussion_r294118537
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
 ##
 @@ -97,32 +105,12 @@ public static BinaryString blankString(int length) {
return fromBytes(spaces);
}
 
-   /**
-* Returns the number of bytes for a code point with the first byte as 
`b`.
-* @param b The first byte of a code point
-*/
-   private static int numBytesForFirstByte(final byte b) {
-   if (b >= 0) {
-   // 1 byte, 7 bits: 0xxx
-   return 1;
-   } else if ((b >> 5) == -2 && (b & 0x1e) != 0) {
-   // 2 bytes, 11 bits: 110x 10xx
-   return 2;
-   } else if ((b >> 4) == -2) {
-   // 3 bytes, 16 bits: 1110 10xx 10xx
-   return 3;
-   } else if ((b >> 3) == -2) {
-   // 4 bytes, 21 bits: 0xxx 10xx 10xx 10xx
-   return 4;
-   } else {
-   // throw new IllegalArgumentException();
-   // Skip the first byte disallowed in UTF-8
-   return 1;
-   }
-   }
+   // 
--
+   // Utility open methods on BinaryString
 
 Review comment:
   I mean `Open Interfaces`...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11958) flink on windows yarn deploy failed

2019-06-16 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-11958:

Fix Version/s: (was: 1.8.1)
   1.8.2

> flink on windows yarn deploy failed
> ---
>
> Key: FLINK-11958
> URL: https://issues.apache.org/jira/browse/FLINK-11958
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.7.2
>Reporter: Matrix42
>Assignee: Matrix42
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.8.2
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Flink Version : 1.7.2
> Hadoop Version:2.7.5
> Yarn log:
> Application application_1551710861615_0002 failed 1 times due to AM Container 
> for appattempt_1551710861615_0002_01 exited with exitCode: 1
> For more detailed output, check application tracking 
> page:http://DESKTOP-919H80J:8088/cluster/app/application_1551710861615_0002Then,
>  click on links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_1551710861615_0002_01_01
> Exit code: 1
> Stack trace: ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)
> at org.apache.hadoop.util.Shell.run(Shell.java:482)
> at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
> at 
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Shell output: 移动了 1 个文件。 (Moved 1 file.)
> Container exited with a non-zero exit code 1
> Failing this attempt. Failing the application.
>  
> jobmanager.err:
> '$JAVA_HOME' 不是内部或外部命令,也不是可运行的程序或批处理文件。
> english: (Not internal or external commands, nor runnable programs or batch 
> files)
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString

2019-06-16 Thread GitBox
JingsongLi commented on a change in pull request #8689: 
[FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString
URL: https://github.com/apache/flink/pull/8689#discussion_r294118385
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
 ##
 @@ -654,531 +498,211 @@ private BinaryString trimSlow() {
}
 
/**
-* Walk each character of current string from both ends, remove the 
character if it
-* is in trim string.
-* Return the new substring which both ends trim characters have been 
removed.
+* Returns the index within this string of the first occurrence of the
+* specified substring, starting at the specified index.
 *
-* @param trimStr the trim string
-* @return A subString which both ends trim characters have been 
removed.
+* @param   str the substring to search for.
+* @param   fromIndex   the index from which to start the search.
+* @return  the index of the first occurrence of the specified 
substring,
+*  starting at the specified index,
+*  or {@code -1} if there is no such occurrence.
 */
-   public BinaryString trim(BinaryString trimStr) {
-   if (trimStr == null) {
-   return null;
-   }
-   return trimLeft(trimStr).trimRight(trimStr);
-   }
-
-   public BinaryString trimLeft() {
+   public int indexOf(BinaryString str, int fromIndex) {
ensureMaterialized();
+   str.ensureMaterialized();
+   if (str.sizeInBytes == 0) {
+   return 0;
+   }
if (inFirstSegment()) {
-   int s = 0;
-   // skip all of the space (0x20) in the left side
-   while (s < this.sizeInBytes && getByteOneSegment(s) == 
0x20) {
-   s++;
-   }
-   if (s == this.sizeInBytes) {
-   // empty string
-   return EMPTY_UTF8;
-   } else {
-   return copyBinaryStringInOneSeg(s, 
this.sizeInBytes - 1);
+   // position in byte
+   int byteIdx = 0;
+   // position is char
+   int charIdx = 0;
+   while (byteIdx < sizeInBytes && charIdx < fromIndex) {
+   byteIdx += 
numBytesForFirstByte(getByteOneSegment(byteIdx));
+   charIdx++;
}
+   do {
+   if (byteIdx + str.sizeInBytes > sizeInBytes) {
+   return -1;
+   }
+   if (SegmentsUtil.equals(segments, offset + 
byteIdx,
+   str.segments, str.offset, 
str.sizeInBytes)) {
+   return charIdx;
+   }
+   byteIdx += 
numBytesForFirstByte(getByteOneSegment(byteIdx));
+   charIdx++;
+   } while (byteIdx < sizeInBytes);
+
+   return -1;
} else {
-   return trimLeftSlow();
+   return indexOfMultiSegs(str, fromIndex);
}
}
 
-   private BinaryString trimLeftSlow() {
-   int s = 0;
+   private int indexOfMultiSegs(BinaryString str, int fromIndex) {
+   // position in byte
+   int byteIdx = 0;
+   // position is char
+   int charIdx = 0;
int segSize = segments[0].size();
-   SegmentAndOffset front = firstSegmentAndOffset(segSize);
-   // skip all of the space (0x20) in the left side
-   while (s < this.sizeInBytes && front.value() == 0x20) {
-   s++;
-   front.nextByte(segSize);
-   }
-   if (s == this.sizeInBytes) {
-   // empty string
-   return EMPTY_UTF8;
-   } else {
-   return copyBinaryString(s, this.sizeInBytes - 1);
+   SegmentAndOffset index = firstSegmentAndOffset(segSize);
+   while (byteIdx < sizeInBytes && charIdx < fromIndex) {
+   int charBytes = numBytesForFirstByte(index.value());
+   byteIdx += charBytes;
+   charIdx++;
+   index.skipBytes(charBytes, segSize);
}
+   do {
+   if (byteIdx + str.sizeInBytes > sizeInBytes) {
+   return -1;
+ 

[jira] [Commented] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases

2019-06-16 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865238#comment-16865238
 ] 

sunjincheng commented on FLINK-12539:
-

I do not want to revert the commit for the 1.8 branch, but just to avoid 
confusion in the 1.8.1 release notes, I mark it as an improvement.

> StreamingFileSink: Make the class extendable to customize for different 
> usecases
> 
>
> Key: FLINK-12539
> URL: https://issues.apache.org/jira/browse/FLINK-12539
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Kailash Hassan Dayanand
>Assignee: Kailash Hassan Dayanand
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently the StreamingFileSink has Builder pattern and the actual 
> constructor of StreamingFileSink is private. This makes it hard to extend the 
> class to built on top of this and customize the sink. (Example: Adding new 
> metrics). Proposing to make this protected as well as protected for the 
> Builder interface.
>  
> Discussion is here: 
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8689: [FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString

2019-06-16 Thread GitBox
JingsongLi commented on a change in pull request #8689: 
[FLINK-12802][table-runtime-blink] Reducing the Code of BinaryString
URL: https://github.com/apache/flink/pull/8689#discussion_r294118181
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
 ##
 @@ -1242,599 +763,32 @@ private void skipBytes(int n, int segSize) {
}
}
 
-   private byte value() {
+   byte value() {
return this.segment.get(this.offset);
}
}
 
/**
-* Parses this BinaryString to Long.
-*
-* Note that, in this method we accumulate the result in negative 
format, and convert it to
-* positive format at the end, if this string is not started with '-'. 
This is because min value
-* is bigger than max value in digits, e.g. Long.MAX_VALUE is 
'9223372036854775807' and
-* Long.MIN_VALUE is '-9223372036854775808'.
-*
-* This code is mostly copied from LazyLong.parseLong in Hive.
-* @return Long value if the parsing was successful else null.
-*/
-   public Long toLong() {
-   ensureMaterialized();
-   if (sizeInBytes == 0) {
-   return null;
-   }
-   int size = segments[0].size();
-   SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size);
-   int totalOffset = 0;
-
-   byte b = segmentAndOffset.value();
-   final boolean negative = b == '-';
-   if (negative || b == '+') {
-   segmentAndOffset.nextByte(size);
-   totalOffset++;
-   if (sizeInBytes == 1) {
-   return null;
-   }
-   }
-
-   long result = 0;
-   final byte separator = '.';
-   final int radix = 10;
-   final long stopValue = Long.MIN_VALUE / radix;
-   while (totalOffset < this.sizeInBytes) {
-   b = segmentAndOffset.value();
-   totalOffset++;
-   segmentAndOffset.nextByte(size);
-   if (b == separator) {
-   // We allow decimals and will return a 
truncated integral in that case.
-   // Therefore we won't throw an exception here 
(checking the fractional
-   // part happens below.)
-   break;
-   }
-
-   int digit;
-   if (b >= '0' && b <= '9') {
-   digit = b - '0';
-   } else {
-   return null;
-   }
-
-   // We are going to process the new digit and accumulate 
the result. However, before
-   // doing this, if the result is already smaller than the
-   // stopValue(Long.MIN_VALUE / radix), then result * 10 
will definitely be smaller
-   // than minValue, and we can stop.
-   if (result < stopValue) {
-   return null;
-   }
-
-   result = result * radix - digit;
-   // Since the previous result is less than or equal to
-   // stopValue(Long.MIN_VALUE / radix), we can just use 
`result > 0` to check overflow.
-   // If result overflows, we should stop.
-   if (result > 0) {
-   return null;
-   }
-   }
-
-   // This is the case when we've encountered a decimal separator. 
The fractional
-   // part will not change the number, but we will verify that the 
fractional part
-   // is well formed.
-   while (totalOffset < sizeInBytes) {
-   byte currentByte = segmentAndOffset.value();
-   if (currentByte < '0' || currentByte > '9') {
-   return null;
-   }
-   totalOffset++;
-   segmentAndOffset.nextByte(size);
-   }
-
-   if (!negative) {
-   result = -result;
-   if (result < 0) {
-   return null;
-   }
-   }
-   return result;
-   }
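   // Illustrative sketch only (not part of this patch): a self-contained version
   // of the negative-accumulation trick described in the javadoc above. The result
   // is accumulated as a non-positive number because Long.MIN_VALUE has a larger
   // magnitude than Long.MAX_VALUE, so "-9223372036854775808" never overflows
   // during accumulation. Decimal-separator handling is omitted for brevity.
   static Long parseLongViaNegativeAccumulation(String s) {
       if (s.isEmpty()) {
           return null;
       }
       int i = 0;
       final boolean negative = s.charAt(0) == '-';
       if (negative || s.charAt(0) == '+') {
           i++;
           if (s.length() == 1) {
               return null;
           }
       }
       final int radix = 10;
       final long stopValue = Long.MIN_VALUE / radix;
       long result = 0;
       for (; i < s.length(); i++) {
           char c = s.charAt(i);
           if (c < '0' || c > '9') {
               return null;
           }
           // If result has already passed stopValue, result * radix would underflow.
           if (result < stopValue) {
               return null;
           }
           result = result * radix - (c - '0');
           // Underflow past Long.MIN_VALUE wraps around to a positive value.
           if (result > 0) {
               return null;
           }
       }
       if (!negative) {
           result = -result;
           if (result < 0) {
               // -Long.MIN_VALUE is not representable as a long.
               return null;
           }
       }
       return result;
   }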
-
-   /**
-* Parses this BinaryString to Int.
-*
-* Note that, in this method we accumulate the result in negative 
format, and convert it to
-* positive format at the end, if this string is not started with '-'. 
This is because min value
-* is bigger 

[GitHub] [flink] ifndef-SleePy edited a comment on issue #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan…

2019-06-16 Thread GitBox
ifndef-SleePy edited a comment on issue #8721: [FLINK-12823][datastream] 
PartitionTransformation supports DataExchan…
URL: https://github.com/apache/flink/pull/8721#issuecomment-502511761
 
 
   Travis failed; there is an import error in a Python module. I don't think it 
is caused by this PR. I will rebase onto master and trigger the Travis check 
again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ifndef-SleePy commented on issue #8721: [FLINK-12823][datastream] PartitionTransformation supports DataExchan…

2019-06-16 Thread GitBox
ifndef-SleePy commented on issue #8721: [FLINK-12823][datastream] 
PartitionTransformation supports DataExchan…
URL: https://github.com/apache/flink/pull/8721#issuecomment-502511761
 
 
   Travis failed; there is an import error in a Python module. I don't think it 
is caused by this PR. I will rebase onto master and trigger the Travis check 
again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases

2019-06-16 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12539:

Issue Type: Improvement  (was: New Feature)

> StreamingFileSink: Make the class extendable to customize for different 
> usecases
> 
>
> Key: FLINK-12539
> URL: https://issues.apache.org/jira/browse/FLINK-12539
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Kailash Hassan Dayanand
>Assignee: Kailash Hassan Dayanand
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently the StreamingFileSink has a Builder pattern and the actual 
> constructor of StreamingFileSink is private. This makes it hard to extend the 
> class to build on top of it and customize the sink (for example, adding new 
> metrics). Proposing to make the constructor protected, as well as the 
> Builder interface.
>  
> Discussion is here: 
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on issue #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-16 Thread GitBox
JingsongLi commented on issue #8700: [FLINK-12657][hive] Integrate Flink with 
Hive UDF
URL: https://github.com/apache/flink/pull/8700#issuecomment-502511195
 
 
   +1 to merge


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-16 Thread GitBox
JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r294117338
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaFloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * Util for any ObjectInspector-related inspection and conversion of Hive data 
to/from Flink data.
+ *
+ * Hive ObjectInspector is a group of flexible APIs to inspect values in 
different data representations,
+ * and developers can extend those APIs as needed, so technically an object 
inspector supports arbitrary data types in Java.
+ */
+@Internal
+public class HiveInspectors {
+
+   /**
+* Gets the conversion for converting a Flink object to a Hive object from 
an ObjectInspector.
+*/
+   public static HiveObjectConversion getConversion(ObjectInspector 
inspector) {
+   if (inspector instanceof PrimitiveObjectInspector) {
+   if (inspector instanceof JavaBooleanObjectInspector) {
+   if (((JavaBooleanObjectInspector) 
inspector).preferWritable()) {
+   return o -> new 
BooleanWritable((Boolean) o);
+   } else {
+   return IdentityConversion.INSTANCE;
+   }
+   } else if (inspector instanceof 
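
   A hedged usage sketch (not from this PR) of how a conversion obtained from 
HiveInspectors.getConversion might be applied before handing a Flink value to a 
Hive UDF. The single abstract method name toHiveObject(Object) is an assumption 
inferred from the lambdas above, not something this excerpt confirms.

   ObjectInspector inspector =
       PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
   HiveObjectConversion conversion = HiveInspectors.getConversion(inspector);
   // For a Java boolean inspector, preferWritable() should be false, so per the
   // branch above this would be the identity conversion and the value passes
   // through unchanged.
   Object hiveValue = conversion.toHiveObject(Boolean.TRUE);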

[jira] [Updated] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases

2019-06-16 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12539:

Issue Type: New Feature  (was: Improvement)

> StreamingFileSink: Make the class extendable to customize for different 
> usecases
> 
>
> Key: FLINK-12539
> URL: https://issues.apache.org/jira/browse/FLINK-12539
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Reporter: Kailash Hassan Dayanand
>Assignee: Kailash Hassan Dayanand
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently the StreamingFileSink has a Builder pattern and the actual 
> constructor of StreamingFileSink is private. This makes it hard to extend the 
> class to build on top of it and customize the sink (for example, adding new 
> metrics). Proposing to make the constructor protected, as well as the 
> Builder interface.
>  
> Discussion is here: 
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases

2019-06-16 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865237#comment-16865237
 ] 

sunjincheng edited comment on FLINK-12539 at 6/17/19 2:06 AM:
--

I think this Jira is not a New Feature (BTW, new features should not be merged 
into a bugfix branch). The changes only change class visibility, so I think it 
is an Improvement, and I have changed the Type to Improvement.


was (Author: sunjincheng121):
I think this Jira is not a New Feature (BTW, new features should not be merged 
into a bugfix branch). The changes only change class visibility, so I think it 
is an Improvement. 

> StreamingFileSink: Make the class extendable to customize for different 
> usecases
> 
>
> Key: FLINK-12539
> URL: https://issues.apache.org/jira/browse/FLINK-12539
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Kailash Hassan Dayanand
>Assignee: Kailash Hassan Dayanand
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently the StreamingFileSink has a Builder pattern and the actual 
> constructor of StreamingFileSink is private. This makes it hard to extend the 
> class to build on top of it and customize the sink (for example, adding new 
> metrics). Proposing to make the constructor protected, as well as the 
> Builder interface.
>  
> Discussion is here: 
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] docete commented on issue #8712: [FLINK-12819][sql-client] Reuse TableEnvironment between different SQ…

2019-06-16 Thread GitBox
docete commented on issue #8712: [FLINK-12819][sql-client] Reuse 
TableEnvironment between different SQ…
URL: https://github.com/apache/flink/pull/8712#issuecomment-502509891
 
 
   Hi @wuchong, I checked the flink planner and the blink planner. Both of them 
clear transformations after execution. For the flink planner, transformations 
are cleared in the execution() function. For the blink planner, transformations 
are cached in the TableEnvironment and cleared after the StreamGraph is 
generated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases

2019-06-16 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865237#comment-16865237
 ] 

sunjincheng commented on FLINK-12539:
-

I think this Jira is not a New Feature (BTW, new features should not be merged 
into a bugfix branch). The changes only change class visibility, so I think it 
is an Improvement. 

> StreamingFileSink: Make the class extendable to customize for different 
> usecases
> 
>
> Key: FLINK-12539
> URL: https://issues.apache.org/jira/browse/FLINK-12539
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Kailash Hassan Dayanand
>Assignee: Kailash Hassan Dayanand
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently the StreamingFileSink has a Builder pattern and the actual 
> constructor of StreamingFileSink is private. This makes it hard to extend the 
> class to build on top of it and customize the sink (for example, adding new 
> metrics). Proposing to make the constructor protected, as well as the 
> Builder interface.
>  
> Discussion is here: 
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases

2019-06-16 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12539:

Issue Type: Improvement  (was: New Feature)

> StreamingFileSink: Make the class extendable to customize for different 
> usecases
> 
>
> Key: FLINK-12539
> URL: https://issues.apache.org/jira/browse/FLINK-12539
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Kailash Hassan Dayanand
>Assignee: Kailash Hassan Dayanand
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently the StreamingFileSink has a Builder pattern and the actual 
> constructor of StreamingFileSink is private. This makes it hard to extend the 
> class to build on top of it and customize the sink (for example, adding new 
> metrics). Proposing to make the constructor protected, as well as the 
> Builder interface.
>  
> Discussion is here: 
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases

2019-06-16 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12539:

Issue Type: New Feature  (was: Improvement)

> StreamingFileSink: Make the class extendable to customize for different 
> usecases
> 
>
> Key: FLINK-12539
> URL: https://issues.apache.org/jira/browse/FLINK-12539
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Reporter: Kailash Hassan Dayanand
>Assignee: Kailash Hassan Dayanand
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently the StreamingFileSink has a Builder pattern and the actual 
> constructor of StreamingFileSink is private. This makes it hard to extend the 
> class to build on top of it and customize the sink (for example, adding new 
> metrics). Proposing to make the constructor protected, as well as the 
> Builder interface.
>  
> Discussion is here: 
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases

2019-06-16 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12539:

Issue Type: Improvement  (was: New Feature)

> StreamingFileSink: Make the class extendable to customize for different 
> usecases
> 
>
> Key: FLINK-12539
> URL: https://issues.apache.org/jira/browse/FLINK-12539
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Kailash Hassan Dayanand
>Assignee: Kailash Hassan Dayanand
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently the StreamingFileSink has a Builder pattern and the actual 
> constructor of StreamingFileSink is private. This makes it hard to extend the 
> class to build on top of it and customize the sink (for example, adding new 
> metrics). Proposing to make the constructor protected, as well as the 
> Builder interface.
>  
> Discussion is here: 
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wuchong commented on issue #8748: [FLINK-12857] [table] move FilterableTableSource into flink-table-common

2019-06-16 Thread GitBox
wuchong commented on issue #8748: [FLINK-12857] [table] move 
FilterableTableSource into flink-table-common
URL: https://github.com/apache/flink/pull/8748#issuecomment-502508222
 
 
   Thanks @godfreyhe, looks good to me.
   
   +1 to merge.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

