[jira] [Commented] (FLINK-12502) Harden JobMasterTest#testRequestNextInputSplitWithDataSourceFailover

2019-05-26 Thread leesf (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848621#comment-16848621
 ] 

leesf commented on FLINK-12502:
---

Yes [~gjy]

> Harden JobMasterTest#testRequestNextInputSplitWithDataSourceFailover
> 
>
> Key: FLINK-12502
> URL: https://issues.apache.org/jira/browse/FLINK-12502
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: leesf
>Priority: Major
>
> The {{JobMasterTest#testRequestNextInputSplitWithDataSourceFailover}} relies 
> on how many files you have in your working directory. This assumption is 
> quite brittle. Instead we should explicitly instantiate an 
> {{InputSplitAssigner}} with a defined number of input splits. 
> Moreover, we should make the assertions more explicit: Input split 
> comparisons should not rely solely on the length of the input split data.
> Maybe it is also not necessary to capture the full 
> {{TaskDeploymentDescriptor}} because we could already know the producer's and 
> consumer's {{JobVertexID}} when we create the {{JobGraph}}.
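
A minimal sketch of the suggested setup (hypothetical snippet, not the actual test code):

{code:java}
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

// Two splits, known up front, independent of the working directory contents.
InputSplit[] splits = new InputSplit[] {
	new GenericInputSplit(0, 2),
	new GenericInputSplit(1, 2)
};
InputSplitAssigner assigner = new DefaultInputSplitAssigner(splits);
{code}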



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8547: [FLINK-12267] Port SimpleSlotTest to new code base

2019-05-26 Thread GitBox
flinkbot commented on issue #8547: [FLINK-12267] Port SimpleSlotTest to new 
code base
URL: https://github.com/apache/flink/pull/8547#issuecomment-496081075
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12267) Port SimpleSlotTest to new code base

2019-05-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12267:
---
Labels: pull-request-available  (was: )

> Port SimpleSlotTest to new code base
> 
>
> Key: FLINK-12267
> URL: https://issues.apache.org/jira/browse/FLINK-12267
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Tests
>Reporter: leesf
>Assignee: leesf
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> Mainly get rid of {{Instance}}





[GitHub] [flink] leesf opened a new pull request #8547: [FLINK-12267] Port SimpleSlotTest to new code base

2019-05-26 Thread GitBox
leesf opened a new pull request #8547: [FLINK-12267] Port SimpleSlotTest to new 
code base
URL: https://github.com/apache/flink/pull/8547
 
 
   ## What is the purpose of the change
   
   *Get rid of Instance in SimpleSlotTest*
   
   
   ## Brief change log
   
   Return a new SimpleSlot in _getSlot_ instead of using Instance.
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   
   cc @GJL @zentol 
   




[GitHub] [flink] Aitozi commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-05-26 Thread GitBox
Aitozi commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the 
incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#issuecomment-496077999
 
 
   Hi @zhijiangW, thanks for your detailed explanation. My conclusion is below.
   
   Non-credit-based mode:
   - inPoolUsage
   
   Credit-based mode:
   - exclusiveInPoolUsage: exclusiveBufferUsed / exclusiveBufferTotal (all 
initial credits)
   - floatingInPoolUsage: floatingBufferUsed / floatingBufferTotal
   - inPoolUsage: combines exclusiveInPoolUsage and floatingInPoolUsage
   
   I think this makes it easier to distinguish floating from exclusive buffer 
usage in credit-based mode, while inPoolUsage still gives an overall view. 
WDYT @pnowojski @zhijiangW 
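
   A minimal sketch of the combined gauge, with hypothetical stat accessors 
(only `Gauge` is the actual Flink interface):

```java
import org.apache.flink.metrics.Gauge;

/** Hypothetical holder for the four counts; not an actual Flink API. */
interface BufferPoolStats {
    int exclusiveUsed();
    int exclusiveTotal();
    int floatingUsed();
    int floatingTotal();
}

/** inPoolUsage = (exclusiveUsed + floatingUsed) / (exclusiveTotal + floatingTotal). */
class InPoolUsageGauge implements Gauge<Float> {
    private final BufferPoolStats stats;

    InPoolUsageGauge(BufferPoolStats stats) {
        this.stats = stats;
    }

    @Override
    public Float getValue() {
        int used = stats.exclusiveUsed() + stats.floatingUsed();
        int total = stats.exclusiveTotal() + stats.floatingTotal();
        return total == 0 ? 0.0f : (float) used / total;
    }
}
```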




[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-26 Thread GitBox
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287640985
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -53,7 +53,21 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-api-java</artifactId>
+			<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 
 Review comment:
   Yes, we need the blink planner for `BatchTableSink` at the moment.




[GitHub] [flink] zhijiangW commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-05-26 Thread GitBox
zhijiangW commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the 
incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#issuecomment-496069672
 
 
   Thanks for the confirmation and good suggestions. @pnowojski 
   
   > > In other words, if we see the floating buffers are used, that means the 
exclusive buffers should also be used.
   > > This is not true for even slight data skew
   
   Maybe my previous explanation was not very clear. If floating buffers are 
used, I only mean that the exclusive buffers for one or some 
`RemoteInputChannel`s are also expected to be used eventually; it does not 
mean that the exclusive buffers for all the channels are in use. 
   
   For example, if the producer's backlog is 1, we would still request one 
more floating buffer even though the 2 exclusive buffers for this channel are 
currently available, because we want to feed back some extra credits 
beforehand so that the network transport is not blocked once more backlog is 
produced soon. So the use of exclusive buffers is not strongly consistent in 
time; it is eventually consistent, which matches our expectation. If the 
backlog drops from 1 to 0, the previously requested floating buffer is also 
released by this channel if its 2 exclusive buffers are still available. So 
from the perspective of a single input channel, it does not occupy extra 
floating buffers as long as its available exclusive buffers are sufficient.
   
   I also agree with the above suggestions for distinguishing the total 
`inPoolUsage` and `floatingInPoolUsage`, or we could only retain 
`exclusiveInPoolUsage` and `floatingInPoolUsage` for credit-based mode.
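
   A toy model of the credit feedback described above (illustrative names 
only; the real logic lives in `RemoteInputChannel`):

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Simplified single-channel model; buffers are just placeholder objects. */
class ChannelModel {
    private static final int INITIAL_CREDIT = 2; // exclusive buffers per channel

    private final Queue<Object> available = new ArrayDeque<>();
    private final Queue<Object> floatingPool;

    ChannelModel(Queue<Object> floatingPool) {
        this.floatingPool = floatingPool;
        for (int i = 0; i < INITIAL_CREDIT; i++) {
            available.add(new Object()); // the exclusive buffers
        }
    }

    /** On backlog b, top the channel up to b + INITIAL_CREDIT buffers. */
    void onSenderBacklog(int backlog) {
        int required = backlog + INITIAL_CREDIT;
        while (available.size() < required && !floatingPool.isEmpty()) {
            available.add(floatingPool.poll()); // request one floating buffer
        }
    }
}
```

   With backlog 1 and both exclusive buffers still available, `required` is 3, 
so exactly one floating buffer is requested, matching the example above.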
   
   
   




[GitHub] [flink] JingsongLi commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-26 Thread GitBox
JingsongLi commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287639913
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -53,7 +53,21 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-api-java</artifactId>
+			<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 
 Review comment:
   I think the reason is `BatchTableSink` with `BoundedStream` still in blink 
planner.
   We need unify `TableSource` and `TableSink` interface and  move them to api.




[GitHub] [flink] Aitozi commented on a change in pull request #8523: [FLINK-12481][runtime] Invoke timer callback in task thread (via mailbox)

2019-05-26 Thread GitBox
Aitozi commented on a change in pull request #8523: [FLINK-12481][runtime] 
Invoke timer callback in task thread (via mailbox)
URL: https://github.com/apache/flink/pull/8523#discussion_r287637082
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -1358,4 +1358,19 @@ public void actionsUnavailable() throws InterruptedException {
 		mailbox.putMail(actionUnavailableLetter);
 	}
 }
+
+	private class TimerInvocationContext implements SystemProcessingTimeService.ScheduledCallbackExecutionContext {
+		@Override
+		public void invoke(ProcessingTimeCallback callback, long timestamp) throws InterruptedException {
+			mailbox.putMail(() -> {
+				synchronized (getCheckpointLock()) {
 Review comment:
   Do we still have to acquire the checkpoint lock here? I think it is 
already synchronized by the mailbox. 
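
   A toy model of the serialization the mailbox provides (illustrative only; 
whether the lock can be dropped depends on the code paths that still take it 
outside the mailbox):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// All letters run in the single task thread, so they are serialized with
// respect to each other even without extra locks.
class MailboxModel {
    private final BlockingQueue<Runnable> mail = new LinkedBlockingQueue<>();

    void putMail(Runnable letter) throws InterruptedException {
        mail.put(letter);
    }

    /** Executed by the task thread only. */
    void runMailboxLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            mail.take().run();
        }
    }
}
```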




[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-26 Thread GitBox
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287633012
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
 ##
 @@ -102,4 +102,20 @@
 	public static final ConfigOption<Integer> OFFLOAD_MINSIZE = key("blob.offload.minsize")
 		.defaultValue(1_024 * 1_024) // 1MiB by default
 		.withDescription("The minimum size for messages to be offloaded to the BlobServer.");
+
+	/**
+	 * The socket timeout in milliseconds for the blob client.
+	 */
+	public static final ConfigOption<Integer> SO_TIMEOUT =
+		key("blob.client.socket.timeout")
+			.defaultValue(120_000)
 
 Review comment:
   ` 2 minutes` is a value that I give freely, and it has little basis, just 
considering that a slightly longer wait may be required in the case of high CPU 
load. In fact, the blob client has a retry mechanism, and we should set it a 
smaller. Do you have any recommended values about connection and socket 
timeouts?
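
   For reference, a small sketch of how the two timeouts behave on a plain 
`java.net.Socket` (illustrative only; the blob client wires these values 
internally):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class TimeoutDemo {
    public static void main(String[] args) throws IOException {
        try (Socket socket = new Socket()) {
            // Connection timeout: maximum time to establish the TCP connection.
            socket.connect(new InetSocketAddress("localhost", 12345), 5_000);
            // Socket (SO_TIMEOUT) timeout: maximum time a single read may block.
            socket.setSoTimeout(120_000);
            socket.getInputStream().read();
        } catch (SocketTimeoutException e) {
            // Thrown on a connect or read timeout; a client may retry here.
        }
    }
}
```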



[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-26 Thread GitBox
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287638025
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 ##
 @@ -70,7 +70,7 @@ public static void startNonSSLServer() throws IOException {
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath());
 		config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
 
-		blobNonSslServer = new BlobServer(config, new VoidBlobStore());
+		blobNonSslServer = new TestBlobServer(config, new VoidBlobStore());
 
 Review comment:
   I checked the code carefully, and `TestBlobServer` is not needed here. I 
will revert to `BlobServer`.




[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-26 Thread GitBox
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287637335
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 ##
 @@ -58,7 +58,7 @@ public static void startSSLServer() throws IOException {
 		Configuration config = SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath());
 
-		blobSslServer = new BlobServer(config, new VoidBlobStore());
+		blobSslServer = new TestBlobServer(config, new VoidBlobStore());
 
 Review comment:
   `BlobClientSslTest` inherits from `BlobClientTest`, so it will also execute 
the new test `testSocketTimeout` added in `BlobClientTest`. If 
`TestBlobServer` is not used here, an error will occur.
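
   A simplified sketch of why that is (JUnit also runs `@Test` methods 
inherited from the parent class; the class names below are stand-ins):

```java
// ParentClientTest.java -- stand-in for BlobClientTest
import org.junit.Test;

public class ParentClientTest {
    @Test
    public void testSocketTimeout() {
        // relies on the server-side blocking behavior (TestBlobServer in the PR)
    }
}

// SslClientTest.java -- stand-in for BlobClientSslTest
public class SslClientTest extends ParentClientTest {
    // no body needed: JUnit re-runs the inherited testSocketTimeout
    // against this subclass's own setup
}
```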




[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-26 Thread GitBox
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287635535
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 ##
 @@ -70,7 +70,7 @@ public static void startNonSSLServer() throws IOException {
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath());
 		config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
 
-		blobNonSslServer = new BlobServer(config, new VoidBlobStore());
+		blobNonSslServer = new TestBlobServer(config, new VoidBlobStore());
 
 Review comment:
   `BlobClientSslTest` inherits from `BlobClientTest`, so it will also execute 
the new test `testSocketTimeout` added in `BlobClientTest`. If 
`TestBlobServer` is not used here, an error will occur.




[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-26 Thread GitBox
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287635709
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
 ##
 @@ -487,4 +488,58 @@ private static void uploadJarFile(
 			validateGetAndClose(blobClient.getInternal(jobId, blobKeys.get(0)), testFile);
 		}
 	}
+
+
+	/**
+	 * Tests the socket operation timeout.
+	 */
+	@Test
+	public void testSocketTimeout() {
+		Configuration clientConfig = getBlobClientConfig();
+		int oldSoTimeout = clientConfig.getInteger(BlobServerOptions.SO_TIMEOUT);
+
+		clientConfig.setInteger(BlobServerOptions.SO_TIMEOUT, 50);
+		getBlobServer().setBlockingMillis(10_000);
+
+		try {
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
+
+			try (BlobClient client = new BlobClient(serverAddress, clientConfig)) {
+				client.getInternal(new JobID(), BlobKey.createKey(TRANSIENT_BLOB));
+
+				fail("Should throw an exception.");
+			} catch (Throwable t) {
+				assertEquals(java.net.SocketTimeoutException.class, ExceptionUtils.stripException(t, IOException.class).getClass());
 
 Review comment:
   > We could create a new test class which contains all blob server tests 
which need to start a new blob server for each test.
   
   It's a good idea. But since only one test currently has this need, and to 
keep the maintenance cost of the code low, I think we can create such a test 
class once more tests need it in the future.




[GitHub] [flink] KurtYoung commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-26 Thread GitBox
KurtYoung commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287636685
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -53,7 +53,21 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-api-java</artifactId>
+			<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 
 Review comment:
   Firstly, this will not happen very soon. Secondly, I don't see a reason why 
a connector should depend on a planner. Could you explain the reason?


[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-26 Thread GitBox
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287635284
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
 ##
 @@ -50,6 +54,7 @@ private static HiveConf getHiveConf() throws IOException {
 		HiveConf hiveConf = new HiveConf();
 		hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
 		hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);
+		hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "");
 
 Review comment:
   It's not needed anymore; I will remove it.




[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-26 Thread GitBox
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287634504
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -53,7 +53,21 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-api-java</artifactId>
+			<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 
 Review comment:
   Yes, we have. I thought the blink planner would eventually replace Flink's. 
Is there any reason why we can't depend on it?




[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-26 Thread GitBox
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287633517
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.io.IOException;
+
+/**
+ * Util class for accessing Hive tables.
+ */
+public class HiveTableUtil {
+
+   private HiveTableUtil() {
+   }
+
+	/**
+	 * Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
+	 */
+	public static ObjectInspector getObjectInspector(TypeInformation flinkType) throws IOException {
+		return getObjectInspector(toHiveTypeInfo(flinkType));
+	}
+
+	// TODO: reuse Hive's TypeInfoUtils?
+	private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
+		switch (type.getCategory()) {
+
+			case PRIMITIVE:
+				PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
+				return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
+
+			// TODO: support complex types
+			default:
+				throw new IOException("Unsupported Hive type category " + type.getCategory());
+		}
+	}
+
+	/**
+	 * Converts a Flink {@link TypeInformation} to corresponding Hive {@link TypeInfo}.
+	 */
+	public static TypeInfo toHiveTypeInfo(TypeInformation flinkType) {
 
 Review comment:
   It's not. We need a `TypeInfo` here, and `HiveTypeUtil.toHiveType()` gives 
the name of a Hive type. But I guess this method can be moved to `HiveTypeUtil`.




[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-26 Thread GitBox
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287633298
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -590,6 +578,17 @@ private static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTabl
 		return hiveTable;
 	}
 
+	private static void setStorageFormat(StorageDescriptor sd, Map<String, String> properties) {
+		// TODO: simply use text format for now
+		String storageFormatName = DEFAULT_HIVE_TABLE_STORAGE_FORMAT;
+		StorageFormatDescriptor sfDescriptor = storageFormatFactory.get(storageFormatName);
+		checkArgument(sfDescriptor != null, "Unknown storage format " + storageFormatName);
 
 Review comment:
   It doesn't directly check the input argument; it checks whether we can get 
a `StorageFormatDescriptor` based on it.


[GitHub] [flink] flinkbot commented on issue #8546: [FLINK-12621]. Use MiniCluster instead of JobExecutor

2019-05-26 Thread GitBox
flinkbot commented on issue #8546: [FLINK-12621]. Use MiniCluster instead of 
JobExecutor
URL: https://github.com/apache/flink/pull/8546#issuecomment-496059964
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] zjffdu commented on issue #8546: [FLINK-12621]. Use MiniCluster instead of JobExecutor

2019-05-26 Thread GitBox
zjffdu commented on issue #8546: [FLINK-12621]. Use MiniCluster instead of 
JobExecutor
URL: https://github.com/apache/flink/pull/8546#issuecomment-496059883
 
 
   @tillrohrmann Could you help review it? I noticed you are the original 
author of `JobExecutor`. 




[jira] [Updated] (FLINK-12621) Use MiniCluster instead of JobExecutor

2019-05-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12621:
---
Labels: pull-request-available  (was: )

> Use MiniCluster instead of JobExecutor
> --
>
> Key: FLINK-12621
> URL: https://issues.apache.org/jira/browse/FLINK-12621
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.8.0
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> JobExecutor is specifically used for local mode. I don't think we need to 
> introduce a new class/interface just for local mode; we should use the 
> existing MiniCluster.
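
A minimal sketch of driving a job with the existing MiniCluster (assuming the 1.8-era API and an already-built {{jobGraph}}):

{code:java}
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

void runLocally(JobGraph jobGraph) throws Exception {
	MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
		.setNumTaskManagers(1)
		.setNumSlotsPerTaskManager(1)
		.build();
	MiniCluster miniCluster = new MiniCluster(cfg);
	try {
		miniCluster.start();
		miniCluster.executeJobBlocking(jobGraph); // blocks until the job finishes
	} finally {
		miniCluster.close();
	}
}
{code}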





[GitHub] [flink] KurtYoung commented on a change in pull request #8527: [FLINK-12610] [table-planner-blink] Introduce planner rules about aggregate

2019-05-26 Thread GitBox
KurtYoung commented on a change in pull request #8527: [FLINK-12610] 
[table-planner-blink] Introduce planner rules about aggregate
URL: https://github.com/apache/flink/pull/8527#discussion_r287631851
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/AggregateCalcMergeRule.java
 ##
 @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.AggregateProjectMergeRule;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Planner rule that recognizes a {@link org.apache.calcite.rel.core.Aggregate}
+ * on top of a {@link org.apache.calcite.rel.core.Calc} and if possible
+ * aggregate through the calc or removes the calc.
+ *
+ * This is only possible when no condition in calc and the grouping expressions and arguments to
 
 Review comment:
   If you can't merge the condition, why not change the rule's match pattern 
to `Aggregate` on `Project`?




[GitHub] [flink] KurtYoung commented on a change in pull request #8527: [FLINK-12610] [table-planner-blink] Introduce planner rules about aggregate

2019-05-26 Thread GitBox
KurtYoung commented on a change in pull request #8527: [FLINK-12610] 
[table-planner-blink] Introduce planner rules about aggregate
URL: https://github.com/apache/flink/pull/8527#discussion_r287631994
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkAggregateJoinTransposeRule.java
 ##
 @@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical;
+
+import org.apache.flink.table.plan.util.AggregateUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalSnapshot;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlSplittableAggFunction;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.Bug;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.calcite.util.mapping.Mappings;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+/**
+ * This rule is copied from Calcite's {@link org.apache.calcite.rel.rules.AggregateJoinTransposeRule}.
+ * Modification:
+ * - Do not match TemporalTableScan since it means that it is a dimension table scan currently.
 
 Review comment:
   Put this rule into CBO, and we can avoid dealing with such situations.




[GitHub] [flink] zjffdu opened a new pull request #8546: [FLINK-12621]. Use MiniCluster instead of JobExecutor

2019-05-26 Thread GitBox
zjffdu opened a new pull request #8546: [FLINK-12621]. Use MiniCluster instead 
of JobExecutor
URL: https://github.com/apache/flink/pull/8546
 
 
   ## What is the purpose of the change
   
   This is a refactoring PR which removes `JobExecutor` and uses `MiniCluster` 
instead. `JobExecutor` is specifically used for local mode; I don't think we 
need to introduce such a new class/interface just for local mode when we 
could simply use the existing `MiniCluster`.
   
   ## Brief change log
   
   * Remove `JobExecutor` &  `JobExecutorService`
   * Use `ClusterClient` to replace `JobExecutorService`
   
   ## Verifying this change
   
   Current CI passes; no new code is introduced, just some refactoring.
   
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[GitHub] [flink] KurtYoung commented on a change in pull request #8522: [FLINK-12572][hive]Implement TableSource and InputFormat to read Hive tables

2019-05-26 Thread GitBox
KurtYoung commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement TableSource and InputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r287631347
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -58,6 +58,28 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 
 Review comment:
   Have you discussed this? IMO the blink planner should not be a dependency.
   




[GitHub] [flink] KurtYoung commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-26 Thread GitBox
KurtYoung commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287631136
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -53,7 +53,21 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-api-java</artifactId>
+			<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 
 Review comment:
   Have you discussed this? IMO the blink planner should not be a dependency.




[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-26 Thread GitBox
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287631199
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.io.IOException;
+
+/**
+ * Util class for accessing Hive tables.
+ */
+public class HiveTableUtil {
+
+   private HiveTableUtil() {
+   }
+
+	/**
+	 * Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
+	 */
+	public static ObjectInspector getObjectInspector(TypeInformation flinkType) throws IOException {
+		return getObjectInspector(toHiveTypeInfo(flinkType));
+	}
+
+	// TODO: reuse Hive's TypeInfoUtils?
+	private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
+		switch (type.getCategory()) {
+
+			case PRIMITIVE:
+				PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
+				return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
+
+			// TODO: support complex types
+			default:
+				throw new IOException("Unsupported Hive type category " + type.getCategory());
+		}
+	}
+
+	/**
+	 * Converts a Flink {@link TypeInformation} to corresponding Hive {@link TypeInfo}.
+	 */
+	public static TypeInfo toHiveTypeInfo(TypeInformation flinkType) {
+		if (flinkType.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
 
 Review comment:
   We can't, because Flink types are not an enum.




[GitHub] [flink] zjffdu commented on a change in pull request #8533: [FLINK-12596][scala-shell] Unify all kinds of cluster via ClusterClient in FlinkShell

2019-05-26 Thread GitBox
zjffdu commented on a change in pull request #8533: [FLINK-12596][scala-shell] 
Unify all kinds of cluster via ClusterClient in FlinkShell
URL: https://github.com/apache/flink/pull/8533#discussion_r287630879
 
 

 ##
 File path: 
flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ##
 @@ -194,47 +200,28 @@ object FlinkShell {
     val configDirectory = new File(confDirPath)
     val configuration = GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath)
 
-    val (repl, cluster) = try {
-      val (host, port, cluster) = fetchConnectionInfo(configuration, config)
-      val conf = cluster match {
-        case Some(Left(_)) => configuration
-        case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
-        case None => configuration
-      }
-
+    try {
+      val (host, port, shouldShutdownCluster, clusterClient) = fetchConnectionInfo(configuration, config)
+      val conf = clusterClient.getFlinkConfiguration
       println(s"\nConnecting to Flink cluster (host: $host, port: $port).\n")
-      val repl = bufferedReader match {
-        case Some(reader) =>
-          val out = new StringWriter()
-          new FlinkILoop(host, port, conf, config.externalJars, reader, new JPrintWriter(out))
-        case None =>
-          new FlinkILoop(host, port, conf, config.externalJars)
-      }
-
-      (repl, cluster)
+      new FlinkILoop(host, port, conf, config.externalJars, clusterClient, shouldShutdownCluster, in,
+        new JPrintWriter(out))
     } catch {
       case e: IllegalArgumentException =>
         println(s"Error: ${e.getMessage}")
         sys.exit()
     }
+  }
+
+  def startShell(config: Config, in: Option[BufferedReader], out: JPrintWriter): Unit = {
+    println("Starting Flink Shell:")
 
+    val flinkILoop = createFlinkILoop(config, in, out)
     val settings = new Settings()
     settings.usejavacp.value = true
     settings.Yreplsync.value = true
 
-    try {
-      repl.process(settings)
-    } finally {
-      repl.closeInterpreter()
-      cluster match {
-        case Some(Left(miniCluster)) => miniCluster.close()
-        case Some(Right(yarnCluster)) =>
-          yarnCluster.shutDownCluster()
-          yarnCluster.shutdown()
-        case _ =>
-      }
-    }
-
+    flinkILoop.process(settings)
 
 Review comment:
   It is not necessary to call `closeInterpreter` explicitly, as it will be 
called in method `process` 
https://github.com/scala/scala/blob/2.11.x/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L998


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zjffdu commented on a change in pull request #8533: [FLINK-12596][scala-shell] Unify all kinds of cluster via ClusterClient in FlinkShell

2019-05-26 Thread GitBox
zjffdu commented on a change in pull request #8533: [FLINK-12596][scala-shell] 
Unify all kinds of cluster via ClusterClient in FlinkShell
URL: https://github.com/apache/flink/pull/8533#discussion_r287630879
 
 

 ##
 File path: 
flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ##
 @@ -194,47 +200,28 @@ object FlinkShell {
 val configDirectory = new File(confDirPath)
 val configuration = 
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath)
 
-val (repl, cluster) = try {
-  val (host, port, cluster) = fetchConnectionInfo(configuration, config)
-  val conf = cluster match {
-case Some(Left(_)) => configuration
-case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
-case None => configuration
-  }
-
+try {
+  val (host, port, shouldShutdownCluster, clusterClient) = 
fetchConnectionInfo(configuration, config)
+  val conf = clusterClient.getFlinkConfiguration
   println(s"\nConnecting to Flink cluster (host: $host, port: $port).\n")
-  val repl = bufferedReader match {
-case Some(reader) =>
-  val out = new StringWriter()
-  new FlinkILoop(host, port, conf, config.externalJars, reader, new 
JPrintWriter(out))
-case None =>
-  new FlinkILoop(host, port, conf, config.externalJars)
-  }
-
-  (repl, cluster)
+  new FlinkILoop(host, port, conf, config.externalJars, clusterClient, 
shouldShutdownCluster, in,
+new JPrintWriter(out))
 } catch {
   case e: IllegalArgumentException =>
 println(s"Error: ${e.getMessage}")
 sys.exit()
 }
+  }
+
+  def startShell(config: Config, in: Option[BufferedReader], out: 
JPrintWriter): Unit = {
+println("Starting Flink Shell:")
 
+val flinkILoop = createFlinkILoop(config, in, out)
 val settings = new Settings()
 settings.usejavacp.value = true
 settings.Yreplsync.value = true
 
-try {
-  repl.process(settings)
-} finally {
-  repl.closeInterpreter()
-  cluster match {
-case Some(Left(miniCluster)) => miniCluster.close()
-case Some(Right(yarnCluster)) =>
-  yarnCluster.shutDownCluster()
-  yarnCluster.shutdown()
-case _ =>
-  }
-}
-
+flinkILoop.process(settings)
 
 Review comment:
   It is not necessary to call `closeInterpreter` explicitly, as it will be 
called in method `process` 
https://github.com/scala/scala/blob/2.11.x/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L998


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8485: [FLINK-12555] Introduce an encapsulated metric group layout for shuffle API

2019-05-26 Thread GitBox
zhijiangW commented on a change in pull request #8485: [FLINK-12555] Introduce 
an encapsulated metric group layout for shuffle API
URL: https://github.com/apache/flink/pull/8485#discussion_r287630954
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputChannelMetrics.java
 ##
 @@ -72,4 +72,47 @@ public Counter getNumBuffersInLocalCounter() {
public Counter getNumBuffersInRemoteCounter() {
return numBuffersInRemote;
}
+
+   private static class MultiCounterWrapper implements Counter {
+   private final Counter[] counters;
+
+   private MultiCounterWrapper(Counter ... counters) {
+   Preconditions.checkArgument(counters.length > 0);
+   this.counters = counters;
+   }
+
+   @Override
+   public void inc() {
+   for (Counter c : counters) {
+   c.inc();
+   }
+   }
+
+   @Override
+   public void inc(long n) {
+   for (Counter c : counters) {
+   c.inc(n);
+   }
+   }
+
+   @Override
+   public void dec() {
+   for (Counter c : counters) {
+   c.dec();
+   }
+   }
+
+   @Override
+   public void dec(long n) {
+   for (Counter c : counters) {
+   c.dec(n);
+   }
+   }
+
+   @Override
+   public long getCount() {
+   // assume that the counters are not accessed directly 
elsewhere
 
 Review comment:
   I am not sure whether `getCounter` would actually be used. If it is, we should 
keep the returned counter in the previous structure. Does that mean calling 
`new InputChannelMetrics(parentGroup, networkGroup)` instead?
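
   For illustration, a minimal sketch of how the truncated `getCount()` could 
be completed under the assumption stated in its comment (the wrapped counters 
are only ever updated through this wrapper, so any one of them is 
representative); a sketch, not the PR's code:

```java
// Sketch only: assumes all updates go through inc()/dec() above,
// so every wrapped counter always holds the same value.
@Override
public long getCount() {
	// counters is guaranteed non-empty by the constructor check
	return counters[0].getCount();
}
```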


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-26 Thread GitBox
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287630791
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.TypeConverters;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A table sink to write to Hive tables.
+ */
+public class HiveTableSink implements BatchTableSink<BaseRow> {
+
+   private final JobConf jobConf;
+   private final RowTypeInfo rowTypeInfo;
+   private final String dbName;
+   private final String tableName;
+   private final List<String> partitionCols;
+
+   public HiveTableSink(JobConf jobConf, RowTypeInfo rowTypeInfo, String 
dbName,
+   String tableName, List<String> 
partitionCols) {
+   this.jobConf = jobConf;
+   this.rowTypeInfo = rowTypeInfo;
+   this.dbName = dbName;
+   this.tableName = tableName;
+   this.partitionCols = partitionCols;
+   }
+
+   @Override
+   public DataStreamSink<?> emitBoundedStream(DataStream<BaseRow> 
boundedStream, TableConfig tableConfig, ExecutionConfig executionConfig) {
+   // TODO: support partitioning
+   final boolean isPartitioned = false;
+   // TODO: support overwrite
+   final boolean overwrite = false;
+   HiveTablePartition hiveTablePartition;
+   HiveTableOutputFormat outputFormat;
+   IMetaStoreClient client = HMSClientFactory.create(new 
HiveConf(jobConf, HiveConf.class));
+   try {
+   Table table = client.getTable(dbName, tableName);
+   StorageDescriptor sd = table.getSd();
+   // here we use the sdLocation to store the output path 
of the job, which is always a staging dir
+   String sdLocation = sd.getLocation();
+   if (isPartitioned) {
+   // TODO: implement this
+   } else {
+   sd.setLocation(toStagingDir(sdLocation, 
jobConf));
+   hiveTablePartition = new HiveTablePartition(sd, 
null);
+   }
+   outputFormat = new HiveTableOutputFormat(jobConf, 
dbName, tableName, partitionCols,
+   rowTypeInfo, hiveTablePartition, 
MetaStoreUtils.getTableMetadata(table), overwrite);
+   } catch (TException e) {
+   throw new CatalogException("Failed to query Hive 
metastore", e);
+   } 
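
The quoted diff references a `toStagingDir` helper that is not shown in this 
hunk. For illustration, a hypothetical sketch of what such a helper might look 
like (names and layout here are assumptions, not the PR's actual code):

```java
// Hypothetical sketch: derive a job-scoped staging directory under the
// table location, so output is written there before being committed.
private String toStagingDir(String tableLocation, Configuration conf) throws IOException {
	Path stagingPath = new Path(tableLocation, ".staging_" + System.currentTimeMillis());
	FileSystem fs = stagingPath.getFileSystem(conf);
	fs.mkdirs(stagingPath);
	// best-effort cleanup if the job exits without committing its output
	fs.deleteOnExit(stagingPath);
	return stagingPath.toString();
}
```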

[jira] [Created] (FLINK-12629) Add integration test for scala shell in yarn mode

2019-05-26 Thread Jeff Zhang (JIRA)
Jeff Zhang created FLINK-12629:
--

 Summary: Add integration test for scala shell in yarn mode
 Key: FLINK-12629
 URL: https://issues.apache.org/jira/browse/FLINK-12629
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN, Scala Shell
Affects Versions: 1.8.0
Reporter: Jeff Zhang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12629) Add integration test for scala shell in yarn mode

2019-05-26 Thread Jeff Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang reassigned FLINK-12629:
--

Assignee: Jeff Zhang

> Add integration test for scala shell in yarn mode
> -
>
> Key: FLINK-12629
> URL: https://issues.apache.org/jira/browse/FLINK-12629
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN, Scala Shell
>Affects Versions: 1.8.0
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-12167) ClusterClient doesn't unset the context class loader after program run

2019-05-26 Thread Abdul Qadeer (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdul Qadeer resolved FLINK-12167.
--
   Resolution: Fixed
Fix Version/s: 1.9.0

> ClusterClient doesn't unset the context class loader after program run
> --
>
> Key: FLINK-12167
> URL: https://issues.apache.org/jira/browse/FLINK-12167
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.8.0
>Reporter: Abdul Qadeer
>Assignee: Abdul Qadeer
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
>  
> {code:java}
> public JobSubmissionResult run(PackagedProgram prog, int parallelism)
> {code}
> This method doesn't restore the thread's original class loader after program 
> is run. This could lead to several class loading issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 edited a comment on issue #8474: [FLINK-12409][python] Adds from_elements in TableEnvironment

2019-05-26 Thread GitBox
sunjincheng121 edited a comment on issue #8474: [FLINK-12409][python] Adds 
from_elements in TableEnvironment
URL: https://github.com/apache/flink/pull/8474#issuecomment-496041963
 
 
   The CI throws a `TypeError` for a Python test case. I have restarted it and 
will make sure all the test cases pass before the review. 
   @flinkbot approve-until architecture
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8474: [FLINK-12409][python] Adds from_elements in TableEnvironment

2019-05-26 Thread GitBox
sunjincheng121 commented on issue #8474: [FLINK-12409][python] Adds 
from_elements in TableEnvironment
URL: https://github.com/apache/flink/pull/8474#issuecomment-496041963
 
 
   The CI throws a `TypeError` for a Python test case. I have restarted it and 
will make sure all the test cases pass before the review. 
   @flinkbot approve-until
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-5243) Implement an example for BipartiteGraph

2019-05-26 Thread Greg Hogan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848548#comment-16848548
 ] 

Greg Hogan commented on FLINK-5243:
---

[~jasleenk22], new contributions are always welcome! Do you have a distributed 
algorithm in mind to implement for Bipartite Matching?

> Implement an example for BipartiteGraph
> ---
>
> Key: FLINK-5243
> URL: https://issues.apache.org/jira/browse/FLINK-5243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Library / Graph Processing (Gelly)
>Reporter: Ivan Mushketyk
>Priority: Major
>  Labels: beginner
>
> Should implement example for BipartiteGraph in gelly-examples project 
> similarly to examples for Graph class.
> Depends on this: https://issues.apache.org/jira/browse/FLINK-2254



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-26 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287611055
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 ##
 @@ -272,4 +272,18 @@ private InputGate createInputGate(
return gates[0];
}
}
+
+   private static ShuffleDeploymentDescriptor 
createLocalSdd(ResultPartitionID resultPartitionID, ResourceID location) {
 
 Review comment:
   we can also use `NettyShuffleDescriptorBuilder` here.
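
   For illustration, a sketch of what that could look like; the builder entry 
point and setter names below are assumptions for illustration only, not 
verified against the actual `NettyShuffleDescriptorBuilder` API:

```java
// Hypothetical builder usage (method names are assumed, not verified):
private static ShuffleDeploymentDescriptor createLocalSdd(
		ResultPartitionID resultPartitionID, ResourceID location) {
	return NettyShuffleDescriptorBuilder.newBuilder()
		.setId(resultPartitionID)
		.setProducerLocation(location)
		.buildLocal();
}
```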


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-26 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287609955
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -300,6 +302,9 @@
// -- Fields that are only relevant for archived execution graphs 

private String jsonPlan;
 
+   /** Shuffle master to register partitions for task deployment. */
+   private final ShuffleMaster<?> shuffleMaster = 
DefaultShuffleMaster.getInstance();
 
 Review comment:
   At the moment the default implementation is hardcoded anyway. I suggest we 
consider this when we introduce configuration and proper creation of the 
shuffle components, as one of the final steps.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12628) Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism

2019-05-26 Thread Andrey Zagrebin (JIRA)
Andrey Zagrebin created FLINK-12628:
---

 Summary: Check test failure if partition has no consumers in 
Execution.getPartitionMaxParallelism
 Key: FLINK-12628
 URL: https://issues.apache.org/jira/browse/FLINK-12628
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Andrey Zagrebin


Currently, we work around this case in Execution.getPartitionMaxParallelism 
because of tests:

// TODO consumers.isEmpty() only exists for test, currently there has to be 
exactly one consumer in real jobs!

even though a partition is currently supposed to always have at least one 
consumer.
We should check which test fails without the workaround and consider fixing it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-26 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287609890
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -566,6 +615,62 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
}
}
 
+   @VisibleForTesting
+   CompletableFuture<Execution> 
registerProducedPartitions(TaskManagerLocation location) {
+   assertRunningInJobMasterMainThread();
+
+   return registerProducedPartitions(vertex, location, attemptId)
+   .thenApplyAsync(producedPartitionsCache -> {
+   producedPartitions = producedPartitionsCache;
+   return this;
+   }, 
vertex.getExecutionGraph().getJobMasterMainThreadExecutor());
+   }
+
+   @VisibleForTesting
+   static CompletableFuture<Map<IntermediateResultPartitionID, 
ResultPartitionDeploymentDescriptor>>
+   registerProducedPartitions(
+   ExecutionVertex vertex,
+   TaskManagerLocation location,
+   ExecutionAttemptID attemptId) {
+
+   ProducerShuffleDescriptor producerShuffleDescriptor = 
ProducerShuffleDescriptor.create(
+   location, attemptId);
+
+   boolean lazyScheduling = 
vertex.getExecutionGraph().getScheduleMode().allowLazyDeployment();
+
+   Collection<IntermediateResultPartition> partitions = 
vertex.getProducedPartitions().values();
+   Collection<CompletableFuture<ResultPartitionDeploymentDescriptor>> 
partitionRegistrations =
+   new ArrayList<>(partitions.size());
+
+   for (IntermediateResultPartition partition : partitions) {
+   PartitionShuffleDescriptor partitionShuffleDescriptor = 
PartitionShuffleDescriptor.from(
+   partition, 
getPartitionMaxParallelism(partition));
+   
partitionRegistrations.add(vertex.getExecutionGraph().getShuffleMaster()
+   
.registerPartitionWithProducer(partitionShuffleDescriptor, 
producerShuffleDescriptor)
+   .thenApply(shuffleDescriptor -> new 
ResultPartitionDeploymentDescriptor(
+   partitionShuffleDescriptor, 
shuffleDescriptor, lazyScheduling)));
+   }
+
+   return 
FutureUtils.combineAll(partitionRegistrations).thenApply(rpdds -> {
+   Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor> 
producedPartitions =
+   new LinkedHashMap<>(partitions.size());
+   rpdds.forEach(rpdd -> 
producedPartitions.put(rpdd.getPartitionId(), rpdd));
+   return producedPartitions;
+   });
+   }
+
+   private static int 
getPartitionMaxParallelism(IntermediateResultPartition partition) {
+   // TODO consumers.isEmpty() only exists for test, currently 
there has to be exactly one consumer in real jobs!
 
 Review comment:
   This TODO existed before the PR; I suggest we tackle it separately.
   I created an issue for this: 
https://issues.apache.org/jira/browse/FLINK-12628.
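
   For context, a sketch of the shape such a guard takes today (an 
approximation for illustration, not necessarily the exact Flink code):

```java
// Approximate sketch: fall back to the upper bound when there are no
// consumers (the test-only case the TODO mentions); otherwise take the
// max parallelism of the single consumer vertex.
private static int getPartitionMaxParallelism(IntermediateResultPartition partition) {
	List<List<ExecutionEdge>> consumers = partition.getConsumers();
	int maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
	if (!consumers.isEmpty()) {
		ExecutionJobVertex consumerVertex = consumers.get(0).get(0).getTarget().getJobVertex();
		maxParallelism = consumerVertex.getMaxParallelism();
	}
	return maxParallelism;
}
```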


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking

2019-05-26 Thread GitBox
Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make 
CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r287607989
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 ##
 @@ -206,27 +207,35 @@ public boolean processInput() throws Exception {
}
}
 
-   final BufferOrEvent bufferOrEvent = 
barrierHandler.getNextNonBlocked();
-   if (bufferOrEvent != null) {
-   if (bufferOrEvent.isBuffer()) {
-   currentChannel = 
bufferOrEvent.getChannelIndex();
-   currentRecordDeserializer = 
recordDeserializers[currentChannel];
-   
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-   }
-   else {
-   // Event received
-   final AbstractEvent event = 
bufferOrEvent.getEvent();
-   if (event.getClass() != 
EndOfPartitionEvent.class) {
-   throw new 
IOException("Unexpected event: " + event);
+   final Optional<BufferOrEvent> bufferOrEvent = 
barrierHandler.pollNext();
+   if (bufferOrEvent.isPresent()) {
+   processBufferOrEvent(bufferOrEvent.get());
+   } else {
+   if (!barrierHandler.isFinished()) {
+   barrierHandler.isAvailable().get();
 
 Review comment:
   Does this mean we will waste `the time of waiting for new data`, delaying 
the execution of mails for this round trip?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8545: [FLINK-12520] Support to provide fully-qualified domain host name in TaskManagerMetricGroup

2019-05-26 Thread GitBox
flinkbot commented on issue #8545: [FLINK-12520] Support to provide 
fully-qualified domain host name in TaskManagerMetricGroup
URL: https://github.com/apache/flink/pull/8545#issuecomment-496018362
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process. The Bot is tracking the review progress through labels; 
labels are applied according to the order of the review items. For consensus, 
approval by a Flink committer or PMC member is required.
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12520) Support to provide fully-qualified domain host name in TaskManagerMetricGroup

2019-05-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12520:
---
Labels: pull-request-available  (was: )

> Support to provide fully-qualified domain host name in TaskManagerMetricGroup
> -
>
> Key: FLINK-12520
> URL: https://issues.apache.org/jira/browse/FLINK-12520
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
>
> Inspired by a Chinese 
> [user-mail|https://lists.apache.org/thread.html/e1774a42430815b689ea792103d002b1da734d6086682d34c044ef35@%3Cuser-zh.flink.apache.org%3E]
> which complains that the host name in metric names only shows the first 
> part. However, their full host name is like "{{ambari.host12.yy}}", which 
> means the first part "{{ambari}}" cannot identify anything.
> We should let users record their full host name in the 
> TaskManagerMetricGroup so that metrics from different hosts can be 
> identified.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Myasuka opened a new pull request #8545: [FLINK-12520] Support to provide fully-qualified domain host name in TaskManagerMetricGroup

2019-05-26 Thread GitBox
Myasuka opened a new pull request #8545: [FLINK-12520] Support to provide 
fully-qualified domain host name in TaskManagerMetricGroup
URL: https://github.com/apache/flink/pull/8545
 
 
   ## What is the purpose of the change
   
   Inspired by a [Chinese 
user-mail](https://lists.apache.org/thread.html/e1774a42430815b689ea792103d002b1da734d6086682d34c044ef35@%3Cuser-zh.flink.apache.org%3E)
 which complains that the host name in metric names only shows the first part. 
However, their full host name is like "ambari.host12.yy", which means the 
first part "ambari" cannot identify anything.
   
   With this PR, users can record their full host name in the 
`TaskManagerMetricGroup` so that metrics from different hosts can be 
identified.
   
   ## Brief change log
   
 - Add a new option `metrics.tm.full-hostname` to `MetricOptions` to indicate 
whether Flink should use the fully qualified host name in task manager metrics.
 - Use the above option in `instantiateTaskManagerMetricGroup` in 
`TaskManagerRunner`.
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   
 - Added a test that `TaskManagerLocation#getHostName(InetAddress, boolean)` 
returns the host name as expected (see the sketch below).
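   
   A minimal sketch of the distinction involved, using plain JDK calls (the 
option wiring is described above; the method below is illustrative only):
   
```java
import java.net.InetAddress;

// Illustrative only: whether the FQDN is used would be driven by the
// new metrics.tm.full-hostname option described above.
static String metricHostName(boolean useFqdn) throws Exception {
	String fqdn = InetAddress.getLocalHost().getCanonicalHostName(); // e.g. "ambari.host12.yy"
	int dot = fqdn.indexOf('.');
	String shortName = dot > 0 ? fqdn.substring(0, dot) : fqdn;      // e.g. "ambari"
	return useFqdn ? fqdn : shortName;
}
```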
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **yes**
 - If yes, how is the feature documented? **docs**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking

2019-05-26 Thread GitBox
Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make 
CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r287607756
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##
 @@ -494,12 +486,12 @@ public void requestPartitions() throws IOException, 
InterruptedException {
// 

 
@Override
-   public Optional<BufferOrEvent> getNextBufferOrEvent() throws 
IOException, InterruptedException {
+   public Optional<BufferOrEvent> getNext() throws IOException, 
InterruptedException {
return getNextBufferOrEvent(true);
 
 Review comment:
   I notice there is still a choice between blocking and non-blocking here. I 
checked the code path; is this only used for the batch task? (Just a question.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking

2019-05-26 Thread GitBox
Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make 
CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r287607135
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 ##
 @@ -112,6 +113,8 @@
/** Flag to indicate whether we have drawn all available input. */
private boolean endOfStream;
 
+   private boolean isFinished;
 
 Review comment:
   There is no comment for this field. I noticed it because the field behaves 
differently from `BufferTracker.java` regarding `finish`, so I think a 
comment should be added here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking

2019-05-26 Thread GitBox
Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make 
CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r287607135
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 ##
 @@ -112,6 +113,8 @@
/** Flag to indicate whether we have drawn all available input. */
private boolean endOfStream;
 
+   private boolean isFinished;
 
 Review comment:
   There is no comment for this field. I think it should be commented because 
it behaves differently from `BufferTracker.java` regarding `finish`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking

2019-05-26 Thread GitBox
Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make 
CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r287606004
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 ##
 @@ -152,37 +155,37 @@ public BarrierBuffer(InputGate inputGate, BufferBlocker 
bufferBlocker, long maxB
this.queuedBuffered = new ArrayDeque();
}
 
+   @Override
+   public CompletableFuture<?> isAvailable() {
+   if (currentBuffered == null) {
+   return inputGate.isAvailable();
+   }
+   return AVAILABLE;
+   }
+
// 

//  Buffer and barrier handling
// 

 
@Override
-   public BufferOrEvent getNextNonBlocked() throws Exception {
+   public Optional<BufferOrEvent> pollNext() throws Exception {
 
 Review comment:
   A question here: why do we have to change from `BufferOrEvent` to 
`Optional<BufferOrEvent>`? Just to avoid the null check? 
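
   For what it's worth, the practical difference at the call site (a generic 
illustration with assumed names `handler` and `process`, not Flink code):

```java
// Optional forces the caller to handle the "nothing available" case
// explicitly, whereas a nullable return makes the null check easy to forget.
Optional<BufferOrEvent> polled = handler.pollNext();
if (polled.isPresent()) {
	process(polled.get());  // a buffer or event is available
} else {
	// nothing available right now; the caller decides what that means
}
```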


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking

2019-05-26 Thread GitBox
Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make 
CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r287603157
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/AsyncDataInput.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface defining couple of essential methods for asynchronous and non 
blocking data polling.
+ *
+ * For the most efficient usage, user of this class is suppose to call 
{@link #pollNext()}
 
 Review comment:
   Typo: `supposed`? Also, can we add a comment for `pollNext()` declaring that 
implementations should be non-blocking? Or use a name like 
`pollNextNonBlocking`?
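
   For reference, the usage pattern such an interface implies, following the 
javadoc quoted above (a sketch with assumed names `input`, `handle` and 
`running`):

```java
// Sketch of the intended polling loop: poll until empty, then suspend on
// the availability future instead of blocking inside pollNext().
while (running) {
	Optional<BufferOrEvent> element = input.pollNext();
	if (element.isPresent()) {
		handle(element.get());
	} else if (input.isFinished()) {
		break;                     // no more data will ever arrive
	} else {
		input.isAvailable().get(); // wait until new data is available
	}
}
```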


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12574) using sink StreamingFileSink files are overwritten when resuming application causing data loss

2019-05-26 Thread yitzchak lieberman (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yitzchak lieberman closed FLINK-12574.
--
Resolution: Not A Bug

> using sink StreamingFileSink files are overwritten when resuming application 
> causing data loss
> --
>
> Key: FLINK-12574
> URL: https://issues.apache.org/jira/browse/FLINK-12574
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: yitzchak lieberman
>Priority: Critical
>
> When part files are saved to an S3 bucket (with a bucket assigner) under 
> simple names such as:
> part-0-0 and part-1-2
> restarting or resuming the application causes the checkpoint id to start 
> from 0, and old files will be replaced by new part files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12574) using sink StreamingFileSink files are overwritten when resuming application causing data loss

2019-05-26 Thread yitzchak lieberman (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848360#comment-16848360
 ] 

yitzchak lieberman commented on FLINK-12574:


You are right, my bad.

Closing the bug...

> using sink StreamingFileSink files are overwritten when resuming application 
> causing data loss
> --
>
> Key: FLINK-12574
> URL: https://issues.apache.org/jira/browse/FLINK-12574
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: yitzchak lieberman
>Priority: Critical
>
> When part files are saved to an S3 bucket (with a bucket assigner) under 
> simple names such as:
> part-0-0 and part-1-2
> restarting or resuming the application causes the checkpoint id to start 
> from 0, and old files will be replaced by new part files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)