[GitHub] [flink] HuangZhenQiu commented on issue #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
HuangZhenQiu commented on issue #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers URL: https://github.com/apache/flink/pull/7356#issuecomment-493729416 @tillrohrmann As this branch hasn't been synced with master for a while, I will create another pull request. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12551) elasticsearch6 connector print log error
xiongkun created FLINK-12551: Summary: elasticsearch6 connector print log error Key: FLINK-12551 URL: https://issues.apache.org/jira/browse/FLINK-12551 Project: Flink Issue Type: Bug Components: Connectors / ElasticSearch Affects Versions: 1.6.3 Reporter: xiongkun

When I use the Elasticsearch connector, I find that some data is not inserted into Elasticsearch while my project is running. I tried reading the logs for help, but the log does not contain the important message. Reading the source code (org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase), I found an error in the way the ERROR log is written.

{code:java}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
	if (response.hasFailures()) {
		BulkItemResponse itemResponse;
		Throwable failure;
		RestStatus restStatus;
		try {
			for (int i = 0; i < response.getItems().length; i++) {
				itemResponse = response.getItems()[i];
				failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
				if (failure != null) {
					LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
					restStatus = itemResponse.getFailure().getStatus();
					if (restStatus == null) {
						failureHandler.onFailure(request.requests().get(i), failure, -1, requestIndexer);
					} else {
						failureHandler.onFailure(request.requests().get(i), failure, restStatus.getStatus(), requestIndexer);
					}
				}
			}
		} catch (Throwable t) {
			// fail the sink and skip the rest of the items
			// if the failure handler decides to throw an exception
			failureThrowable.compareAndSet(null, t);
		}
	}
	if (flushOnCheckpoint) {
		numPendingRequests.getAndAdd(-request.numberOfActions());
	}
}
{code}
{code:java}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
	LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
	try {
		for (ActionRequest action : request.requests()) {
			failureHandler.onFailure(action, failure, -1, requestIndexer);
		}
	} catch (Throwable t) {
		// fail the sink and skip the rest of the items
		// if the failure handler decides to throw an exception
		failureThrowable.compareAndSet(null, t);
	}
	if (flushOnCheckpoint) {
		numPendingRequests.getAndAdd(-request.numberOfActions());
	}
}
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
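The second `afterBulk` overload above is the reported bug: the log call fills the single `{}` placeholder with `failure.getMessage()` and passes `failure.getCause()` as the throwable. When the bulk failure was created without a nested cause, `getCause()` returns `null` and no stack trace reaches the log. A minimal, dependency-free sketch of that pitfall (plain Java; no SLF4J or Elasticsearch classes involved):

```java
public class CauseVsFailureDemo {
    public static void main(String[] args) {
        // Mirrors the bulk-failure path: the throwable handed to afterBulk
        // frequently has no nested cause attached.
        Throwable failure = new RuntimeException("Failed Elasticsearch bulk request");

        // What the sink currently hands to the logger as the throwable argument:
        System.out.println(failure.getCause());    // prints "null" -> nothing for the logger to print

        // The throwable itself still carries the message and the stack trace:
        System.out.println(failure.getMessage());  // prints "Failed Elasticsearch bulk request"
    }
}
```

Passing `failure` itself (rather than `failure.getCause()`) as the last argument would let the logger attach the full stack trace, which is what the reporter is asking for.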
[GitHub] [flink] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers URL: https://github.com/apache/flink/pull/7356#discussion_r285362730 ## File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java ## @@ -55,6 +55,18 @@ " default, the port of the JobManager, because the same ActorSystem is used." + " Its not possible to use this configuration key to define port ranges."); + /** +* Defines the maximum number of workers (YARN / Mesos) failure can happen in a minute. Review comment: From our production experience, it would be better to let the job fail immediately so that an engineer gets involved in the issue. The failure rate here is just to distinguish whether it is a transient failure or a real issue caused by an external system such as HDFS. Thus, I am not sure it is necessary to wait until the failure rate falls below the maximum.
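The transient-vs-persistent distinction the comment draws is commonly implemented as a failure counter over a sliding time window. The following is a self-contained illustrative sketch of that idea, not Flink's actual ResourceManager code; the class name and parameters are invented for the example:

```java
import java.util.ArrayDeque;

public class FailureRater {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final ArrayDeque<Long> failureTimestamps = new ArrayDeque<>();

    public FailureRater(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    /** Records one worker failure; returns true if the per-interval limit is now exceeded. */
    public boolean recordFailureAndCheck(long nowMillis) {
        failureTimestamps.addLast(nowMillis);
        // Drop failures that fell out of the sliding window.
        while (!failureTimestamps.isEmpty()
                && nowMillis - failureTimestamps.peekFirst() > intervalMillis) {
            failureTimestamps.removeFirst();
        }
        return failureTimestamps.size() > maxFailuresPerInterval;
    }

    public static void main(String[] args) {
        FailureRater rater = new FailureRater(2, 60_000); // at most 2 worker failures per minute
        System.out.println(rater.recordFailureAndCheck(0));       // false: 1 failure in window
        System.out.println(rater.recordFailureAndCheck(1_000));   // false: 2 failures in window
        System.out.println(rater.recordFailureAndCheck(2_000));   // true:  3 failures -> fail the job fast
        System.out.println(rater.recordFailureAndCheck(120_000)); // false: old failures expired
    }
}
```

A burst of failures inside one window trips the check immediately (the "fail fast, involve an engineer" behavior), while isolated transient failures spread over time never do.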
[GitHub] [flink] bowenli86 commented on issue #8477: [FLINK-12241][hive] Support function related operations in GenericHiveMetastoreCatalog
bowenli86 commented on issue #8477: [FLINK-12241][hive] Support function related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8477#issuecomment-493695815 cc @xuefuz @lirui-apache
[jira] [Updated] (FLINK-12550) hostnames with a dot never receive local input splits
[ https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12550: --- Labels: pull-request-available (was: ) > hostnames with a dot never receive local input splits > - > > Key: FLINK-12550 > URL: https://issues.apache.org/jira/browse/FLINK-12550 > Project: Flink > Issue Type: Bug > Components: API / DataSet >Affects Versions: 1.8.0 >Reporter: Felix seibert >Priority: Major > Labels: pull-request-available > > LocatableInputSplitAssigner (in package api.common.io) fails to assign local > input splits to hosts whose hostname contains a dot ("."). To reproduce add > the following test to LocatableSplitAssignerTest and execute it. It will > always fail. In my mind, this is contrary to the expected behaviour, which is > that the host should obtain the one split that is stored on the very same > machine. > > {code:java} > @Test > public void testLocalSplitAssignmentForHostWithDomainName() { >try { > String hostNameWithDot = "testhost.testdomain"; > // load one split > Set splits = new HashSet(); > splits.add(new LocatableInputSplit(0, hostNameWithDot)); > // get all available splits > LocatableInputSplitAssigner ia = new > LocatableInputSplitAssigner(splits); > InputSplit is = null; > ia.getNextInputSplit(hostNameWithDot, 0); > assertEquals(0, ia.getNumberOfRemoteAssignments()); > assertEquals(1, ia.getNumberOfLocalAssignments()); >} >catch (Exception e) { > e.printStackTrace(); > fail(e.getMessage()); >} > } > {code} > I also experienced this error in practice, and will later today open a pull > request to fix it. > > Note: I'm not sure if I selected the correct component category.
[GitHub] [flink] flinkbot commented on issue #8478: [FLINK-12550] fix local input split assignment for hostnames containing a dot (".")
flinkbot commented on issue #8478: [FLINK-12550] fix local input split assignment for hostnames containing a dot (".") URL: https://github.com/apache/flink/pull/8478#issuecomment-493695413 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] felse opened a new pull request #8478: [FLINK-12550] fix local input split assignment for hostnames containing a dot (".")
felse opened a new pull request #8478: [FLINK-12550] fix local input split assignment for hostnames containing a dot (".") URL: https://github.com/apache/flink/pull/8478 ## What is the purpose of the change LocatableInputSplitAssigner does not assign input splits locally if the hostname contains a dot ("."), like in hostname.domain. This PR should fix this issue. ## Brief change log The hostnames of the input split locations are now prepared in the same way as the hostname of the TaskManager that is getting the next input split, before they are compared to each other. ## Verifying this change A unit test has been added (testLocalSplitAssignmentForHostWithDomainName in LocatableSplitAssignerTest). This test fails with all previous versions of LocatableInputSplitAssigner and passes with the change introduced in this PR. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[jira] [Created] (FLINK-12550) hostnames with a dot never receive local input splits
Felix seibert created FLINK-12550: - Summary: hostnames with a dot never receive local input splits Key: FLINK-12550 URL: https://issues.apache.org/jira/browse/FLINK-12550 Project: Flink Issue Type: Bug Components: API / DataSet Affects Versions: 1.8.0 Reporter: Felix seibert

LocatableInputSplitAssigner (in package api.common.io) fails to assign local input splits to hosts whose hostname contains a dot ("."). To reproduce add the following test to LocatableSplitAssignerTest and execute it. It will always fail. In my mind, this is contrary to the expected behaviour, which is that the host should obtain the one split that is stored on the very same machine.

{code:java}
@Test
public void testLocalSplitAssignmentForHostWithDomainName() {
	try {
		String hostNameWithDot = "testhost.testdomain";

		// load one split
		Set splits = new HashSet();
		splits.add(new LocatableInputSplit(0, hostNameWithDot));

		// get all available splits
		LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
		InputSplit is = null;
		ia.getNextInputSplit(hostNameWithDot, 0);

		assertEquals(0, ia.getNumberOfRemoteAssignments());
		assertEquals(1, ia.getNumberOfLocalAssignments());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
{code}

I also experienced this error in practice, and will later today open a pull request to fix it.

Note: I'm not sure if I selected the correct component category.
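The fix described in the accompanying pull request amounts to preparing both hostnames the same way before comparing them. A hedged sketch of that idea follows; `normalizeHost` is a hypothetical helper for illustration, not the actual Flink method, which may normalize differently:

```java
import java.util.Locale;

public class HostnameMatchDemo {
    // Hypothetical helper: lower-case the name and strip any domain suffix,
    // so "testhost.testdomain" and "testhost" compare equal.
    static String normalizeHost(String host) {
        String lower = host.trim().toLowerCase(Locale.ROOT);
        int dot = lower.indexOf('.');
        return dot >= 0 ? lower.substring(0, dot) : lower;
    }

    public static void main(String[] args) {
        String splitLocation = "testhost.testdomain"; // as stored in the input split
        String taskManagerHost = "testhost";          // as reported by the TaskManager

        // Raw comparison fails, which is why such splits were always counted as remote:
        System.out.println(splitLocation.equals(taskManagerHost));  // prints "false"

        // Normalized comparison succeeds, so the split becomes a local assignment:
        System.out.println(normalizeHost(splitLocation)
                .equals(normalizeHost(taskManagerHost)));           // prints "true"
    }
}
```

With this normalization applied to both sides, the test in the issue would count one local and zero remote assignments, as the reporter expects.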
[jira] [Commented] (FLINK-7883) Make savepoints atomic with respect to state and side effects
[ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843189#comment-16843189 ] Vahid Hashemian commented on FLINK-7883: We are also very interested in this enhancement that allows bounded consumption by {{FlinkKafkaConsumer}}. > Make savepoints atomic with respect to state and side effects > - > > Key: FLINK-7883 > URL: https://issues.apache.org/jira/browse/FLINK-7883 > Project: Flink > Issue Type: Improvement > Components: API / DataStream, Connectors / Kafka, Runtime / State > Backends >Affects Versions: 1.3.2, 1.4.0 >Reporter: Antoine Philippot >Priority: Major > > For a cancel with savepoint command, the JobManager trigger the cancel call > once the savepoint is finished, but during the savepoint execution, kafka > source continue to poll new messages which will not be part of the savepoint > and will be replayed on the next application start. > A solution could be to stop fetching the source stream task before triggering > the savepoint. > I suggest to add an interface {{StoppableFetchingSourceFunction}} with a > method {{stopFetching}} that existant SourceFunction implementations could > implement. > We can add a {{stopFetchingSource}} property in > {{CheckpointOptions}} class to pass the desired behaviour from > {{JobManager.handleMessage(CancelJobWithSavepoint)}} to > {{SourceStreamTask.triggerCheckpoint}}
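The interface proposed in the issue text can be sketched as follows. This illustrates the suggested API shape only; the toy source, its flag, and the run loop are invented for the example and are not Flink code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// The interface suggested in the issue: lets the JobManager ask a source
// to stop polling upstream records before the savepoint is triggered.
interface StoppableFetchingSourceFunction {
    void stopFetching();
}

public class StoppableSourceDemo implements StoppableFetchingSourceFunction {
    private final AtomicBoolean fetching = new AtomicBoolean(true);

    @Override
    public void stopFetching() {
        fetching.set(false);
    }

    /** Toy run loop: emits records only while fetching is still enabled. */
    public List<Integer> run(List<Integer> upstream) {
        List<Integer> emitted = new ArrayList<>();
        for (int record : upstream) {
            if (!fetching.get()) {
                break; // stopFetching() was called: nothing else enters the savepoint
            }
            emitted.add(record);
            if (record == 2) {
                stopFetching(); // simulate cancel-with-savepoint arriving after record 2
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        StoppableSourceDemo source = new StoppableSourceDemo();
        System.out.println(source.run(List.of(1, 2, 3, 4))); // prints "[1, 2]"
    }
}
```

Once `stopFetching` is called, no further records are consumed, so nothing polled after that point would need to be replayed on the next application start.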
[GitHub] [flink] azagrebin commented on issue #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
azagrebin commented on issue #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor URL: https://github.com/apache/flink/pull/7757#issuecomment-493680887 @kisimple could you rebase on latest master and squash empty commits? it seems there has been a fix merged.
[GitHub] [flink] azagrebin commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
azagrebin commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#issuecomment-493680455 Thanks for the reviews @zentol @zhijiangW, I addressed the comments
[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r285343015 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java ## @@ -40,35 +38,38 @@ public static ResultPartition createPartition(ResultPartitionType type) { false); } - public static ResultPartition createPartition(ResultPartitionType type, int numChannels) { - return createPartition(new NoOpResultPartitionConsumableNotifier(), type, numChannels, false); - } - public static ResultPartition createPartition( ResultPartitionConsumableNotifier notifier, ResultPartitionType type, boolean sendScheduleOrUpdateConsumersMessage) { - return createPartition(notifier, type, 1, sendScheduleOrUpdateConsumersMessage); + return createPartition(null, notifier, type, 1, sendScheduleOrUpdateConsumersMessage); } public static ResultPartition createPartition( Review comment: Well, certain short-cuts are still shorter if used in many places. Not sure; I'll reduce it a bit more, but at the moment I do not see it as strictly necessary.
[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r285342718 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -88,35 +86,62 @@ private final TaskEventPublisher taskEventPublisher; - private final IOManager ioManager; + private final ResultPartitionFactory resultPartitionFactory; + + private final SingleInputGateFactory singleInputGateFactory; private boolean isShutdown; - public NetworkEnvironment( - NetworkEnvironmentConfiguration config, - TaskEventPublisher taskEventPublisher, - MetricGroup metricGroup, - IOManager ioManager) { - this.config = checkNotNull(config); + private NetworkEnvironment( + NetworkEnvironmentConfiguration config, + NetworkBufferPool networkBufferPool, + ConnectionManager connectionManager, + ResultPartitionManager resultPartitionManager, + TaskEventPublisher taskEventPublisher, + ResultPartitionFactory resultPartitionFactory, + SingleInputGateFactory singleInputGateFactory) { + + this.config = config; + this.networkBufferPool = networkBufferPool; + this.connectionManager = connectionManager; + this.resultPartitionManager = resultPartitionManager; + this.taskEventPublisher = taskEventPublisher; + this.resultPartitionFactory = resultPartitionFactory; + this.singleInputGateFactory = singleInputGateFactory; + this.isShutdown = false; + } - this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize()); + public static NetworkEnvironment create( + NetworkEnvironmentConfiguration config, + TaskEventPublisher taskEventPublisher, + MetricGroup metricGroup, + IOManager ioManager) { - NettyConfig nettyConfig = config.nettyConfig(); - if (nettyConfig != null) { - this.connectionManager = new NettyConnectionManager(nettyConfig, config.isCreditBased()); - } else { - this.connectionManager = new 
LocalConnectionManager(); - } + NettyConfig nettyConfig = checkNotNull(config).nettyConfig(); + ConnectionManager connectionManager = nettyConfig != null ? + new NettyConnectionManager(nettyConfig, config.isCreditBased()) : new LocalConnectionManager(); - this.resultPartitionManager = new ResultPartitionManager(); - - this.taskEventPublisher = checkNotNull(taskEventPublisher); + NetworkBufferPool networkBufferPool = new NetworkBufferPool( + config.numNetworkBuffers(), config.networkBufferSize(), config.networkBuffersPerChannel()); registerNetworkMetrics(metricGroup, networkBufferPool); - this.ioManager = checkNotNull(ioManager); - - isShutdown = false; + ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); + ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory( + resultPartitionManager, checkNotNull(ioManager), networkBufferPool, + config.networkBuffersPerChannel(), config.floatingNetworkBuffersPerGate()); Review comment: In general, I would prefer classes/functions to have as few dependencies as possible when the full `config` is not needed; it makes it easier to see the real dependencies and to reuse the code in other places, e.g. to avoid mocking the full config in tests.
[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r285342471 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java ## @@ -133,31 +103,64 @@ public static LocalInputChannel createLocalInputChannel( int initialBackoff, int maxBackoff) { - return new LocalInputChannel( - inputGate, - 0, - new ResultPartitionID(), - partitionManager, - new TaskEventDispatcher(), - initialBackoff, - maxBackoff, - newUnregisteredInputChannelMetrics()); + return InputChannelBuilder.newBuilder() + .setPartitionManager(partitionManager) + .setInitialBackoff(initialBackoff) + .setMaxBackoff(maxBackoff) + .buildLocal(inputGate); } public static RemoteInputChannel createRemoteInputChannel( SingleInputGate inputGate, int channelIndex, ConnectionManager connectionManager) { - return new RemoteInputChannel( - inputGate, - channelIndex, - new ResultPartitionID(), - STUB_CONNECTION_ID, - connectionManager, - 0, - 0, - newUnregisteredInputChannelMetrics()); + return InputChannelBuilder.newBuilder() + .setChannelIndex(channelIndex) + .setConnectionManager(connectionManager) + .buildRemote(inputGate); + } + + public static RemoteInputChannel createRemoteInputChannel( + SingleInputGate inputGate, + PartitionRequestClient client, + MemorySegmentProvider memorySegmentProvider) { + + return InputChannelBuilder.newBuilder() + .setConnectionManager(mockConnectionManagerWithPartitionRequestClient(client)) + .setMemorySegmentProvider(memorySegmentProvider) + .buildRemote(inputGate); + } + + public static ConnectionManager mockConnectionManagerWithPartitionRequestClient(PartitionRequestClient client) { Review comment: we could consider it, but this sounds like another refactoring :) for a later follow up This is an automated message from the Apache Git Service. 
[GitHub] [flink] mr-cloud commented on issue #8464: [hotfix] fix typos.
mr-cloud commented on issue #8464: [hotfix] fix typos. URL: https://github.com/apache/flink/pull/8464#issuecomment-493675884 This is my first try at contributing. I'd like to know why my PR has not been accepted yet, and how it's going now. What else do I need to do to finish this task successfully? Thanks for your help :-) @rmetzger
[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r285341747 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java ## @@ -48,7 +48,7 @@ public void testDestroyWhileBlockingRequest() throws Exception { LocalBufferPool localBufferPool = null; try { - networkBufferPool = new NetworkBufferPool(1, 4096); + networkBufferPool = new NetworkBufferPool(1, 4096, 1); Review comment: I did not want to explode the number of constructors for tests, especially for one not-very-important parameter. I think it does not take too long to add it in tests.
[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r285341674 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ## @@ -69,14 +71,19 @@ private int numTotalRequiredBuffers; + private final int numberOfRequestedBuffers; Review comment: I would name it `numberOfSegmentsToRequest`
[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r285341508 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java ## @@ -106,15 +106,15 @@ public void testDoublePartitionRequest() throws Exception { final PartitionRequestClient client = new PartitionRequestClient( channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class)); - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32); + final int numExclusiveBuffers = 2; Review comment: I left it where it was used somewhere else, like in assertions, for readability. I guess readability was also the original intent.
[jira] [Closed] (FLINK-12519) Introduce planner rules about semi/anti join
[ https://issues.apache.org/jira/browse/FLINK-12519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-12519. -- Resolution: Implemented Fix Version/s: 1.9.0 merged in 1.9.0: 4fa387164cea44f8e0bac1aadab11433c0f0ff2b > Introduce planner rules about semi/anti join > > > Key: FLINK-12519 > URL: https://issues.apache.org/jira/browse/FLINK-12519 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > This issue aims to introduce planner rules about semi/anti join, rules > include: > 1. {{FlinkSemiAntiJoinFilterTransposeRule}} that pushes semi/anti join down > in a tree past a filter > 2. {{FlinkSemiAntiJoinJoinTransposeRule}} that pushes semi/anti join down in > a tree past a non semi/anti join > 3. {{FlinkSemiAntiJoinProjectTransposeRule}} that push semi/anti join down in > a tree past a project > 4. {{ProjectSemiAntiJoinTransposeRule}} that pushes a project down in a tree > past a semi/anti join > planner rules about non semi/anti join will be introduced in > [FLINK-12509|https://issues.apache.org/jira/browse/FLINK-12509].
[GitHub] [flink] KurtYoung merged pull request #8450: [FLINK-12519] [table-planner-blink] Introduce planner rules about semi/anti join
KurtYoung merged pull request #8450: [FLINK-12519] [table-planner-blink] Introduce planner rules about semi/anti join URL: https://github.com/apache/flink/pull/8450
[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285340688

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
##

@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Builder for various {@link InputChannel} types.
+ */
+public class InputChannelBuilder {
+    static final ConnectionID STUB_CONNECTION_ID =
+        new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+
+    private int channelIndex = 0;
+    private ResultPartitionID partitionId = new ResultPartitionID();
+    private ConnectionID connectionID = STUB_CONNECTION_ID;
+    private ResultPartitionManager partitionManager = new ResultPartitionManager();
+    private TaskEventPublisher taskEventPublisher = new TaskEventDispatcher();
+    private ConnectionManager connectionManager = new LocalConnectionManager();
+    private int initialBackoff = 0;
+    private int maxBackoff = 0;
+    private InputChannelMetrics metrics = InputChannelTestUtils.newUnregisteredInputChannelMetrics();
+
+    public static InputChannelBuilder newBuilder() {
+        return new InputChannelBuilder();
+    }
+
+    public InputChannelBuilder setChannelIndex(int channelIndex) {
+        this.channelIndex = channelIndex;
+        return this;
+    }
+
+    public InputChannelBuilder setPartitionId(ResultPartitionID partitionId) {
+        this.partitionId = partitionId;
+        return this;
+    }
+
+    public InputChannelBuilder setPartitionManager(ResultPartitionManager partitionManager) {
+        this.partitionManager = partitionManager;
+        return this;
+    }
+
+    InputChannelBuilder setTaskEventPublisher(TaskEventPublisher taskEventPublisher) {
+        this.taskEventPublisher = taskEventPublisher;
+        return this;
+    }
+
+    public InputChannelBuilder setConnectionManager(ConnectionManager connectionManager) {
+        this.connectionManager = connectionManager;
+        return this;
+    }
+
+    public InputChannelBuilder setInitialBackoff(int initialBackoff) {
+        this.initialBackoff = initialBackoff;
+        return this;
+    }
+
+    public InputChannelBuilder setMaxBackoff(int maxBackoff) {
+        this.maxBackoff = maxBackoff;
+        return this;
+    }
+
+    public InputChannelBuilder setMetrics(InputChannelMetrics metrics) {
+        this.metrics = metrics;
+        return this;
+    }
+
+    InputChannelBuilder setupFromNetworkEnvironment(NetworkEnvironment network) {
+        this.partitionManager = network.getResultPartitionManager();
+        this.connectionManager = network.getConnectionManager();
+        this.initialBackoff = network.getConfiguration().partitionRequestInitialBackoff();
+        this.maxBackoff = network.getConfiguration().partitionRequestMaxBackoff();
+        return this;
+    }
+
+    UnknownInputChannel buildUnknown(SingleInputGate inputGate) {
+        UnknownInputChannel channel = new UnknownInputChannel(
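The (truncated) diff above adds a fluent test builder: every field starts from a sensible default, each setter returns `this` for chaining, and only overridden fields deviate from the defaults. As a rough standalone sketch of the same pattern, with hypothetical simplified names rather than Flink's actual classes:

```java
// Hypothetical stand-in for a test builder like InputChannelBuilder:
// fluent setters over defaulted fields, plus a build step.
// Names and the String "build product" are illustrative only.
public class ChannelBuilder {
    private int channelIndex = 0;
    private int initialBackoff = 0;
    private int maxBackoff = 0;

    public static ChannelBuilder newBuilder() {
        return new ChannelBuilder();
    }

    public ChannelBuilder setChannelIndex(int channelIndex) {
        this.channelIndex = channelIndex;
        return this; // returning 'this' enables fluent chaining
    }

    public ChannelBuilder setInitialBackoff(int initialBackoff) {
        this.initialBackoff = initialBackoff;
        return this;
    }

    public ChannelBuilder setMaxBackoff(int maxBackoff) {
        this.maxBackoff = maxBackoff;
        return this;
    }

    public String build() {
        return "channel[" + channelIndex + ", backoff=" + initialBackoff + ".." + maxBackoff + "]";
    }

    public static void main(String[] args) {
        // A test overrides only what it cares about; the rest stays default.
        String channel = ChannelBuilder.newBuilder()
            .setChannelIndex(2)
            .setMaxBackoff(100)
            .build();
        System.out.println(channel); // prints channel[2, backoff=0..100]
    }
}
```

The payoff in tests is that adding a new constructor parameter to the built type only touches the builder, not every test site.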
[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285340619

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
##

@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+
+/**
+ * Utility class to encapsulate the logic of building a {@link SingleInputGate} instance.
+ */
+public class SingleInputGateBuilder {
+
+    private JobID jobId = new JobID();
+
+    private IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();

Review comment:
Though the builder is not supposed to be immutable in general, we can keep it final for now and change it if needed.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285340366

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
##

@@ -114,11 +114,12 @@ public void testConcurrentConsumeMultiplePartitions() throws Exception {
 			.setNumTargetKeyGroups(parallelism)
 			.setResultPartitionManager(partitionManager)
 			.setSendScheduleOrUpdateConsumersMessage(true)
+			.setBufferPoolFactory(p ->
+				networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize))
 			.build();

 		// Create a buffer pool for this partition

Review comment:
Well, this is still what actually happens at the moment; why is it outdated?
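The `setBufferPoolFactory(...)` call added in the diff above passes a factory lambda, so the buffer pool is created during partition setup rather than at the moment the builder is configured. A hypothetical standalone sketch of that deferred-creation pattern (names are illustrative, not Flink's API):

```java
import java.util.function.Supplier;

// Sketch of injecting a resource factory into a built object, so the
// resource is only allocated at setup time. All names are hypothetical.
public class PoolFactoryDemo {
    static class Partition {
        private final Supplier<int[]> bufferPoolFactory;
        private int[] pool;

        Partition(Supplier<int[]> bufferPoolFactory) {
            this.bufferPoolFactory = bufferPoolFactory;
        }

        // The pool is created here, not when the factory was configured.
        void setup() {
            pool = bufferPoolFactory.get();
        }

        int poolSize() {
            return pool.length;
        }
    }

    public static void main(String[] args) {
        int producerBufferPoolSize = 8;
        // The lambda captures the desired size; nothing is allocated yet.
        Partition p = new Partition(() -> new int[producerBufferPoolSize]);
        p.setup();
        System.out.println(p.poolSize()); // prints 8
    }
}
```

This keeps the builder free of any live network/buffer resources, which is exactly what decoupling task registration from the NetworkEnvironment is after.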
[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285339824

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
##

@@ -88,7 +90,9 @@ public void testConsumptionWithRemoteChannels() throws Exception {
 		final ConnectionManager connManager = createDummyConnectionManager();
 		final Source[] sources = new Source[numberOfChannels];

-		final SingleInputGate gate = createSingleInputGate(numberOfChannels);

Review comment:
True, actually I do not mind the short-hands. Though, I would remove the one short-hand with 3 parameters; I think the builder is more readable in the case of many parameters.
[jira] [Closed] (FLINK-12424) Supports dag (multiple-sinks query) optimization
[ https://issues.apache.org/jira/browse/FLINK-12424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kurt Young closed FLINK-12424.
------------------------------
       Resolution: Implemented
    Fix Version/s: 1.9.0

merged in 1.9.0: e038a801a87f25b30a1b47ffe5710e5d9bd44c9b

> Supports dag (multiple-sinks query) optimization
> ------------------------------------------------
>
>                 Key: FLINK-12424
>                 URL: https://issues.apache.org/jira/browse/FLINK-12424
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>         Attachments: image-2019-05-07-13-33-02-793.png
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, the Flink planner optimizes the plan in the {{writeToSink}} method. If
> there is more than one sink in a query, each sink-tree is optimized
> independently and the resulting execution plans are also completely independent.
> Actually, there is a high probability of duplicate computation in a
> multiple-sinks query. This issue aims to resolve that problem.
> The basic idea of the solution is as follows:
> 1. lazy optimization: do not optimize the plan in the {{writeToSink}} method,
> just put the plan into a collection.
> 2. whole-plan optimization and execution: a new {{execute}} method is added
> to {{TableEnvironment}}; this method triggers whole-plan optimization and
> executes the job.
> The basic idea of dag (multiple-sinks query) optimization:
> 1. decompose the dag into different blocks; the leaf block is the common
> sub-plan
> 2. optimize each block from the leaf block to the root block; each block only
> needs to be optimized once
> e.g.
> {code:scala}
> val table = tableEnv.sqlQuery("select * from (select a as a1, b as b1 from MyTable where a > 0) t1, (select b as b2, c as c2 from MyTable where c is not null) t2 where a1 = b2")
> tableEnv.registerTable("TempTable", table)
> val table1 = tableEnv.sqlQuery("select a1, b1 from TempTable where a1 >= 70")
> tableEnv.writeToSink(table1, Sink1)
> val table2 = tableEnv.sqlQuery("select a1, c2 from TempTable where a1 < 70")
> tableEnv.writeToSink(table2, Sink2)
> {code}
> !image-2019-05-07-13-33-02-793.png!
> the above plan will be decomposed into 3 blocks: block1 is the input of
> block2 and block3. block2 and block3 will be optimized after block1 has been
> optimized.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
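The block decomposition described in the issue (optimize each block exactly once, from leaves to roots, so the common sub-plan shared by both sinks is not re-optimized per sink) can be sketched as a memoized traversal. This is an illustrative toy with hypothetical names, not the Blink planner's actual code:

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of multi-sink DAG optimization by blocks: a block shared by
// several sinks (the common sub-plan) is optimized once and its result
// reused. Names and the String "optimized plan" are illustrative only.
public class BlockOptimizerSketch {
    static class Block {
        final String name;
        final Block input; // null for a leaf block
        Block(String name, Block input) { this.name = name; this.input = input; }
    }

    static int optimizeCalls = 0;
    static final Map<Block, String> optimized = new HashMap<>();

    // Optimize a block's input first, then the block itself; cache results
    // so a shared block is only optimized once.
    static String optimize(Block b) {
        String cached = optimized.get(b);
        if (cached != null) {
            return cached;
        }
        String in = b.input == null ? "" : optimize(b.input) + " -> ";
        optimizeCalls++;
        String result = in + "opt(" + b.name + ")";
        optimized.put(b, result);
        return result;
    }

    public static void main(String[] args) {
        Block block1 = new Block("join(t1, t2)", null);       // common sub-plan
        Block block2 = new Block("filter(a1 >= 70)", block1); // feeds Sink1
        Block block3 = new Block("filter(a1 < 70)", block1);  // feeds Sink2

        optimize(block2);
        optimize(block3);
        // block1 is optimized once, not twice: 3 calls total, not 4.
        System.out.println(optimizeCalls); // prints 3
    }
}
```

The "lazy optimization" half of the issue corresponds to delaying the `optimize` calls until all sinks are registered, so the planner can see the sharing before committing to any plan.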
[GitHub] [flink] KurtYoung merged pull request #8356: [FLINK-12424] [table-planner-blink] Supports dag (multiple-sinks query) optimization
KurtYoung merged pull request #8356: [FLINK-12424] [table-planner-blink] Supports dag (multiple-sinks query) optimization URL: https://github.com/apache/flink/pull/8356
[jira] [Commented] (FLINK-6263) Leader error in Kafka producer on leader change (broker restart/failrue)
[ https://issues.apache.org/jira/browse/FLINK-6263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843135#comment-16843135 ]

Truong Duc Kien commented on FLINK-6263:
----------------------------------------

I think this is a duplicate of https://issues.apache.org/jira/browse/FLINK-3066

The workaround mentioned in that bug is:
{quote}Set the number of retries to a value above 0. By default Kafka sets it to 0 to avoid duplicates.{quote}

> Leader error in Kafka producer on leader change (broker restart/failrue)
> ------------------------------------------------------------------------
>
>                 Key: FLINK-6263
>                 URL: https://issues.apache.org/jira/browse/FLINK-6263
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.2.0
>            Reporter: Gyula Fora
>            Priority: Major
>
> We have observed the following error in the Kafka producer:
> java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:376)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:293)
> 	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
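The workaround quoted above amounts to raising the producer's `retries` setting in the `Properties` handed to the Kafka producer. A minimal sketch; the broker address and retry count are illustrative values, and note the trade-off the quote mentions: with `retries` above 0 a retried send may produce duplicates.

```java
import java.util.Properties;

// Sketch of the workaround from FLINK-3066: raise the Kafka producer's
// "retries" property above its (then-)default of 0, so a leader change
// is retried instead of failing the sink with
// NotLeaderForPartitionException. Values here are illustrative.
public class KafkaRetryConfig {
    public static Properties producerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // illustrative address
        props.setProperty("retries", "3");                     // default was 0
        return props;
    }

    public static void main(String[] args) {
        // These properties would be passed to the Flink Kafka producer.
        System.out.println(producerProperties().getProperty("retries")); // prints 3
    }
}
```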
[GitHub] [flink] lamber-ken commented on issue #8265: [FLINK-12302][runtime] Fixed the wrong finalStatus of yarn application when application finished
lamber-ken commented on issue #8265: [FLINK-12302][runtime] Fixed the wrong finalStatus of yarn application when application finished
URL: https://github.com/apache/flink/pull/8265#issuecomment-493664892

Hi, I have given detailed steps to reproduce this problem @GJL: [JIRA-12302](https://issues.apache.org/jira/browse/FLINK-12302).