[jira] [Commented] (FLINK-12502) Harden JobMasterTest#testRequestNextInputSplitWithDataSourceFailover
[ https://issues.apache.org/jira/browse/FLINK-12502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848621#comment-16848621 ]

leesf commented on FLINK-12502:
-------------------------------

Yes [~gjy]

> Harden JobMasterTest#testRequestNextInputSplitWithDataSourceFailover
> --------------------------------------------------------------------
>
>                 Key: FLINK-12502
>                 URL: https://issues.apache.org/jira/browse/FLINK-12502
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination, Tests
>    Affects Versions: 1.9.0
>            Reporter: Till Rohrmann
>            Assignee: leesf
>            Priority: Major
>
> The {{JobMasterTest#testRequestNextInputSplitWithDataSourceFailover}} test relies on how many files you have in your working directory. This assumption is quite brittle. Instead, we should explicitly instantiate an {{InputSplitAssigner}} with a defined number of input splits.
> Moreover, we should make the assertions more explicit: input split comparisons should not rely solely on the length of the input split data.
> Maybe it is also not necessary to capture the full {{TaskDeploymentDescriptor}}, because we could already know the producer's and consumer's {{JobVertexID}} when we create the {{JobGraph}}.
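For illustration, a minimal sketch of the proposed hardening using Flink's {{DefaultInputSplitAssigner}} and {{GenericInputSplit}}; the split count and the surrounding test wiring are illustrative assumptions, not the actual patch:

{code:java}
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;

// Instantiate an assigner with an explicitly defined number of splits,
// instead of deriving the split count from the files in the working directory.
final int numSplits = 3; // illustrative value
final GenericInputSplit[] splits = new GenericInputSplit[numSplits];
for (int i = 0; i < numSplits; i++) {
    splits[i] = new GenericInputSplit(i, numSplits);
}
final DefaultInputSplitAssigner assigner = new DefaultInputSplitAssigner(splits);

// Assertions can then compare the returned splits explicitly,
// rather than relying on the length of the input split data.
InputSplit next = assigner.getNextInputSplit("localhost", 0);
{code}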
[GitHub] [flink] flinkbot commented on issue #8547: [FLINK-12267] Port SimpleSlotTest to new code base
flinkbot commented on issue #8547: [FLINK-12267] Port SimpleSlotTest to new code base
URL: https://github.com/apache/flink/pull/8547#issuecomment-496081075

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
The bot tracks the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12267) Port SimpleSlotTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-12267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-12267:
-----------------------------------
    Labels: pull-request-available  (was: )

> Port SimpleSlotTest to new code base
> ------------------------------------
>
>                 Key: FLINK-12267
>                 URL: https://issues.apache.org/jira/browse/FLINK-12267
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination, Tests
>            Reporter: leesf
>            Assignee: leesf
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
> Mainly get rid of {{Instance}}.
[GitHub] [flink] leesf opened a new pull request #8547: [FLINK-12267] Port SimpleSlotTest to new code base
leesf opened a new pull request #8547: [FLINK-12267] Port SimpleSlotTest to new code base
URL: https://github.com/apache/flink/pull/8547

## What is the purpose of the change

Get rid of `Instance` in `SimpleSlotTest`.

## Brief change log

Return a new `SimpleSlot` in `getSlot` instead of using `Instance`.

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)

cc @GJL @zentol
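For context, a hypothetical sketch of what the replacement `getSlot` helper could look like without `Instance`; the constructor signature and the test utilities used here (Mockito's `mock`, `LocalTaskManagerLocation`, `SimpleAckingTaskManagerGateway`) are assumptions about the flink-runtime test code base, not a quote from the PR:

```java
import static org.mockito.Mockito.mock;

import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;

// Builds a SimpleSlot directly from test fixtures instead of going through Instance.
public static SimpleSlot getSlot() {
    return new SimpleSlot(
        mock(SlotOwner.class),            // slot owner stub
        new LocalTaskManagerLocation(),   // fake local TaskManager location
        0,                                // slot number
        new SimpleAckingTaskManagerGateway());
}
```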
[GitHub] [flink] Aitozi commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
Aitozi commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#issuecomment-496077999

Hi @zhijiangW, thanks for your detailed explanation. Summarizing:

Non-credit-based mode:
- inPoolUsage

Credit-based mode:
- exclusiveInPoolUsage = exclusiveBufferUsed / exclusiveBufferTotal (all initial credit)
- floatingInPoolUsage = floatingBufferUsed / floatingBufferTotal
- inPoolUsage combines exclusiveInPoolUsage and floatingInPoolUsage

I think this makes it easier to distinguish floating from exclusive buffer usage in credit-based mode, while the inPoolUsage metric still gives an overall view. WDYT @pnowojski @zhijiangW
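A minimal sketch of how the three proposed gauges could be wired up, assuming used/total buffer counts are obtainable; the supplier names are illustrative, not Flink's actual fields:

```java
import java.util.function.IntSupplier;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

// Registers the proposed credit-based input-pool usage gauges.
static void registerInputUsageGauges(
        MetricGroup group,
        IntSupplier exclusiveUsed, IntSupplier exclusiveTotal,
        IntSupplier floatingUsed, IntSupplier floatingTotal) {

    group.gauge("exclusiveInPoolUsage", (Gauge<Float>) () ->
        ratio(exclusiveUsed.getAsInt(), exclusiveTotal.getAsInt()));
    group.gauge("floatingInPoolUsage", (Gauge<Float>) () ->
        ratio(floatingUsed.getAsInt(), floatingTotal.getAsInt()));
    // inPoolUsage combines both pools into one overall view.
    group.gauge("inPoolUsage", (Gauge<Float>) () ->
        ratio(exclusiveUsed.getAsInt() + floatingUsed.getAsInt(),
              exclusiveTotal.getAsInt() + floatingTotal.getAsInt()));
}

static float ratio(int used, int total) {
    return total == 0 ? 0f : (float) used / total;
}
```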
[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287640985

## File path: flink-connectors/flink-connector-hive/pom.xml

## @@ -53,7 +53,21 @@ under the License.
 		org.apache.flink
-		flink-table-api-java
+		flink-hadoop-compatibility_${scala.binary.version}
+		${project.version}
+		provided
+
+
+		org.apache.flink
+		flink-table-planner-blink_${scala.binary.version}

Review comment:
Yes, we need the blink planner for `BatchTableSink` at the moment.
[GitHub] [flink] zhijiangW commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#issuecomment-496069672

Thanks for the confirmation and the good suggestions, @pnowojski.

> > In other words, if we see the floating buffers are used, that means the exclusive buffers should also be used.
>
> This is not true for even slight data skew

Perhaps my earlier wording was unclear. If floating buffers are used, I only mean that the exclusive buffers of one or some `RemoteInputChannel`s are also expected to be used eventually, not that the exclusive buffers of all channels are in use.

For example, if the producer's backlog is 1, we always request one additional floating buffer even though the channel's 2 exclusive buffers are still available at the moment, because we want to announce some extra credit ahead of time so the network transport is not blocked once more backlog is produced. So exclusive-buffer usage is not strongly consistent in time; it is eventually consistent, which is within our expectation. If the backlog drops from 1 to 0, the previously requested floating buffer is also released by the channel if its 2 exclusive buffers are still available. In other words, from the perspective of a single input channel, it does not occupy extra floating buffers as long as its available exclusive buffers are sufficient.

I also agree with the suggestion above to distinguish the total `inPoolUsage` from `floatingInPoolUsage`, or we could keep only `exclusiveInPoolUsage` and `floatingInPoolUsage` for credit-based mode.
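A simplified, self-contained model of the backlog-driven behavior described above; the class and method names are illustrative, not the actual `RemoteInputChannel` code:

```java
import java.util.function.IntSupplier;

/** Simplified model of a single remote input channel's buffer bookkeeping. */
final class ChannelBufferModel {
    private final int numExclusiveBuffers; // the channel's initial credit
    private int availableExclusive;
    private int heldFloating;
    private final IntSupplier floatingPool; // how many floating buffers are grantable now

    ChannelBufferModel(int numExclusiveBuffers, IntSupplier floatingPool) {
        this.numExclusiveBuffers = numExclusiveBuffers;
        this.availableExclusive = numExclusiveBuffers;
        this.floatingPool = floatingPool;
    }

    /** Producer announced its backlog: keep available >= backlog + initial credit. */
    void onSenderBacklog(int backlog) {
        int required = backlog + numExclusiveBuffers;
        int available = availableExclusive + heldFloating;
        if (available < required) {
            // Top up with floating buffers: extra credit is announced ahead of time
            // so the transport is not blocked once more backlog is produced.
            heldFloating += Math.min(required - available, floatingPool.getAsInt());
        } else if (backlog == 0 && availableExclusive == numExclusiveBuffers) {
            // All exclusive buffers are available again: release floating buffers,
            // so a channel with enough exclusive buffers holds no extra floating ones.
            heldFloating = 0;
        }
    }
}
```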
[GitHub] [flink] JingsongLi commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
JingsongLi commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287639913

## File path: flink-connectors/flink-connector-hive/pom.xml

## @@ -53,7 +53,21 @@ under the License.
 		org.apache.flink
-		flink-table-api-java
+		flink-hadoop-compatibility_${scala.binary.version}
+		${project.version}
+		provided
+
+
+		org.apache.flink
+		flink-table-planner-blink_${scala.binary.version}

Review comment:
I think the reason is that `BatchTableSink` with `BoundedStream` still lives in the blink planner. We need to unify the `TableSource` and `TableSink` interfaces and move them to the API module.
[GitHub] [flink] Aitozi commented on a change in pull request #8523: [FLINK-12481][runtime] Invoke timer callback in task thread (via mailbox)
Aitozi commented on a change in pull request #8523: [FLINK-12481][runtime] Invoke timer callback in task thread (via mailbox)
URL: https://github.com/apache/flink/pull/8523#discussion_r287637082

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java

## @@ -1358,4 +1358,19 @@ public void actionsUnavailable() throws InterruptedException {
 			mailbox.putMail(actionUnavailableLetter);
 		}
 	}
+
+	private class TimerInvocationContext implements SystemProcessingTimeService.ScheduledCallbackExecutionContext {
+		@Override
+		public void invoke(ProcessingTimeCallback callback, long timestamp) throws InterruptedException {
+			mailbox.putMail(() -> {
+				synchronized (getCheckpointLock()) {

Review comment:
Do we still need to acquire the checkpoint lock here? I think execution is already serialized by the mailbox.
[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287633012

## File path: flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java

## @@ -102,4 +102,20 @@
 	public static final ConfigOption OFFLOAD_MINSIZE = key("blob.offload.minsize")
 		.defaultValue(1_024 * 1_024) // 1MiB by default
 		.withDescription("The minimum size for messages to be offloaded to the BlobServer.");
+
+	/**
+	 * The socket timeout in milliseconds for the blob client.
+	 */
+	public static final ConfigOption SO_TIMEOUT =
+		key("blob.client.socket.timeout")
+			.defaultValue(120_000)

Review comment:
`2 minutes` is a value I picked somewhat arbitrarily; the only rationale is that a slightly longer wait may be required under high CPU load. In fact, the blob client has a retry mechanism, so we should set it a little smaller. Do you have any recommended values for the connection and socket timeouts?
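For context, a minimal sketch of where the two timeouts apply on the client socket, using only standard `java.net` APIs; the method name and values are illustrative:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

static Socket connectWithTimeouts(InetSocketAddress address, int connectTimeoutMs, int soTimeoutMs)
        throws IOException {
    Socket socket = new Socket();
    // Connection timeout: bounds Socket#connect itself.
    socket.connect(address, connectTimeoutMs);
    // SO_TIMEOUT: bounds each subsequent blocking read; java.net.SocketTimeoutException
    // is thrown when it elapses, which the client's retry logic can then handle.
    socket.setSoTimeout(soTimeoutMs);
    return socket;
}
```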
[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287638025

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java

## @@ -70,7 +70,7 @@ public static void startNonSSLServer() throws IOException {
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath());
 		config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
-		blobNonSslServer = new BlobServer(config, new VoidBlobStore());
+		blobNonSslServer = new TestBlobServer(config, new VoidBlobStore());

Review comment:
I checked the code carefully, and `TestBlobServer` is not needed here. I will revert to `BlobServer`.
[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287637335

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java

## @@ -58,7 +58,7 @@ public static void startSSLServer() throws IOException {
 		Configuration config = SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath());
-		blobSslServer = new BlobServer(config, new VoidBlobStore());
+		blobSslServer = new TestBlobServer(config, new VoidBlobStore());

Review comment:
`BlobClientSslTest` extends `BlobClientTest`, so it also runs the new `testSocketTimeout` test added in `BlobClientTest`. If `TestBlobServer` is not used here, that test will fail.
[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287635535

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java

## @@ -70,7 +70,7 @@ public static void startNonSSLServer() throws IOException {
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath());
 		config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
-		blobNonSslServer = new BlobServer(config, new VoidBlobStore());
+		blobNonSslServer = new TestBlobServer(config, new VoidBlobStore());

Review comment:
`BlobClientSslTest` extends `BlobClientTest`, so it also runs the new `testSocketTimeout` test added in `BlobClientTest`. If `TestBlobServer` is not used here, that test will fail.
[GitHub] [flink] sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
sunhaibotb commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287635709

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java

## @@ -487,4 +488,58 @@ private static void uploadJarFile(
 			validateGetAndClose(blobClient.getInternal(jobId, blobKeys.get(0)), testFile);
 		}
 	}
+
+
+	/**
+	 * Tests the socket operation timeout.
+	 */
+	@Test
+	public void testSocketTimeout() {
+		Configuration clientConfig = getBlobClientConfig();
+		int oldSoTimeout = clientConfig.getInteger(BlobServerOptions.SO_TIMEOUT);
+
+		clientConfig.setInteger(BlobServerOptions.SO_TIMEOUT, 50);
+		getBlobServer().setBlockingMillis(10_000);
+
+		try {
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
+
+			try (BlobClient client = new BlobClient(serverAddress, clientConfig)) {
+				client.getInternal(new JobID(), BlobKey.createKey(TRANSIENT_BLOB));
+
+				fail("Should throw an exception.");
+			} catch (Throwable t) {
+				assertEquals(java.net.SocketTimeoutException.class, ExceptionUtils.stripException(t, IOException.class).getClass());

Review comment:
> We could create a new test class which contains all blob server tests which need to start a new blob server for each test.

That's a good idea. But since only one test currently has this need, I think we can keep the maintenance cost down and create a new test class once more tests need it.
[GitHub] [flink] KurtYoung commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
KurtYoung commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287636685

## File path: flink-connectors/flink-connector-hive/pom.xml

## @@ -53,7 +53,21 @@ under the License.
 		org.apache.flink
-		flink-table-api-java
+		flink-hadoop-compatibility_${scala.binary.version}
+		${project.version}
+		provided
+
+
+		org.apache.flink
+		flink-table-planner-blink_${scala.binary.version}

Review comment:
First, this will not happen very soon. Second, I don't see why a connector should depend on a planner. Could you explain the reason?
[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287635284

## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java

## @@ -50,6 +54,7 @@ private static HiveConf getHiveConf() throws IOException {
 		HiveConf hiveConf = new HiveConf();
 		hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
 		hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);
+		hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "");

Review comment:
It's not needed anymore; I'll remove it.
[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287634504

## File path: flink-connectors/flink-connector-hive/pom.xml

## @@ -53,7 +53,21 @@ under the License.
 		org.apache.flink
-		flink-table-api-java
+		flink-hadoop-compatibility_${scala.binary.version}
+		${project.version}
+		provided
+
+
+		org.apache.flink
+		flink-table-planner-blink_${scala.binary.version}

Review comment:
Yes, we have. I thought the blink planner would eventually replace Flink's. Is there any reason why we can't depend on it?
[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287633517

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java

## @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.io.IOException;
+
+/**
+ * Util class for accessing Hive tables.
+ */
+public class HiveTableUtil {
+
+	private HiveTableUtil() {
+	}
+
+	/**
+	 * Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
+	 */
+	public static ObjectInspector getObjectInspector(TypeInformation flinkType) throws IOException {
+		return getObjectInspector(toHiveTypeInfo(flinkType));
+	}
+
+	// TODO: reuse Hive's TypeInfoUtils?
+	private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
+		switch (type.getCategory()) {
+
+			case PRIMITIVE:
+				PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
+				return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
+
+			// TODO: support complex types
+			default:
+				throw new IOException("Unsupported Hive type category " + type.getCategory());
+		}
+	}
+
+	/**
+	 * Converts a Flink {@link TypeInformation} to corresponding Hive {@link TypeInfo}.
+	 */
+	public static TypeInfo toHiveTypeInfo(TypeInformation flinkType) {

Review comment:
It's not. We need a `TypeInfo` here, and `HiveTypeUtil.toHiveType()` gives the name of a Hive type. But I guess this method can be moved to `HiveTypeUtil`.
[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287633298

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java

## @@ -590,6 +578,17 @@ private static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTabl
 		return hiveTable;
 	}

+	private static void setStorageFormat(StorageDescriptor sd, Map properties) {
+		// TODO: simply use text format for now
+		String storageFormatName = DEFAULT_HIVE_TABLE_STORAGE_FORMAT;
+		StorageFormatDescriptor sfDescriptor = storageFormatFactory.get(storageFormatName);
+		checkArgument(sfDescriptor != null, "Unknown storage format " + storageFormatName);

Review comment:
It doesn't directly check an input argument; it checks whether we can get a StorageFormatDescriptor based on the input argument.
[GitHub] [flink] flinkbot commented on issue #8546: [FLINK-12621]. Use MiniCluster instead of JobExecutor
flinkbot commented on issue #8546: [FLINK-12621]. Use MiniCluster instead of JobExecutor
URL: https://github.com/apache/flink/pull/8546#issuecomment-496059964

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
The bot tracks the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] zjffdu commented on issue #8546: [FLINK-12621]. Use MiniCluster instead of JobExecutor
zjffdu commented on issue #8546: [FLINK-12621]. Use MiniCluster instead of JobExecutor
URL: https://github.com/apache/flink/pull/8546#issuecomment-496059883

@tillrohrmann Could you help review it? I notice you are the original author of `JobExecutor`.
[jira] [Updated] (FLINK-12621) Use MiniCluster instead of JobExecutor
[ https://issues.apache.org/jira/browse/FLINK-12621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-12621:
-----------------------------------
    Labels: pull-request-available  (was: )

> Use MiniCluster instead of JobExecutor
> --------------------------------------
>
>                 Key: FLINK-12621
>                 URL: https://issues.apache.org/jira/browse/FLINK-12621
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.8.0
>            Reporter: Jeff Zhang
>            Assignee: Jeff Zhang
>            Priority: Major
>              Labels: pull-request-available
>
> JobExecutor is used only for local mode. I don't think we need to introduce a new class/interface for local mode; we should use the existing MiniCluster.
[GitHub] [flink] KurtYoung commented on a change in pull request #8527: [FLINK-12610] [table-planner-blink] Introduce planner rules about aggregate
KurtYoung commented on a change in pull request #8527: [FLINK-12610] [table-planner-blink] Introduce planner rules about aggregate
URL: https://github.com/apache/flink/pull/8527#discussion_r287631851

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/AggregateCalcMergeRule.java

## @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.AggregateProjectMergeRule;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Planner rule that recognizes a {@link org.apache.calcite.rel.core.Aggregate}
+ * on top of a {@link org.apache.calcite.rel.core.Calc} and if possible
+ * aggregate through the calc or removes the calc.
+ *
+ * <p>This is only possible when no condition in calc and the grouping expressions and arguments to

Review comment:
If you can't merge the condition, why not change the rule's match pattern to `Aggregate` on `Project`?
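For reference, narrowing a Calcite rule to match `Aggregate` on `Project` looks roughly like this; the class name and the empty body are an illustrative sketch, not the actual rule:

```java
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.tools.RelBuilderFactory;

// Matching Aggregate-on-Project directly, instead of Aggregate-on-Calc, sidesteps
// the "calc must have no condition" precondition entirely.
public class AggregateOnProjectRule extends RelOptRule {

    public AggregateOnProjectRule(RelBuilderFactory relBuilderFactory) {
        super(
            operand(Aggregate.class,
                operand(Project.class, any())),
            relBuilderFactory, "AggregateOnProjectRule");
    }

    @Override
    public void onMatch(RelOptRuleCall call) {
        // The merge logic (pushing grouping keys and aggregate arguments
        // through the project) would go here.
    }
}
```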
[GitHub] [flink] KurtYoung commented on a change in pull request #8527: [FLINK-12610] [table-planner-blink] Introduce planner rules about aggregate
KurtYoung commented on a change in pull request #8527: [FLINK-12610] [table-planner-blink] Introduce planner rules about aggregate
URL: https://github.com/apache/flink/pull/8527#discussion_r287631994

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkAggregateJoinTransposeRule.java

## @@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical;
+
+import org.apache.flink.table.plan.util.AggregateUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalSnapshot;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlSplittableAggFunction;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.Bug;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.calcite.util.mapping.Mappings;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+/**
+ * This rule is copied from Calcite's {@link org.apache.calcite.rel.rules.AggregateJoinTransposeRule}.
+ * Modification:
+ * - Do not match TemporalTableScan since it means that it is a dimension table scan currently.

Review comment:
Put this rule into the CBO phase, and we can avoid dealing with such situations.
[GitHub] [flink] zjffdu opened a new pull request #8546: [FLINK-12621]. Use MiniCluster instead of JobExecutor
zjffdu opened a new pull request #8546: [FLINK-12621]. Use MiniCluster instead of JobExecutor
URL: https://github.com/apache/flink/pull/8546

## What is the purpose of the change

This is a refactoring PR which removes `JobExecutor` and uses `MiniCluster` instead. `JobExecutor` is used only for local mode; I don't think we need to introduce such a new class/interface for local mode when we can just use the existing `MiniCluster`.

## Brief change log

* Remove `JobExecutor` & `JobExecutorService`
* Use `ClusterClient` to replace `JobExecutorService`

## Verifying this change

The current CI passes; no new code is introduced, just refactoring.

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
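A minimal sketch of running a job directly on `MiniCluster`, which is the capability this refactoring relies on; the sizing values are illustrative:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

static void runLocally(JobGraph jobGraph) throws Exception {
    MiniClusterConfiguration clusterConfig = new MiniClusterConfiguration.Builder()
        .setConfiguration(new Configuration())
        .setNumTaskManagers(1)          // illustrative sizing
        .setNumSlotsPerTaskManager(4)
        .build();

    MiniCluster miniCluster = new MiniCluster(clusterConfig);
    miniCluster.start();
    try {
        // Submits the job and blocks until completion; this is what makes a
        // separate JobExecutor interface unnecessary for local execution.
        miniCluster.executeJobBlocking(jobGraph);
    } finally {
        miniCluster.close();
    }
}
```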
[GitHub] [flink] KurtYoung commented on a change in pull request #8522: [FLINK-12572][hive]Implement TableSource and InputFormat to read Hive tables
KurtYoung commented on a change in pull request #8522: [FLINK-12572][hive]Implement TableSource and InputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r287631347

## File path: flink-connectors/flink-connector-hive/pom.xml

## @@ -58,6 +58,28 @@ under the License.
 		provided
+
+		org.apache.flink
+		flink-table-planner-blink_${scala.binary.version}

Review comment:
Have you discussed this? IMO the blink planner should not be a dependency.
[GitHub] [flink] KurtYoung commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
KurtYoung commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287631136

## File path: flink-connectors/flink-connector-hive/pom.xml

## @@ -53,7 +53,21 @@ under the License.
 		org.apache.flink
-		flink-table-api-java
+		flink-hadoop-compatibility_${scala.binary.version}
+		${project.version}
+		provided
+
+
+		org.apache.flink
+		flink-table-planner-blink_${scala.binary.version}

Review comment:
Have you discussed this? IMO the blink planner should not be a dependency.
[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287631199

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java

## @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.io.IOException;
+
+/**
+ * Util class for accessing Hive tables.
+ */
+public class HiveTableUtil {
+
+	private HiveTableUtil() {
+	}
+
+	/**
+	 * Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
+	 */
+	public static ObjectInspector getObjectInspector(TypeInformation flinkType) throws IOException {
+		return getObjectInspector(toHiveTypeInfo(flinkType));
+	}
+
+	// TODO: reuse Hive's TypeInfoUtils?
+	private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
+		switch (type.getCategory()) {
+
+			case PRIMITIVE:
+				PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
+				return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
+
+			// TODO: support complex types
+			default:
+				throw new IOException("Unsupported Hive type category " + type.getCategory());
+		}
+	}
+
+	/**
+	 * Converts a Flink {@link TypeInformation} to corresponding Hive {@link TypeInfo}.
+	 */
+	public static TypeInfo toHiveTypeInfo(TypeInformation flinkType) {
+		if (flinkType.equals(BasicTypeInfo.STRING_TYPE_INFO)) {

Review comment:
We can't, because Flink types are not enums, so they can't be used in a `switch`.
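Since `TypeInformation` cannot drive a `switch`, the if-else chain works; a `Map`-based lookup is a possible alternative. A hedged sketch with a deliberately partial mapping (the class name is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

// TypeInformation is not an enum, so switch is impossible; a map keyed on the
// singleton TypeInformation instances (which implement equals/hashCode) works instead.
final class FlinkToHiveTypes {
    private static final Map<TypeInformation<?>, TypeInfo> TYPE_MAP = new HashMap<>();
    static {
        TYPE_MAP.put(BasicTypeInfo.STRING_TYPE_INFO, TypeInfoFactory.stringTypeInfo);
        TYPE_MAP.put(BasicTypeInfo.INT_TYPE_INFO, TypeInfoFactory.intTypeInfo);
        TYPE_MAP.put(BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInfoFactory.doubleTypeInfo);
        // ... remaining primitive mappings
    }

    static TypeInfo toHiveTypeInfo(TypeInformation<?> flinkType) {
        TypeInfo hiveType = TYPE_MAP.get(flinkType);
        if (hiveType == null) {
            throw new IllegalArgumentException("Unsupported Flink type " + flinkType);
        }
        return hiveType;
    }
}
```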
[GitHub] [flink] zjffdu commented on a change in pull request #8533: [FLINK-12596][scala-shell] Unify all kinds of cluster via ClusterClient in FlinkShell
zjffdu commented on a change in pull request #8533: [FLINK-12596][scala-shell] Unify all kinds of cluster via ClusterClient in FlinkShell
URL: https://github.com/apache/flink/pull/8533#discussion_r287630879

## File path: flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala

## @@ -194,47 +200,28 @@ object FlinkShell {
     val configDirectory = new File(confDirPath)
     val configuration = GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath)
-    val (repl, cluster) = try {
-      val (host, port, cluster) = fetchConnectionInfo(configuration, config)
-      val conf = cluster match {
-        case Some(Left(_)) => configuration
-        case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
-        case None => configuration
-      }
-
+    try {
+      val (host, port, shouldShutdownCluster, clusterClient) = fetchConnectionInfo(configuration, config)
+      val conf = clusterClient.getFlinkConfiguration
       println(s"\nConnecting to Flink cluster (host: $host, port: $port).\n")
-      val repl = bufferedReader match {
-        case Some(reader) =>
-          val out = new StringWriter()
-          new FlinkILoop(host, port, conf, config.externalJars, reader, new JPrintWriter(out))
-        case None =>
-          new FlinkILoop(host, port, conf, config.externalJars)
-      }
-
-      (repl, cluster)
+      new FlinkILoop(host, port, conf, config.externalJars, clusterClient, shouldShutdownCluster, in,
+        new JPrintWriter(out))
     } catch {
       case e: IllegalArgumentException =>
         println(s"Error: ${e.getMessage}")
         sys.exit()
     }
+  }
+
+  def startShell(config: Config, in: Option[BufferedReader], out: JPrintWriter): Unit = {
+    println("Starting Flink Shell:")
+    val flinkILoop = createFlinkILoop(config, in, out)
     val settings = new Settings()
     settings.usejavacp.value = true
     settings.Yreplsync.value = true
-    try {
-      repl.process(settings)
-    } finally {
-      repl.closeInterpreter()
-      cluster match {
-        case Some(Left(miniCluster)) => miniCluster.close()
-        case Some(Right(yarnCluster)) =>
-          yarnCluster.shutDownCluster()
-          yarnCluster.shutdown()
-        case _ =>
-      }
-    }
-
+    flinkILoop.process(settings)

Review comment:
It is not necessary to call `closeInterpreter` explicitly, as it will be called in the `process` method:
https://github.com/scala/scala/blob/2.11.x/src/repl/scala/tools/nsc/interpreter/ILoop.scala#L998
[GitHub] [flink] zhijiangW commented on a change in pull request #8485: [FLINK-12555] Introduce an encapsulated metric group layout for shuffle API
zhijiangW commented on a change in pull request #8485: [FLINK-12555] Introduce an encapsulated metric group layout for shuffle API
URL: https://github.com/apache/flink/pull/8485#discussion_r287630954

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputChannelMetrics.java

## @@ -72,4 +72,47 @@ public Counter getNumBuffersInLocalCounter() {
 		return numBuffersInLocal;
 	}

 	public Counter getNumBuffersInRemoteCounter() {
 		return numBuffersInRemote;
 	}
+
+	private static class MultiCounterWrapper implements Counter {
+		private final Counter[] counters;
+
+		private MultiCounterWrapper(Counter ... counters) {
+			Preconditions.checkArgument(counters.length > 0);
+			this.counters = counters;
+		}
+
+		@Override
+		public void inc() {
+			for (Counter c : counters) {
+				c.inc();
+			}
+		}
+
+		@Override
+		public void inc(long n) {
+			for (Counter c : counters) {
+				c.inc(n);
+			}
+		}
+
+		@Override
+		public void dec() {
+			for (Counter c : counters) {
+				c.dec();
+			}
+		}
+
+		@Override
+		public void dec(long n) {
+			for (Counter c : counters) {
+				c.dec(n);
+			}
+		}
+
+		@Override
+		public long getCount() {
+			// assume that the counters are not accessed directly elsewhere

Review comment:
I am not sure whether `getCount` would actually be used. If it is, we should keep the returned counter consistent with the previous structure. Does that mean calling `new InputChannelMetrics(parentGroup, networkGroup)` instead?
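A small self-contained sketch of the forwarding pattern under discussion, using Flink's `Counter`/`SimpleCounter`; the class and method names are illustrative. It shows why `getCount` is only well-defined while the wrapped counters are never updated directly:

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;

public final class CounterFanOutDemo {

    // Minimal two-way wrapper mirroring MultiCounterWrapper's forwarding behavior.
    static Counter both(Counter a, Counter b) {
        return new Counter() {
            @Override public void inc() { a.inc(); b.inc(); }
            @Override public void inc(long n) { a.inc(n); b.inc(n); }
            @Override public void dec() { a.dec(); b.dec(); }
            @Override public void dec(long n) { a.dec(n); b.dec(n); }
            // Reports the first counter's value; correct only while nobody updates
            // the underlying counters directly, which is exactly the concern above.
            @Override public long getCount() { return a.getCount(); }
        };
    }

    public static void main(String[] args) {
        Counter parent = new SimpleCounter();
        Counter network = new SimpleCounter();
        Counter combined = both(parent, network);

        combined.inc(3);   // parent == network == 3, getCount() is unambiguous
        network.inc();     // a direct update desynchronizes them:
        System.out.println(combined.getCount() + " vs " + network.getCount()); // 3 vs 4
    }
}
```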
[GitHub] [flink] lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
lirui-apache commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287630791 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HMSClientFactory; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.sinks.BatchTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.table.type.TypeConverters; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; + +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.mapred.JobConf; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.List; + +/** + * A table sink to write to Hive tables. 
+ */ +public class HiveTableSink implements BatchTableSink { + + private final JobConf jobConf; + private final RowTypeInfo rowTypeInfo; + private final String dbName; + private final String tableName; + private final List partitionCols; + + public HiveTableSink(JobConf jobConf, RowTypeInfo rowTypeInfo, String dbName, + String tableName, List partitionCols) { + this.jobConf = jobConf; + this.rowTypeInfo = rowTypeInfo; + this.dbName = dbName; + this.tableName = tableName; + this.partitionCols = partitionCols; + } + + @Override + public DataStreamSink emitBoundedStream(DataStream boundedStream, TableConfig tableConfig, ExecutionConfig executionConfig) { + // TODO: support partitioning + final boolean isPartitioned = false; + // TODO: support overwrite + final boolean overwrite = false; + HiveTablePartition hiveTablePartition; + HiveTableOutputFormat outputFormat; + IMetaStoreClient client = HMSClientFactory.create(new HiveConf(jobConf, HiveConf.class)); + try { + Table table = client.getTable(dbName, tableName); + StorageDescriptor sd = table.getSd(); + // here we use the sdLocation to store the output path of the job, which is always a staging dir + String sdLocation = sd.getLocation(); + if (isPartitioned) { + // TODO: implement this + } else { + sd.setLocation(toStagingDir(sdLocation, jobConf)); + hiveTablePartition = new HiveTablePartition(sd, null); + } + outputFormat = new HiveTableOutputFormat(jobConf, dbName, tableName, partitionCols, + rowTypeInfo, hiveTablePartition, MetaStoreUtils.getTableMetadata(table), overwrite); + } catch (TException e) { + throw new CatalogException("Failed to query Hive metastore", e); + }
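To make the constructor contract above concrete, here is a hypothetical instantiation of the sink. All values are placeholders, not taken from the PR: the schema, database, and table names are invented, and an empty partition-column list stands for a non-partitioned table (partitioning is still a TODO in the code above).

```java
import java.util.Collections;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

import org.apache.hadoop.mapred.JobConf;

public class HiveTableSinkDemo {
	public static void main(String[] args) {
		// placeholders: default database, table "my_table", schema (id INT, name STRING)
		JobConf jobConf = new JobConf();
		RowTypeInfo rowTypeInfo = new RowTypeInfo(Types.INT, Types.STRING);

		// empty partition-column list = non-partitioned table
		HiveTableSink sink = new HiveTableSink(
			jobConf, rowTypeInfo, "default", "my_table", Collections.emptyList());
	}
}
```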
[jira] [Created] (FLINK-12629) Add integration test for scala shell in yarn mode
Jeff Zhang created FLINK-12629: -- Summary: Add integration test for scala shell in yarn mode Key: FLINK-12629 URL: https://issues.apache.org/jira/browse/FLINK-12629 Project: Flink Issue Type: Improvement Components: Deployment / YARN, Scala Shell Affects Versions: 1.8.0 Reporter: Jeff Zhang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12629) Add integration test for scala shell in yarn mode
[ https://issues.apache.org/jira/browse/FLINK-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang reassigned FLINK-12629: -- Assignee: Jeff Zhang > Add integration test for scala shell in yarn mode > - > > Key: FLINK-12629 > URL: https://issues.apache.org/jira/browse/FLINK-12629 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN, Scala Shell >Affects Versions: 1.8.0 >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-12167) ClusterClient doesn't unset the context class loader after program run
[ https://issues.apache.org/jira/browse/FLINK-12167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Abdul Qadeer resolved FLINK-12167.
--
Resolution: Fixed Fix Version/s: 1.9.0

> ClusterClient doesn't unset the context class loader after program run
> --
>
> Key: FLINK-12167
> URL: https://issues.apache.org/jira/browse/FLINK-12167
> Project: Flink
> Issue Type: Bug
> Components: Command Line Client
>Affects Versions: 1.8.0
>Reporter: Abdul Qadeer
>Assignee: Abdul Qadeer
>Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
>
> {code:java}
> public JobSubmissionResult run(PackagedProgram prog, int parallelism)
> {code}
> This method doesn't restore the thread's original context class loader after the program is run, which could lead to several class loading issues.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
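For context, a minimal sketch of the save-and-restore pattern the fix implies. This is not the actual ClusterClient patch; {{runWithClassLoader}} is a hypothetical helper that only illustrates the general technique.

{code:java}
import java.util.concurrent.Callable;

/** Minimal sketch of the save-and-restore context class loader pattern; not the actual patch. */
final class ContextClassLoaderUtil {

	// Hypothetical helper: run a user program with its class loader, then restore the original.
	static <T> T runWithClassLoader(ClassLoader userCodeClassLoader, Callable<T> program) throws Exception {
		final Thread thread = Thread.currentThread();
		final ClassLoader original = thread.getContextClassLoader();
		try {
			thread.setContextClassLoader(userCodeClassLoader);
			return program.call();
		} finally {
			// restore the caller's loader so later code on the same thread is unaffected
			thread.setContextClassLoader(original);
		}
	}
}
{code}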
[GitHub] [flink] sunjincheng121 edited a comment on issue #8474: [FLINK-12409][python] Adds from_elements in TableEnvironment
sunjincheng121 edited a comment on issue #8474: [FLINK-12409][python] Adds from_elements in TableEnvironment
URL: https://github.com/apache/flink/pull/8474#issuecomment-496041963
The CI throws a `TypeError` for a Python test case. I have restarted it and will make sure all the test cases pass before the review.
@flinkbot approve-until architecture

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on issue #8474: [FLINK-12409][python] Adds from_elements in TableEnvironment
sunjincheng121 commented on issue #8474: [FLINK-12409][python] Adds from_elements in TableEnvironment
URL: https://github.com/apache/flink/pull/8474#issuecomment-496041963
The CI throws a `TypeError` for a Python test case. I have restarted it and will make sure all the test cases pass before the review.
@flinkbot approve-until

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-5243) Implement an example for BipartiteGraph
[ https://issues.apache.org/jira/browse/FLINK-5243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848548#comment-16848548 ] Greg Hogan commented on FLINK-5243: --- [~jasleenk22], new contributions are always welcomed! Do you have a distributed algorithm in mind to implement for Bipartite Matching? > Implement an example for BipartiteGraph > --- > > Key: FLINK-5243 > URL: https://issues.apache.org/jira/browse/FLINK-5243 > Project: Flink > Issue Type: Sub-task > Components: Library / Graph Processing (Gelly) >Reporter: Ivan Mushketyk >Priority: Major > Labels: beginner > > Should implement example for BipartiteGraph in gelly-examples project > similarly to examples for Graph class. > Depends on this: https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287611055 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ## @@ -272,4 +272,18 @@ private InputGate createInputGate( return gates[0]; } } + + private static ShuffleDeploymentDescriptor createLocalSdd(ResultPartitionID resultPartitionID, ResourceID location) { Review comment: we can also use `NettyShuffleDescriptorBuilder` here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287609955

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

## @@ -300,6 +302,9 @@
 	// -- Fields that are only relevant for archived execution graphs
 	private String jsonPlan;
+
+	/** Shuffle master to register partitions for task deployment. */
+	private final ShuffleMaster<?> shuffleMaster = DefaultShuffleMaster.getInstance();

Review comment: At the moment the default implementation is hardcoded anyway. I suggest we consider this when we introduce configuration and proper creation of the shuffle components, as one of the final steps.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12628) Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism
Andrey Zagrebin created FLINK-12628:
--
Summary: Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism
Key: FLINK-12628
URL: https://issues.apache.org/jira/browse/FLINK-12628
Project: Flink
Issue Type: Improvement
Components: Runtime / Coordination
Reporter: Andrey Zagrebin

Currently, we work around this case in Execution.getPartitionMaxParallelism because of tests:
// TODO consumers.isEmpty() only exists for test, currently there has to be exactly one consumer in real jobs!
However, a partition is currently supposed to always have at least one consumer. We should check which tests fail without the workaround and consider fixing them.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287609890

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java

## @@ -566,6 +615,62 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 		}
 	}
+
+	@VisibleForTesting
+	CompletableFuture<Execution> registerProducedPartitions(TaskManagerLocation location) {
+		assertRunningInJobMasterMainThread();
+
+		return registerProducedPartitions(vertex, location, attemptId)
+			.thenApplyAsync(producedPartitionsCache -> {
+				producedPartitions = producedPartitionsCache;
+				return this;
+			}, vertex.getExecutionGraph().getJobMasterMainThreadExecutor());
+	}
+
+	@VisibleForTesting
+	static CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>>
+		registerProducedPartitions(
+			ExecutionVertex vertex,
+			TaskManagerLocation location,
+			ExecutionAttemptID attemptId) {
+
+		ProducerShuffleDescriptor producerShuffleDescriptor = ProducerShuffleDescriptor.create(
+			location, attemptId);
+
+		boolean lazyScheduling = vertex.getExecutionGraph().getScheduleMode().allowLazyDeployment();
+
+		Collection<IntermediateResultPartition> partitions = vertex.getProducedPartitions().values();
+		Collection<CompletableFuture<ResultPartitionDeploymentDescriptor>> partitionRegistrations =
+			new ArrayList<>(partitions.size());
+
+		for (IntermediateResultPartition partition : partitions) {
+			PartitionShuffleDescriptor partitionShuffleDescriptor = PartitionShuffleDescriptor.from(
+				partition, getPartitionMaxParallelism(partition));
+			partitionRegistrations.add(vertex.getExecutionGraph().getShuffleMaster()
+				.registerPartitionWithProducer(partitionShuffleDescriptor, producerShuffleDescriptor)
+				.thenApply(shuffleDescriptor -> new ResultPartitionDeploymentDescriptor(
+					partitionShuffleDescriptor, shuffleDescriptor, lazyScheduling)));
+		}
+
+		return FutureUtils.combineAll(partitionRegistrations).thenApply(rpdds -> {
+			Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor> producedPartitions =
+				new LinkedHashMap<>(partitions.size());
+			rpdds.forEach(rpdd -> producedPartitions.put(rpdd.getPartitionId(), rpdd));
+			return producedPartitions;
+		});
+	}
+
+	private static int getPartitionMaxParallelism(IntermediateResultPartition partition) {
+		// TODO consumers.isEmpty() only exists for test, currently there has to be exactly one consumer in real jobs!

Review comment: This TODO existed before the PR; I suggest we tackle it separately. I created an issue for it: https://issues.apache.org/jira/browse/FLINK-12628.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking
Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r287607989

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

## @@ -206,27 +207,35 @@ public boolean processInput() throws Exception {
 			}
 		}
-		final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
-		if (bufferOrEvent != null) {
-			if (bufferOrEvent.isBuffer()) {
-				currentChannel = bufferOrEvent.getChannelIndex();
-				currentRecordDeserializer = recordDeserializers[currentChannel];
-				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-			}
-			else {
-				// Event received
-				final AbstractEvent event = bufferOrEvent.getEvent();
-				if (event.getClass() != EndOfPartitionEvent.class) {
-					throw new IOException("Unexpected event: " + event);
+		final Optional<BufferOrEvent> bufferOrEvent = barrierHandler.pollNext();
+		if (bufferOrEvent.isPresent()) {
+			processBufferOrEvent(bufferOrEvent.get());
+		} else {
+			if (!barrierHandler.isFinished()) {
+				barrierHandler.isAvailable().get();

Review comment: Does this mean we waste the time spent waiting for new data here, delaying mail execution for this round trip?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8545: [FLINK-12520] Support to provide fully-qualified domain host name in TaskManagerMetricGroup
flinkbot commented on issue #8545: [FLINK-12520] Support to provide fully-qualified domain host name in TaskManagerMetricGroup
URL: https://github.com/apache/flink/pull/8545#issuecomment-496018362
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12520) Support to provide fully-qualified domain host name in TaskManagerMetricGroup
[ https://issues.apache.org/jira/browse/FLINK-12520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12520:
--
Labels: pull-request-available (was: )

> Support to provide fully-qualified domain host name in TaskManagerMetricGroup
> -
>
> Key: FLINK-12520
> URL: https://issues.apache.org/jira/browse/FLINK-12520
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Metrics
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Labels: pull-request-available
>
> Inspired by a [user-mail|https://lists.apache.org/thread.html/e1774a42430815b689ea792103d002b1da734d6086682d34c044ef35@%3Cuser-zh.flink.apache.org%3E] thread (in Chinese) complaining that the host name in metric names only shows the first segment. Their full host name is like {{ambari.host12.yy}}, so the first segment {{ambari}} does not identify anything.
> We could let users record the fully qualified host name in the TaskManagerMetricGroup so that metrics from different hosts can be distinguished.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] Myasuka opened a new pull request #8545: [FLINK-12520] Support to provide fully-qualified domain host name in TaskManagerMetricGroup
Myasuka opened a new pull request #8545: [FLINK-12520] Support to provide fully-qualified domain host name in TaskManagerMetricGroup
URL: https://github.com/apache/flink/pull/8545

## What is the purpose of the change
Inspired by a [Chinese user-mail](https://lists.apache.org/thread.html/e1774a42430815b689ea792103d002b1da734d6086682d34c044ef35@%3Cuser-zh.flink.apache.org%3E) complaining that the host name in metric names only shows the first segment. Their full host name is like "ambari.host12.yy", so the first segment "ambari" does not identify anything. With this PR, users can record the fully qualified host name in the `TaskManagerMetricGroup` to distinguish metrics from different hosts.

## Brief change log
- Add a new option `metrics.tm.full-hostname` to `MetricOptions` to indicate whether Flink should use the fully qualified host name in task manager metrics.
- Use the above option in `instantiateTaskManagerMetricGroup` in `TaskManagerRunner`.

## Verifying this change
This change added tests and can be verified as follows:
- Added a test that `TaskManagerLocation#getHostName(InetAddress, boolean)` returns the host name as expected.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation
- Does this pull request introduce a new feature? **yes**
- If yes, how is the feature documented? **docs**

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
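For illustration, a standalone sketch of the short-name versus fully-qualified-name distinction this PR is about. The method shape below is an assumption for demonstration, not the PR's actual `TaskManagerLocation` code.

```java
import java.net.InetAddress;

public class HostNameDemo {

	// Sketch only: return either the FQDN or just its first segment.
	static String hostName(InetAddress address, boolean useFullHostname) {
		String fqdn = address.getCanonicalHostName(); // e.g. "ambari.host12.yy"
		if (useFullHostname) {
			return fqdn;
		}
		int dot = fqdn.indexOf('.');
		// without the option, "ambari.host12.yy" collapses to the uninformative "ambari"
		return dot > 0 ? fqdn.substring(0, dot) : fqdn;
	}

	public static void main(String[] args) throws Exception {
		InetAddress localHost = InetAddress.getLocalHost();
		System.out.println(hostName(localHost, true));  // fully qualified
		System.out.println(hostName(localHost, false)); // first segment only
	}
}
```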
[GitHub] [flink] Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking
Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r287607756

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java

## @@ -494,12 +486,12 @@ public void requestPartitions() throws IOException, InterruptedException {
 	//
 	@Override
-	public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
+	public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
 		return getNextBufferOrEvent(true);

Review comment: I notice there is still a choice between blocking and non-blocking here. I checked the code path; is this only used for batch tasks? (Just a question.)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking
Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r287607135

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java

## @@ -112,6 +113,8 @@
 	/** Flag to indicate whether we have drawn all available input. */
 	private boolean endOfStream;
+
+	private boolean isFinished;

Review comment: There is no comment on this field. I noticed it because its `finish` behavior differs from that in `BufferTracker.java`, so I think a comment should be added here.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking
Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r287607135

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java

## @@ -112,6 +113,8 @@
 	/** Flag to indicate whether we have drawn all available input. */
 	private boolean endOfStream;
+
+	private boolean isFinished;

Review comment: There is no comment on this field. I think it should be documented because its `finish` behavior differs from that in `BufferTracker.java`.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking
Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r287606004

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java

## @@ -152,37 +155,37 @@ public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxB
 		this.queuedBuffered = new ArrayDeque();
 	}
+
+	@Override
+	public CompletableFuture<?> isAvailable() {
+		if (currentBuffered == null) {
+			return inputGate.isAvailable();
+		}
+		return AVAILABLE;
+	}
+
 	//
 	// Buffer and barrier handling
 	//
 	@Override
-	public BufferOrEvent getNextNonBlocked() throws Exception {
+	public Optional<BufferOrEvent> pollNext() throws Exception {

Review comment: A question here: why do we have to change the return type from `BufferOrEvent` to `Optional<BufferOrEvent>`? To avoid null checks?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
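As a general Java aside on the question above (not taken from the PR): returning `Optional` moves the "no element right now" case into the method signature, whereas a null-returning API leaves it implicit and easy to forget.

```java
import java.util.Optional;

public class OptionalVsNullDemo {

	// null-returning style: every caller must remember the null check
	static String nextOrNull(boolean hasData) {
		return hasData ? "buffer" : null;
	}

	// Optional style: emptiness is explicit, and the compiler nudges callers to handle it
	static Optional<String> next(boolean hasData) {
		return hasData ? Optional.of("buffer") : Optional.empty();
	}

	public static void main(String[] args) {
		next(false).ifPresent(System.out::println); // prints nothing, no NPE risk
	}
}
```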
[GitHub] [flink] Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking
Aitozi commented on a change in pull request #8467: [FLINK-12535][network] Make CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r287603157

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/AsyncDataInput.java

## @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface defining couple of essential methods for asynchronous and non blocking data polling.
+ *
+ * For the most efficient usage, user of this class is suppose to call {@link #pollNext()}

Review comment: Typo: `supposed`? Also, can we add a comment on `pollNext()` stating that implementations should be non-blocking? Or use a name like `pollNextNonBlocking`?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
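A small sketch of the polling contract the interface describes: poll while elements are present, and wait on the availability future only when the input is drained. The interface shape is reconstructed for illustration; `pollNext()` is taken from the javadoc above, while `isFinished()` is assumed from the sibling `BarrierBuffer` threads, so treat the exact signatures as assumptions.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class PollingLoopDemo {

	// Assumed shape of the interface, reconstructed for illustration.
	interface AsyncDataInput<T> {
		CompletableFuture<?> isAvailable();
		Optional<T> pollNext() throws Exception; // expected to be non-blocking
		boolean isFinished();
	}

	// Drain the input: poll while data is present, otherwise wait on the availability future.
	static <T> void drain(AsyncDataInput<T> input, Consumer<T> consumer) throws Exception {
		while (!input.isFinished()) {
			Optional<T> element = input.pollNext();
			if (element.isPresent()) {
				consumer.accept(element.get());
			} else {
				input.isAvailable().get(); // block only when there is truly nothing to do
			}
		}
	}
}
```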
[jira] [Closed] (FLINK-12574) using sink StreamingFileSink files are overwritten when resuming application causing data loss
[ https://issues.apache.org/jira/browse/FLINK-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yitzchak lieberman closed FLINK-12574.
--
Resolution: Not A Bug

> using sink StreamingFileSink files are overwritten when resuming application causing data loss
> --
>
> Key: FLINK-12574
> URL: https://issues.apache.org/jira/browse/FLINK-12574
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: yitzchak lieberman
>Priority: Critical
>
> When part files are saved to an S3 bucket (with a bucket assigner) under simple names such as part-0-0 and part-1-2, restarting or resuming the application causes the checkpoint id to start from 0, and old part files are overwritten by new ones.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12574) using sink StreamingFileSink files are overwritten when resuming application causing data loss
[ https://issues.apache.org/jira/browse/FLINK-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848360#comment-16848360 ] yitzchak lieberman commented on FLINK-12574:
--
You are right, my bad. Closing the bug...

> using sink StreamingFileSink files are overwritten when resuming application causing data loss
> --
>
> Key: FLINK-12574
> URL: https://issues.apache.org/jira/browse/FLINK-12574
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: yitzchak lieberman
>Priority: Critical
>
> When part files are saved to an S3 bucket (with a bucket assigner) under simple names such as part-0-0 and part-1-2, restarting or resuming the application causes the checkpoint id to start from 0, and old part files are overwritten by new ones.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)