[GitHub] [flink] GJL commented on a change in pull request #8804: [FLINK-12883][WIP][runtime] Add elaborated partition release logic
GJL commented on a change in pull request #8804: [FLINK-12883][WIP][runtime] Add elaborated partition release logic URL: https://github.com/apache/flink/pull/8804#discussion_r296117701 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides a virtual execution state of a {@link PipelinedRegion}. + * + * A pipelined region can be either finished or unfinished. It is finished iff. all its + * execution have reached the finished state. Review comment: _executions_ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
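The finished/unfinished rule quoted in the Javadoc above ("finished iff all its executions have reached the finished state") can be sketched as follows. This is a simplified, hypothetical illustration, not the code from the PR: `RegionExecutionViewSketch` is an invented name, a generic vertex ID type stands in for `ExecutionVertexID`, and membership is checked with a plain `IllegalArgumentException` rather than Flink's `Preconditions.checkArgument`.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified sketch (not the PR's PipelinedRegionExecutionView):
// a region is finished iff every one of its executions is finished.
final class RegionExecutionViewSketch<V> {

    private final Set<V> regionVertices;
    private final Set<V> unfinishedVertices;

    RegionExecutionViewSketch(Set<V> regionVertices) {
        this.regionVertices = Collections.unmodifiableSet(new HashSet<>(regionVertices));
        // Initially no execution of the region has finished.
        this.unfinishedVertices = new HashSet<>(regionVertices);
    }

    boolean isFinished() {
        return unfinishedVertices.isEmpty();
    }

    // Called when an execution of this region reaches the finished state.
    void vertexFinished(V vertex) {
        checkMember(vertex);
        unfinishedVertices.remove(vertex);
    }

    // Called when a finished execution is reset (assumed use: a failover restarts it).
    void vertexUnfinished(V vertex) {
        checkMember(vertex);
        unfinishedVertices.add(vertex);
    }

    private void checkMember(V vertex) {
        if (!regionVertices.contains(vertex)) {
            throw new IllegalArgumentException("Vertex " + vertex + " is not part of this region");
        }
    }
}
```

Tracking only the unfinished set keeps `isFinished()` constant-time, at the cost of having to report every state transition.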
[GitHub] [flink] zhijiangW commented on a change in pull request #8789: [FLINK-12890] Add partition lifecycle related Shuffle API
zhijiangW commented on a change in pull request #8789: [FLINK-12890] Add partition lifecycle related Shuffle API URL: https://github.com/apache/flink/pull/8789#discussion_r296115872 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java ## @@ -41,4 +44,17 @@ CompletableFuture registerPartitionWithProducer( PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor); + + /** +* Release any external resources occupied by the given partition. +* +* This call triggers release of any resources which are occupied by the given partition in the external systems +* outside of the producer executor. This is mostly relevant for the batch jobs and blocking result partitions +* for which {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} returns {@code false}. +* The producer local resources are managed by {@link ShuffleDescriptor#hasLocalResources()} and +* {@link ShuffleEnvironment#releasePartitions(Collection)}. +* +* @param shuffleDescriptor shuffle descriptor of the result partition to release externally. +*/ + void releasePartitionExternally(T shuffleDescriptor); Review comment: Thanks for the explanation! If I understood correctly, the local release is for the RPC JM->TM and the external release is for the RPC `ShuffleMaster`->`ShuffleEnvironment`. ATM we only have one internal shuffle implementation, so `ShuffleEnvironment#releasePartitionsLocally` is actually used, but `ShuffleMaster#releasePartitionsExternally` might never be used. For future external shuffle implementations, however, I am not sure how these two release methods would be used. My only concern is that this abstraction might confuse future implementations, because we seem to put implementation-specific tags/limitations into a general interface. We only need to provide the semantics/ability of the function; there is no need to reflect backend implementation details (`TaskManagerGateway`, `TM connection`).
We already provide one simple implementation via JM/TM for release and also define the interface between `ShuffleMaster/ShuffleEnvironment` for release. The general `ShuffleMaster#releasePartition` only indicates that the `ShuffleMaster` should release the given partitions; internal and external shuffle implementations may or may not rely on it. E.g. the current `releaseOnConsumption` could also be implemented like this: when the JM receives the notification that a consumer task has finished, it notifies the `ShuffleMaster` to release the corresponding producer's partitions.
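The release-on-consumption alternative described in the comment above (the JM learns that a consumer finished and asks the `ShuffleMaster` to release the producer's partitions) might look roughly like this. Everything here is hypothetical: `ReleasingShuffleMaster` and `ReleaseOnConsumptionSketch` are invented names, not Flink's API, and the sketch assumes each partition has exactly one consumer task.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a general "release these partitions" hook on the shuffle master.
interface ReleasingShuffleMaster<T> {
    void releasePartitionExternally(T shuffleDescriptor);
}

// Hypothetical JM-side helper: release-on-consumption expressed through the shuffle master.
// Simplifying assumption: every partition has exactly one consumer task.
final class ReleaseOnConsumptionSketch<T> {

    private final ReleasingShuffleMaster<T> shuffleMaster;
    // consumer task ID -> shuffle descriptors of the producer partitions it reads
    private final Map<String, List<T>> consumedPartitions = new HashMap<>();

    ReleaseOnConsumptionSketch(ReleasingShuffleMaster<T> shuffleMaster) {
        this.shuffleMaster = shuffleMaster;
    }

    void registerConsumer(String consumerId, Collection<T> partitions) {
        consumedPartitions.computeIfAbsent(consumerId, id -> new ArrayList<>()).addAll(partitions);
    }

    // Invoked when the JM learns that the consumer task has finished.
    void onConsumerFinished(String consumerId) {
        List<T> partitions = consumedPartitions.remove(consumerId);
        if (partitions == null) {
            return; // unknown consumer, or already handled
        }
        for (T descriptor : partitions) {
            shuffleMaster.releasePartitionExternally(descriptor);
        }
    }
}
```

The point of the design is that the JM only talks to the general hook; whether the shuffle implementation then frees local TM memory or files in an external service stays hidden behind it.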
[GitHub] [flink] flinkbot commented on issue #8821: [FLINK-12922][Table SQL / Planner] Remove method parameter from OperatorCodeGenerator
flinkbot commented on issue #8821: [FLINK-12922][Table SQL / Planner] Remove method parameter from OperatorCodeGenerator URL: https://github.com/apache/flink/pull/8821#issuecomment-504308095 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Commented] (FLINK-9900) Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
[ https://issues.apache.org/jira/browse/FLINK-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869224#comment-16869224 ]
Yun Tang commented on FLINK-9900:
-
Another instance that exceeded 300 seconds: [https://api.travis-ci.org/v3/job/548326670/log.txt]
> Failed to testRestoreBehaviourWithFaultyStateHandles
> (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
> ---
>
> Key: FLINK-9900
> URL: https://issues.apache.org/jira/browse/FLINK-9900
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination, Tests
> Affects Versions: 1.5.1, 1.6.0, 1.9.0
> Reporter: zhangminglei
> Assignee: Chesnay Schepler
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.7.3
>
>
> https://api.travis-ci.org/v3/job/405843617/log.txt
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec <<< FAILURE! - in org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
> testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) Time elapsed: 120.036 sec <<< ERROR!
> org.junit.runners.model.TestTimedOutException: test timed out after 12 milliseconds
>     at sun.misc.Unsafe.park(Native Method)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)
> Results :
> Tests in error:
>   ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244 » TestTimedOut
> Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] liyafan82 opened a new pull request #8821: [FLINK-12922][Table SQL / Planner] Remove method parameter from OperatorCodeGenerator
liyafan82 opened a new pull request #8821: [FLINK-12922][Table SQL / Planner] Remove method parameter from OperatorCodeGenerator URL: https://github.com/apache/flink/pull/8821 ## What is the purpose of the change The TableConfig parameter of OperatorCodeGenerator#generateOneInputStreamOperator should be removed, because: 1. This parameter is never actually used. 2. If it is ever used in the future, we can use ctx.getConfig to get the same object. 3. The method signature should be consistent. The method generateTwoInputStreamOperator does not have this parameter. So this parameter should also be removed. ## Brief change log - Remove the parameter from the method - Remove the parameter from method callers. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[jira] [Updated] (FLINK-12922) Remove method parameter from OperatorCodeGenerator
[ https://issues.apache.org/jira/browse/FLINK-12922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12922: --- Labels: pull-request-available (was: ) > Remove method parameter from OperatorCodeGenerator > -- > > Key: FLINK-12922 > URL: https://issues.apache.org/jira/browse/FLINK-12922 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Trivial > Labels: pull-request-available > > The TableConfig parameter of > OperatorCodeGenerator#generateOneInputStreamOperator should be removed, > because: > # This parameter is never actually used. > # If it is ever used in the future, we can use ctx.getConfig to get the same > object > # The method signature should be consistent. The method > generateTwoInputStreamOperator does not have this parameter. So this > parameter should also be removed.
[GitHub] [flink] Myasuka commented on issue #8820: [FLINK-12916][tests] Retry cancelWithSavepoint on cancellation barrier in AbstractOperatorRestoreTestBase
Myasuka commented on issue #8820: [FLINK-12916][tests] Retry cancelWithSavepoint on cancellation barrier in AbstractOperatorRestoreTestBase URL: https://github.com/apache/flink/pull/8820#issuecomment-504306726 @flinkbot attention @tillrohrmann, since you first introduced the retry logic in `AbstractOperatorRestoreTestBase`, please take a look.
[GitHub] [flink] dianfu commented on issue #8817: [FLINK-12920][python] Drop support of register_table_sink with parameters field_names and field_types
dianfu commented on issue #8817: [FLINK-12920][python] Drop support of register_table_sink with parameters field_names and field_types URL: https://github.com/apache/flink/pull/8817#issuecomment-504306720 @WeiZhong94 Thanks a lot for the review. Updated accordingly.
[jira] [Created] (FLINK-12922) Remove method parameter from OperatorCodeGenerator
Liya Fan created FLINK-12922: Summary: Remove method parameter from OperatorCodeGenerator Key: FLINK-12922 URL: https://issues.apache.org/jira/browse/FLINK-12922 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Liya Fan Assignee: Liya Fan The TableConfig parameter of OperatorCodeGenerator#generateOneInputStreamOperator should be removed, because: # This parameter is never actually used. # If it is ever used in the future, we can use ctx.getConfig to get the same object # The method signature should be consistent. The method generateTwoInputStreamOperator does not have this parameter. So this parameter should also be removed.
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8817: [FLINK-12920][python] Drop support of register_table_sink with parameters field_names and field_types
WeiZhong94 commented on a change in pull request #8817: [FLINK-12920][python] Drop support of register_table_sink with parameters field_names and field_types URL: https://github.com/apache/flink/pull/8817#discussion_r296108714 ## File path: flink-python/pyflink/table/sinks.py ## @@ -45,7 +47,8 @@ class CsvTableSink(TableSink): :param write_mode: The write mode to specify whether existing files are overwritten or not. Review comment: Add documentation for the new parameters here?
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8817: [FLINK-12920][python] Drop support of register_table_sink with parameters field_names and field_types
WeiZhong94 commented on a change in pull request #8817: [FLINK-12920][python] Drop support of register_table_sink with parameters field_names and field_types URL: https://github.com/apache/flink/pull/8817#discussion_r296108555 ## File path: flink-python/pyflink/table/sinks.py ## @@ -45,7 +47,8 @@ class CsvTableSink(TableSink): :param write_mode: The write mode to specify whether existing files are overwritten or not. """ -def __init__(self, path, field_delimiter=',', num_files=1, write_mode=None): +def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=1, + write_mode=None): # type: (str, str, int, int) -> None Review comment: Revise this type hint to be consistent with the parameter list?
[GitHub] [flink] sunjincheng121 commented on issue #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API
sunjincheng121 commented on issue #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API URL: https://github.com/apache/flink/pull/8681#issuecomment-504303956 Thanks for the update @WeiZhong94! LGTM. +1 to merge.
[GitHub] [flink] godfreyhe commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource
godfreyhe commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource URL: https://github.com/apache/flink/pull/8782#discussion_r296102664 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -168,6 +170,12 @@ private RexNode visitScalarFunc(UnresolvedCallExpression unresolvedCall) { return relBuilder.call(FlinkSqlOperatorTable.MULTIPLY, child); } else if (BuiltInFunctionDefinitions.MOD.equals(def)) { return relBuilder.call(FlinkSqlOperatorTable.MOD, child); + } else if (def instanceof ScalarFunctionDefinition) { + // TODO add getName for ScalarFunctionDefinition ??? Review comment: We can use `functionIdentifier` for now and fix it in another issue.
[GitHub] [flink] flinkbot commented on issue #8820: [FLINK-12916][tests] Retry cancelWithSavepoint on cancellation barrier in AbstractOperatorRestoreTestBase
flinkbot commented on issue #8820: [FLINK-12916][tests] Retry cancelWithSavepoint on cancellation barrier in AbstractOperatorRestoreTestBase URL: https://github.com/apache/flink/pull/8820#issuecomment-504296833 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12916) KeyedComplexChainTest.testMigrationAndRestore failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-12916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-12916:
---
Labels: pull-request-available test-stability (was: test-stability)
> KeyedComplexChainTest.testMigrationAndRestore failed on Travis
> --
>
> Key: FLINK-12916
> URL: https://issues.apache.org/jira/browse/FLINK-12916
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Tests
> Affects Versions: 1.9.0
> Reporter: Till Rohrmann
> Assignee: Yun Tang
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.9.0
>
>
> The test case {{KeyedComplexChainTest.testMigrationAndRestore}} failed on
> Travis because a Task received the cancellation from one of its inputs
> {code}
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyAbortOnCancellationBarrier(BarrierBuffer.java:428)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.processCancellationBarrier(BarrierBuffer.java:327)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.pollNext(BarrierBuffer.java:208)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:128)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.performDefaultAction(OneInputStreamTask.java:101)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:268)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:376)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:676)
>     ... 1 more
> {code}
> https://api.travis-ci.org/v3/job/548181384/log.txt
[GitHub] [flink] Myasuka opened a new pull request #8820: [FLINK-12916][tests] Retry cancelWithSavepoint on cancellation barrier in AbstractOperatorRestoreTestBase
Myasuka opened a new pull request #8820: [FLINK-12916][tests] Retry cancelWithSavepoint on cancellation barrier in AbstractOperatorRestoreTestBase URL: https://github.com/apache/flink/pull/8820 ## What is the purpose of the change Add retry logic for checkpoint failures caused by a cancellation barrier. ## Brief change log - Add retry logic for checkpoint failures caused by a cancellation barrier in `AbstractOperatorRestoreTestBase`. ## Verifying this change This change is already covered by existing tests, such as `AbstractOperatorRestoreTestBase`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable**
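The PR description does not show the retry code itself, so the following is only a hypothetical sketch of the general idea: retry an action such as "cancel with savepoint" while it fails because of a cancellation barrier. `RetryOnCancellationSketch`, its methods, and the attempt/backoff parameters are invented for illustration. Detecting the failure by matching the message "Task received cancellation from one of its inputs" (taken from the FLINK-12916 stack trace above) is an assumption; the actual test may inspect a checkpoint failure reason instead.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical test helper (not the PR's code): retry an action such as
// "cancel with savepoint" while it fails for a reason deemed transient.
final class RetryOnCancellationSketch {

    static <T> T retry(Callable<T> action, Predicate<Throwable> isRetryable,
                       int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                // Give up on non-retryable failures or once the attempt budget is spent.
                if (attempt >= maxAttempts || !isRetryable.test(e)) {
                    throw e;
                }
                Thread.sleep(backoffMillis);
            }
        }
    }

    // Walks the cause chain looking for a message fragment. Matching on message text
    // is an assumption of this sketch; a real test might inspect a failure reason instead.
    static boolean hasCauseWithMessage(Throwable t, String fragment) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            String message = cur.getMessage();
            if (message != null && message.contains(fragment)) {
                return true;
            }
        }
        return false;
    }
}
```

A bounded attempt count keeps a genuinely broken job from hanging the test until the global timeout.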
[GitHub] [flink] JingsongLi commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource
JingsongLi commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource URL: https://github.com/apache/flink/pull/8782#discussion_r296096283 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -168,6 +170,12 @@ private RexNode visitScalarFunc(UnresolvedCallExpression unresolvedCall) { return relBuilder.call(FlinkSqlOperatorTable.MULTIPLY, child); } else if (BuiltInFunctionDefinitions.MOD.equals(def)) { return relBuilder.call(FlinkSqlOperatorTable.MOD, child); + } else if (def instanceof ScalarFunctionDefinition) { + // TODO add getName for ScalarFunctionDefinition ??? Review comment: For now we just use `functionIdentifier` as the name. The detailed reason is in https://github.com/apache/flink/pull/3330
[GitHub] [flink] zhijiangW commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
zhijiangW commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager URL: https://github.com/apache/flink/pull/8646#issuecomment-504288046 @azagrebin thanks for the reviews! I have updated the code to address the comments and rebased.
[GitHub] [flink] zhijiangW commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
zhijiangW commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager URL: https://github.com/apache/flink/pull/8646#discussion_r296094858 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.disk; + +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID; +import org.apache.flink.util.FileUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Random; +import java.util.UUID; + +/** + * The manager used for creating/deleting file channels based on config temp dirs. + */ +public class FileChannelManager implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(FileChannelManager.class); + + /** The temporary directories for files. 
*/ + private final File[] paths; Review comment: To avoid changing the interface method, which is already used in many places, I took the defensive-copy approach here.
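The defensive-copy approach mentioned in the review reply above can be illustrated with a small sketch. This is not the PR's `FileChannelManager`; `TempDirectoriesSketch` and `getPaths()` are hypothetical names. It keeps an array-returning accessor (so existing callers need not change) while copying on the way in and on the way out.

```java
import java.io.File;
import java.util.Arrays;

// Hypothetical sketch of the defensive-copy approach (not the PR's FileChannelManager).
final class TempDirectoriesSketch {

    /** The temporary directories for files; never exposed directly. */
    private final File[] paths;

    TempDirectoriesSketch(String[] tempDirs) {
        // Build our own array so later writes to the caller's array cannot reach this object.
        this.paths = new File[tempDirs.length];
        for (int i = 0; i < tempDirs.length; i++) {
            paths[i] = new File(tempDirs[i]);
        }
    }

    // Keeps an array-returning signature, but hands out a copy.
    // A shallow copy suffices because java.io.File is immutable.
    File[] getPaths() {
        return Arrays.copyOf(paths, paths.length);
    }
}
```

The trade-off is an allocation per call; returning an unmodifiable `List<File>` would avoid it but would change the method signature, which is what the reply wanted to avoid.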
[GitHub] [flink] JingsongLi commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource
JingsongLi commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource URL: https://github.com/apache/flink/pull/8782#discussion_r296078676 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala ## @@ -137,3 +191,158 @@ class RefFieldAccessorVisitor(usedFields: Array[Int]) extends RexVisitorImpl[Uni override def visitCall(call: RexCall): Unit = call.operands.foreach(operand => operand.accept(this)) } + +/** + * An RexVisitor to convert RexNode to Expression. + * + * @param inputNames The input names of the relation node + * @param functionCatalog The function catalog + */ +class RexNodeToExpressionConverter( +inputNames: Array[String], +functionCatalog: FunctionCatalog) + extends RexVisitor[Option[Expression]] { + + override def visitInputRef(inputRef: RexInputRef): Option[Expression] = { +Preconditions.checkArgument(inputRef.getIndex < inputNames.length) +Some(new FieldReferenceExpression( + inputNames(inputRef.getIndex), + TypeConversions.fromLogicalToDataType(FlinkTypeFactory.toLogicalType(inputRef.getType)), + 0, + inputRef.getIndex +)) + } + + override def visitTableInputRef(rexTableInputRef: RexTableInputRef): Option[Expression] = +visitInputRef(rexTableInputRef) + + override def visitLocalRef(localRef: RexLocalRef): Option[Expression] = { +throw new TableException("Bug: RexLocalRef should have been expanded") + } + + override def visitLiteral(literal: RexLiteral): Option[Expression] = { +// TODO support SqlTrimFunction.Flag +literal.getValue match { + case _: SqlTrimFunction.Flag => return None + case _ => // do nothing +} + +val literalType = FlinkTypeFactory.toLogicalType(literal.getType) + +val literalValue = literalType.getTypeRoot match { + + case DATE => +val v = literal.getValueAs(classOf[DateString]) +new Date(DateTimeUtils.dateStringToUnixDate(v.toString) * DateTimeUtils.MILLIS_PER_DAY) + + case 
TIME_WITHOUT_TIME_ZONE => +val v = literal.getValueAs(classOf[TimeString]) +new Time(DateTimeUtils.timeStringToUnixDate(v.toString(0)).longValue()) + + case TIMESTAMP_WITHOUT_TIME_ZONE => +val v = literal.getValueAs(classOf[TimestampString]) +new Timestamp(DateTimeUtils.timestampStringToUnixDate(v.toString(3))) + + case TINYINT => +// convert from BigDecimal to Byte +literal.getValueAs(classOf[java.lang.Byte]) + + case SMALLINT => +// convert from BigDecimal to Short +literal.getValueAs(classOf[java.lang.Short]) + + case INTEGER => +// convert from BigDecimal to Integer +literal.getValueAs(classOf[java.lang.Integer]) + + case BIGINT => +// convert from BigDecimal to Long +literal.getValueAs(classOf[java.lang.Long]) + + case FLOAT => +// convert from BigDecimal to Float +literal.getValueAs(classOf[java.lang.Float]) + + case DOUBLE => +// convert from BigDecimal to Double +literal.getValueAs(classOf[java.lang.Double]) + + case VARCHAR => +// convert from NlsString to String +literal.getValueAs(classOf[java.lang.String]) + + case BOOLEAN => +// convert to Boolean +literal.getValueAs(classOf[java.lang.Boolean]) + + case DECIMAL => +// convert to BigDecimal +literal.getValueAs(classOf[java.math.BigDecimal]) + + case _ => +literal.getValue +} + +Some(valueLiteral(literalValue, + LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(literalType))) + } + + override def visitCall(rexCall: RexCall): Option[Expression] = { +val operands = rexCall.getOperands.map( + operand => operand.accept(this).orNull +) + +// return null if we cannot translate all the operands of the call +if (operands.contains(null)) { + None +} else { + rexCall.getOperator match { +case SqlStdOperatorTable.OR => + Option(operands.reduceLeft { (l, r) => unresolvedCall(OR, l, r) }) +case SqlStdOperatorTable.AND => + Option(operands.reduceLeft { (l, r) => unresolvedCall(AND, l, r) }) +case SqlStdOperatorTable.CAST => + Option(unresolvedCall(CAST, operands.head, 
+typeLiteral(TypeConversions.fromLogicalToDataType( Review comment: Until https://github.com/apache/flink/pull/8762 is merged, it is better to invoke `LogicalTypeDataTypeConverter.fromLogicalTypeToDataType` here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apach
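The quoted `visitCall` translates each operand, turns untranslatable operands into `null` via `orNull`, returns `None` if any operand is `null`, and otherwise combines AND/OR operands with `reduceLeft`. A minimal Java sketch of that "all operands or nothing" left fold, using generic stand-ins instead of `RexNode`/`Expression` (`OperandFoldSketch` and `translateAndFold` are hypothetical names):

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.function.Function;

class OperandFoldSketch {
    /**
     * Translates every operand and left-folds the results with {@code combine}.
     * Returns empty if the list is empty or if any single operand cannot be translated.
     */
    static <A, B> Optional<B> translateAndFold(
            List<A> operands, Function<A, Optional<B>> translate, BinaryOperator<B> combine) {
        B acc = null;
        for (A operand : operands) {
            Optional<B> translated = translate.apply(operand);
            if (!translated.isPresent()) {
                return Optional.empty(); // one untranslatable operand: give up on the whole call
            }
            // The first operand seeds the accumulator; later ones are folded in from the left.
            acc = (acc == null) ? translated.get() : combine.apply(acc, translated.get());
        }
        return Optional.ofNullable(acc);
    }
}
```

Keeping the failure inside `Optional` avoids the `orNull`/`contains(null)` round trip while producing the same left-nested shape as `reduceLeft`, for example `AND(AND(a, b), c)` for three operands.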
[GitHub] [flink] JingsongLi commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource
JingsongLi commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource URL: https://github.com/apache/flink/pull/8782#discussion_r296078539 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala ## @@ -137,3 +191,158 @@ class RefFieldAccessorVisitor(usedFields: Array[Int]) extends RexVisitorImpl[Uni override def visitCall(call: RexCall): Unit = call.operands.foreach(operand => operand.accept(this)) } + +/** + * An RexVisitor to convert RexNode to Expression. + * + * @param inputNames The input names of the relation node + * @param functionCatalog The function catalog + */ +class RexNodeToExpressionConverter( +inputNames: Array[String], +functionCatalog: FunctionCatalog) + extends RexVisitor[Option[Expression]] { + + override def visitInputRef(inputRef: RexInputRef): Option[Expression] = { +Preconditions.checkArgument(inputRef.getIndex < inputNames.length) +Some(new FieldReferenceExpression( + inputNames(inputRef.getIndex), + TypeConversions.fromLogicalToDataType(FlinkTypeFactory.toLogicalType(inputRef.getType)), + 0, + inputRef.getIndex +)) + } + + override def visitTableInputRef(rexTableInputRef: RexTableInputRef): Option[Expression] = +visitInputRef(rexTableInputRef) + + override def visitLocalRef(localRef: RexLocalRef): Option[Expression] = { +throw new TableException("Bug: RexLocalRef should have been expanded") + } + + override def visitLiteral(literal: RexLiteral): Option[Expression] = { +// TODO support SqlTrimFunction.Flag +literal.getValue match { + case _: SqlTrimFunction.Flag => return None + case _ => // do nothing +} + +val literalType = FlinkTypeFactory.toLogicalType(literal.getType) + +val literalValue = literalType.getTypeRoot match { + + case DATE => +val v = literal.getValueAs(classOf[DateString]) +new Date(DateTimeUtils.dateStringToUnixDate(v.toString) * DateTimeUtils.MILLIS_PER_DAY) + + case 
TIME_WITHOUT_TIME_ZONE => +val v = literal.getValueAs(classOf[TimeString]) +new Time(DateTimeUtils.timeStringToUnixDate(v.toString(0)).longValue()) + + case TIMESTAMP_WITHOUT_TIME_ZONE => +val v = literal.getValueAs(classOf[TimestampString]) +new Timestamp(DateTimeUtils.timestampStringToUnixDate(v.toString(3))) + + case TINYINT => +// convert from BigDecimal to Byte +literal.getValueAs(classOf[java.lang.Byte]) + + case SMALLINT => +// convert from BigDecimal to Short +literal.getValueAs(classOf[java.lang.Short]) + + case INTEGER => +// convert from BigDecimal to Integer +literal.getValueAs(classOf[java.lang.Integer]) + + case BIGINT => +// convert from BigDecimal to Long +literal.getValueAs(classOf[java.lang.Long]) + + case FLOAT => +// convert from BigDecimal to Float +literal.getValueAs(classOf[java.lang.Float]) + + case DOUBLE => +// convert from BigDecimal to Double +literal.getValueAs(classOf[java.lang.Double]) + + case VARCHAR => Review comment: Add CHAR This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
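The reviewer asks for CHAR to be handled alongside VARCHAR in `visitLiteral`. In the quoted Scala this would presumably be an alternative pattern such as `case VARCHAR | CHAR =>`, the same form the TypeCoercion diff in this digest uses. A minimal Java sketch of the idea, with a hypothetical `LiteralRoot` enum standing in for Flink's `LogicalTypeRoot`, where CHAR falls through to the VARCHAR branch:

```java
// Hypothetical subset of type roots; Flink's real enum is LogicalTypeRoot.
enum LiteralRoot { CHAR, VARCHAR, INTEGER, BOOLEAN }

class LiteralTargetSketch {
    /** Java class that a literal of the given root is converted to (sketch only). */
    static Class<?> targetClass(LiteralRoot root) {
        switch (root) {
            case CHAR:    // fixed-length strings share the VARCHAR conversion
            case VARCHAR:
                return String.class;
            case INTEGER:
                return Integer.class;
            case BOOLEAN:
                return Boolean.class;
            default:
                throw new IllegalArgumentException("unsupported root: " + root);
        }
    }
}
```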
[GitHub] [flink] xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations.
xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations. URL: https://github.com/apache/flink/pull/8631#discussion_r296091739 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/matrix/Vector.java ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.matrix; + +import org.apache.flink.api.java.tuple.Tuple2; + +import org.apache.commons.lang3.StringUtils; + +import java.io.Serializable; + +/** + * Base class of vector. 
+ */ +public abstract class Vector implements Serializable { + private static final long serialVersionUID = 7690668399245109611L; + + public Vector() { + } + + public static DenseVector dense(String str) { + return DenseVector.deserialize(str); + } + + public static SparseVector sparse(String str) { + return SparseVector.deserialize(str); + } + + public static Tuple2 parseSparseTensor(String str) { + int numValues = 1; + for (int i = 0; i < str.length(); i++) { + if (str.charAt(i) == ',') { + numValues++; + } + } + int[] indices = new int[numValues]; + float[] values = new float[numValues]; + + int startPos = StringUtils.lastIndexOf(str, '$') + 1; + int endPos = -1; + int delimiterPos; + + for (int i = 0; i < numValues; i++) { + // extract the value string + endPos = StringUtils.indexOf(str, ',', startPos); + if (endPos == -1) { + endPos = str.length(); + } + delimiterPos = StringUtils.indexOf(str, ':', startPos); + if (delimiterPos == -1) { + throw new RuntimeException("invalid data: " + str); + } + indices[i] = Integer.valueOf(StringUtils.substring(str, startPos, delimiterPos)); + values[i] = Float.valueOf(StringUtils.substring(str, delimiterPos + 1, endPos)); + startPos = endPos + 1; + } + + return Tuple2.of(indices, values); + } + + public static boolean isSparse(String str) { + if (org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly(str)) { + return true; + } + return StringUtils.indexOf(str, ':') != -1 || StringUtils.indexOf(str, "$") != -1; + } + + public static Vector deserialize(String str) { + Vector vec; + if (isSparse(str)) { + vec = Vector.sparse(str); + } else { + vec = Vector.dense(str); + } + return vec; + } + + public static Vector plus(Vector vec1, Vector vec2) { + if (vec1 instanceof DenseVector && vec2 instanceof DenseVector) { + return ((DenseVector) vec1).plus((DenseVector) vec2); + } else if (vec1 instanceof SparseVector && vec2 instanceof SparseVector) { + return ((SparseVector) vec1).plus((SparseVector) vec2); + } else if (vec1 
instanceof SparseVector && vec2 instanceof DenseVector) { + return ((SparseVector) vec1).plus((DenseVector) vec2); + } else if (vec1 instanceof DenseVector && vec2 instanceof SparseVector) { + return ((SparseVector) vec2).plus((DenseVector) vec1); + } else { + throw new RuntimeException("Not implemented yet!"); + } + } + + public static Vector minus(Vector vec1, Vector vec2) { + if (vec1 instanceof DenseVector && vec2 instanceof DenseVector) { + return ((DenseVector) vec1).minus((DenseVector) vec2); + } else if (vec1 instanceo
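The quoted `parseSparseTensor` skips everything up to the last `$`, then reads comma-separated `index:value` pairs. A minimal standalone sketch of the same format, assuming that reading of the quoted loop; `SparseParseSketch` and its `Parsed` holder are hypothetical (the original returns a Flink `Tuple2`). Unlike the quoted loop, which searches for `:` from the entry start through the rest of the string, this version looks for `:` only inside the current entry, and it returns empty arrays for an empty entry list where the quoted loop would report invalid data.

```java
class SparseParseSketch {
    /** Parsed indices and values (stand-in for the Tuple2<int[], float[]> in the quoted code). */
    static final class Parsed {
        final int[] indices;
        final float[] values;

        Parsed(int[] indices, float[] values) {
            this.indices = indices;
            this.values = values;
        }
    }

    /** Roughly the quoted isSparse rule: blank, or containing ':' or '$', counts as sparse. */
    static boolean isSparse(String str) {
        if (str == null || str.trim().isEmpty()) {
            return true;
        }
        return str.indexOf(':') != -1 || str.indexOf('$') != -1;
    }

    /** Parses "idx:val,idx:val", ignoring everything up to and including the last '$'. */
    static Parsed parse(String str) {
        String body = str.substring(str.lastIndexOf('$') + 1).trim();
        if (body.isEmpty()) {
            return new Parsed(new int[0], new float[0]);
        }
        String[] entries = body.split(",");
        int[] indices = new int[entries.length];
        float[] values = new float[entries.length];
        for (int i = 0; i < entries.length; i++) {
            // Look for ':' only inside the current entry, not in the rest of the string.
            int colon = entries[i].indexOf(':');
            if (colon == -1) {
                throw new IllegalArgumentException("invalid data: " + str);
            }
            indices[i] = Integer.parseInt(entries[i].substring(0, colon).trim());
            values[i] = Float.parseFloat(entries[i].substring(colon + 1).trim());
        }
        return new Parsed(indices, values);
    }
}
```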
[GitHub] [flink] xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations.
xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations. URL: https://github.com/apache/flink/pull/8631#discussion_r296091478 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/matrix/DenseMatrix.java ## @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.matrix; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * Dense Matrix. + */ +public class DenseMatrix implements Serializable { + /* + Class variables + * */ + + /** +* Row and column dimensions. +*/ + int m, n; + + /** +* Array for internal storage of elements. +* +* The matrix data is stored in column major format internally. +*/ + double[] data; + +/* --- +* Constructors + * --- */ + + /** +* Construct an empty matrix. +*/ + public DenseMatrix() { + } + + /** +* Construct an m-by-n matrix of zeros. +* +* @param m Number of rows. +* @param n Number of colums. +*/ + public DenseMatrix(int m, int n) { + this.m = m; + this.n = n; + this.data = new double[m * n]; + } + + /** +* Construct a matrix from a 1-D array. 
The data in the array should organize +* in row major. +* +* @param mNumber of rows. +* @param nNumber of cols. +* @param data One-dimensional array of doubles. +*/ + public DenseMatrix(int m, int n, double[] data) { + this(m, n, data, true); + } + + /** +* Construct a matrix from a 1-D array. The data in the array is organized +* in column major or in row major, which is specified by parameter 'inRowMajor' +* +* @param m Number of rows. +* @param n Number of cols. +* @param data One-dimensional array of doubles. +* @param inRowMajor Whether the matrix in 'data' is in row major format. +*/ + public DenseMatrix(int m, int n, double[] data, boolean inRowMajor) { + assert (data.length == m * n); + this.m = m; + this.n = n; + if (inRowMajor) { + this.data = new double[m * n]; + for (int i = 0; i < m; i++) { + for (int j = 0; j < n; j++) { + this.set(i, j, data[i * n + j]); + } + } + } else { + this.data = data.clone(); + } + } + + /** +* Construct a matrix from a 2-D array. +* +* @param matA Two-dimensional array of doubles. +* @throws IllegalArgumentException All rows must have the same size +*/ + public DenseMatrix(double[][] matA) { + this.m = matA.length; + this.n = matA[0].length; + for (int i = 0; i < m; i++) { + if (matA[i].length != n) { + throw new IllegalArgumentException("All rows must have the same size."); + } + } + this.data = new double[m * n]; + for (int i = 0; i < m; i++) { + for (int j = 0; j < n; j++) { + this.set(i, j, matA[i][j]); + } + } + } + + /** +* Construct a matrix with provided data buffer. +* This is for internal use only, so it is package private. +* +* @param mNumber of rows. +* @param nNumber of cols. +* @param data One-dimensional array of doubles. +*/ + public static DenseMatrix fromDataBuffer(int m, int n, dou
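The quoted constructor stores elements column-major but accepts row-major input, copying element by element through `set(i, j, ...)`. Assuming `set` writes element (i, j) to `data[i + j * m]` (the body of `set` is not shown in the excerpt), the conversion amounts to the sketch below; `ColumnMajorSketch` and its methods are hypothetical names. It checks the length explicitly instead of using `assert`, because Java assertions are disabled unless the JVM runs with `-ea`.

```java
class ColumnMajorSketch {
    /** Column-major layout: element (i, j) of an m-by-n matrix lives at data[i + j * m]. */
    static int index(int i, int j, int m) {
        return i + j * m;
    }

    /** Converts a row-major buffer (element (i, j) at i * n + j) into column-major order. */
    static double[] rowMajorToColumnMajor(int m, int n, double[] rowMajor) {
        if (rowMajor.length != m * n) {
            throw new IllegalArgumentException(
                    "expected " + (m * n) + " elements, got " + rowMajor.length);
        }
        double[] colMajor = new double[m * n];
        for (int i = 0; i < m; i++) {
            for (int j = 0; j < n; j++) {
                colMajor[index(i, j, m)] = rowMajor[i * n + j];
            }
        }
        return colMajor;
    }
}
```

For the 2-by-3 matrix with rows `[1, 2, 3]` and `[4, 5, 6]`, the column-major buffer is `[1, 4, 2, 5, 3, 6]`.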
[GitHub] [flink] xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations.
xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations. URL: https://github.com/apache/flink/pull/8631#discussion_r296091344 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/matrix/DenseMatrix.java ## @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.matrix; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * Dense Matrix. + */ +public class DenseMatrix implements Serializable { + /* + Class variables + * */ + + /** +* Row and column dimensions. +*/ + int m, n; + + /** +* Array for internal storage of elements. +* +* The matrix data is stored in column major format internally. +*/ + double[] data; + +/* --- +* Constructors + * --- */ + + /** +* Construct an empty matrix. +*/ + public DenseMatrix() { + } + + /** +* Construct an m-by-n matrix of zeros. +* +* @param m Number of rows. +* @param n Number of colums. +*/ + public DenseMatrix(int m, int n) { + this.m = m; + this.n = n; + this.data = new double[m * n]; + } + + /** +* Construct a matrix from a 1-D array. 
The data in the array should organize +* in row major. +* +* @param mNumber of rows. +* @param nNumber of cols. +* @param data One-dimensional array of doubles. +*/ + public DenseMatrix(int m, int n, double[] data) { + this(m, n, data, true); + } + + /** +* Construct a matrix from a 1-D array. The data in the array is organized +* in column major or in row major, which is specified by parameter 'inRowMajor' +* +* @param m Number of rows. +* @param n Number of cols. +* @param data One-dimensional array of doubles. +* @param inRowMajor Whether the matrix in 'data' is in row major format. +*/ + public DenseMatrix(int m, int n, double[] data, boolean inRowMajor) { + assert (data.length == m * n); + this.m = m; + this.n = n; + if (inRowMajor) { + this.data = new double[m * n]; + for (int i = 0; i < m; i++) { + for (int j = 0; j < n; j++) { + this.set(i, j, data[i * n + j]); + } + } + } else { + this.data = data.clone(); + } + } + + /** +* Construct a matrix from a 2-D array. +* +* @param matA Two-dimensional array of doubles. +* @throws IllegalArgumentException All rows must have the same size +*/ + public DenseMatrix(double[][] matA) { + this.m = matA.length; + this.n = matA[0].length; + for (int i = 0; i < m; i++) { + if (matA[i].length != n) { + throw new IllegalArgumentException("All rows must have the same size."); + } + } + this.data = new double[m * n]; + for (int i = 0; i < m; i++) { + for (int j = 0; j < n; j++) { + this.set(i, j, matA[i][j]); + } + } + } + + /** +* Construct a matrix with provided data buffer. +* This is for internal use only, so it is package private. +* +* @param mNumber of rows. +* @param nNumber of cols. +* @param data One-dimensional array of doubles. +*/ + public static DenseMatrix fromDataBuffer(int m, int n, dou
[GitHub] [flink] xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations.
xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations. URL: https://github.com/apache/flink/pull/8631#discussion_r296091185 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/matrix/DenseMatrix.java ## @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.matrix; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * Dense Matrix. + */ +public class DenseMatrix implements Serializable { + /* + Class variables + * */ + + /** +* Row and column dimensions. +*/ + int m, n; + + /** +* Array for internal storage of elements. +* +* The matrix data is stored in column major format internally. +*/ + double[] data; + +/* --- +* Constructors + * --- */ + + /** +* Construct an empty matrix. +*/ + public DenseMatrix() { + } + + /** +* Construct an m-by-n matrix of zeros. +* +* @param m Number of rows. +* @param n Number of colums. +*/ + public DenseMatrix(int m, int n) { + this.m = m; + this.n = n; + this.data = new double[m * n]; + } + + /** +* Construct a matrix from a 1-D array. 
The data in the array should organize +* in row major. +* +* @param mNumber of rows. +* @param nNumber of cols. +* @param data One-dimensional array of doubles. +*/ + public DenseMatrix(int m, int n, double[] data) { + this(m, n, data, true); + } + + /** +* Construct a matrix from a 1-D array. The data in the array is organized +* in column major or in row major, which is specified by parameter 'inRowMajor' +* +* @param m Number of rows. +* @param n Number of cols. +* @param data One-dimensional array of doubles. +* @param inRowMajor Whether the matrix in 'data' is in row major format. +*/ + public DenseMatrix(int m, int n, double[] data, boolean inRowMajor) { + assert (data.length == m * n); + this.m = m; + this.n = n; + if (inRowMajor) { + this.data = new double[m * n]; + for (int i = 0; i < m; i++) { + for (int j = 0; j < n; j++) { + this.set(i, j, data[i * n + j]); + } + } + } else { + this.data = data.clone(); + } + } + + /** +* Construct a matrix from a 2-D array. +* +* @param matA Two-dimensional array of doubles. +* @throws IllegalArgumentException All rows must have the same size +*/ + public DenseMatrix(double[][] matA) { + this.m = matA.length; + this.n = matA[0].length; + for (int i = 0; i < m; i++) { + if (matA[i].length != n) { + throw new IllegalArgumentException("All rows must have the same size."); + } + } + this.data = new double[m * n]; + for (int i = 0; i < m; i++) { + for (int j = 0; j < n; j++) { + this.set(i, j, matA[i][j]); + } + } + } + + /** +* Construct a matrix with provided data buffer. +* This is for internal use only, so it is package private. +* +* @param mNumber of rows. +* @param nNumber of cols. +* @param data One-dimensional array of doubles. +*/ + public static DenseMatrix fromDataBuffer(int m, int n, dou
[GitHub] [flink] xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations.
xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations. URL: https://github.com/apache/flink/pull/8631#discussion_r296090612 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/matrix/DenseMatrix.java ## @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.matrix; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * Dense Matrix. + */ +public class DenseMatrix implements Serializable { + /* + Class variables + * */ + + /** +* Row and column dimensions. +*/ + int m, n; + + /** +* Array for internal storage of elements. +* +* The matrix data is stored in column major format internally. +*/ + double[] data; + +/* --- +* Constructors + * --- */ + + /** +* Construct an empty matrix. +*/ + public DenseMatrix() { + } + + /** +* Construct an m-by-n matrix of zeros. +* +* @param m Number of rows. +* @param n Number of colums. +*/ + public DenseMatrix(int m, int n) { + this.m = m; Review comment: OK, changed.
[GitHub] [flink] xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations.
xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations. URL: https://github.com/apache/flink/pull/8631#discussion_r296090504 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/matrix/DenseMatrix.java ## @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.matrix; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * Dense Matrix. + */ +public class DenseMatrix implements Serializable { + /* + Class variables + * */ + + /** +* Row and column dimensions. +*/ + int m, n; + + /** +* Array for internal storage of elements. +* +* The matrix data is stored in column major format internally. +*/ + double[] data; + +/* --- +* Constructors + * --- */ + + /** +* Construct an empty matrix. +*/ + public DenseMatrix() { Review comment: Yes, users don't need this constructor, so we have changed it to package scope.
[GitHub] [flink] synckey commented on issue #8799: [FLINK-11612][chinese-translation,Documentation] Translate the "Proje…
synckey commented on issue #8799: [FLINK-11612][chinese-translation,Documentation] Translate the "Proje… URL: https://github.com/apache/flink/pull/8799#issuecomment-504282422 I have no idea. It looks like a Travis CI problem.
[GitHub] [flink] xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations.
xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations. URL: https://github.com/apache/flink/pull/8631#discussion_r296089776 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/matrix/BLAS.java ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.matrix; + +/** + * BLAS. Review comment: Thanks, added more description.
[GitHub] [flink] xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations.
xuyang1706 commented on a change in pull request #8631: [FLINK-12745][ml] add sparse and dense vector class, and dense matrix class with basic operations. URL: https://github.com/apache/flink/pull/8631#discussion_r296089885 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/matrix/DenseMatrix.java ## @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.matrix; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * Dense Matrix. + */ +public class DenseMatrix implements Serializable { + /* + Class variables + * */ + + /** +* Row and column dimensions. +*/ + int m, n; Review comment: Now declared on two separate lines.
[GitHub] [flink] wuchong commented on a change in pull request #8730: [FLINK-12834][table-planner-blink] Support CharType and BinaryType
wuchong commented on a change in pull request #8730: [FLINK-12834][table-planner-blink] Support CharType and BinaryType URL: https://github.com/apache/flink/pull/8730#discussion_r296087336 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala ## @@ -63,13 +64,14 @@ object TypeCoercion { */ def canSafelyCast( from: LogicalType, to: LogicalType): Boolean = (from.getTypeRoot, to.getTypeRoot) match { -case (_, VARCHAR) => true +case (_, VARCHAR | CHAR) => true case (_, DECIMAL) if isNumeric(from) => true -case (_, _) if numericWideningPrecedence.contains(from) && -numericWideningPrecedence.contains(to) => - if (numericWideningPrecedence.indexOf(from) < numericWideningPrecedence.indexOf(to)) { +case (_, _) if numericWideningPrecedence.contains(from.copy(true)) && +numericWideningPrecedence.contains(to.copy(true)) => + if (numericWideningPrecedence.indexOf(from.copy(true)) < + numericWideningPrecedence.indexOf(to.copy(true))) { Review comment: What about adding both the NULLABLE and NOT NULL types to `numericWideningPrecedence`, so that we can avoid the verbose `xx.copy(true)` calls in the code?
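The diff calls `copy(true)` four times so that NULL and NOT NULL variants of the same numeric type are both found in `numericWideningPrecedence`. A related option to listing both variants is to key the precedence on the type root alone, so nullability never enters the lookup. A minimal Java sketch, with hypothetical `SketchType` and `WideningSketch` classes in place of Flink's `LogicalType`:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical simplified type: a root plus a nullability flag (stand-in for LogicalType). */
final class SketchType {
    enum Root { TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, VARCHAR }

    final Root root;
    final boolean nullable;

    SketchType(Root root, boolean nullable) {
        this.root = root;
        this.nullable = nullable;
    }
}

class WideningSketch {
    // Widening order keyed by root only, so NULL and NOT NULL variants share one position.
    private static final List<SketchType.Root> NUMERIC_WIDENING = Arrays.asList(
            SketchType.Root.TINYINT, SketchType.Root.SMALLINT, SketchType.Root.INTEGER,
            SketchType.Root.BIGINT, SketchType.Root.FLOAT, SketchType.Root.DOUBLE);

    static boolean isSafeNumericWidening(SketchType from, SketchType to) {
        int fromPos = NUMERIC_WIDENING.indexOf(from.root);
        int toPos = NUMERIC_WIDENING.indexOf(to.root);
        // Both must be numeric, and the target must come strictly later in the order.
        return fromPos >= 0 && toPos >= 0 && fromPos < toPos;
    }
}
```

Keying on the root keeps both variants of one type at the same position; if the two variants were stored as separate list entries, their indices would differ and the `<` comparison would need extra care.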
[GitHub] [flink] wuchong commented on a change in pull request #8730: [FLINK-12834][table-planner-blink] Support CharType and BinaryType
wuchong commented on a change in pull request #8730: [FLINK-12834][table-planner-blink] Support CharType and BinaryType URL: https://github.com/apache/flink/pull/8730#discussion_r296087906 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala ## @@ -52,9 +52,14 @@ object TypeCheckUtils { case _ => false } - def isVarchar(dataType: LogicalType): Boolean = dataType.getTypeRoot == VARCHAR + def isCharacterString(dataType: LogicalType): Boolean = + dataType.getTypeRoot.getFamilies.contains(LogicalTypeFamily.CHARACTER_STRING) Review comment: What about moving these methods to a common module, such as the `LogicalTypeChecks` class, so that both blink-planner and blink-runtime can use them? It is error-prone if we don't maintain these checks in one place.
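A sketch of the centralization idea: keep the family-based checks in one shared helper so every module asks the same question the same way. The enums `TypeFamilySketch` and `TypeRootSketch` and the helper `TypeChecksSketch` are hypothetical simplifications; they only mimic the `getTypeRoot.getFamilies.contains(...)` pattern from the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical type families. */
enum TypeFamilySketch { CHARACTER_STRING, BINARY_STRING, NUMERIC }

/** Hypothetical type roots, each tagged with the families it belongs to. */
enum TypeRootSketch {
    CHAR(TypeFamilySketch.CHARACTER_STRING),
    VARCHAR(TypeFamilySketch.CHARACTER_STRING),
    BINARY(TypeFamilySketch.BINARY_STRING),
    VARBINARY(TypeFamilySketch.BINARY_STRING),
    INTEGER(TypeFamilySketch.NUMERIC);

    private final Set<TypeFamilySketch> families;

    TypeRootSketch(TypeFamilySketch... families) {
        this.families = Collections.unmodifiableSet(EnumSet.copyOf(Arrays.asList(families)));
    }

    Set<TypeFamilySketch> getFamilies() {
        return families;
    }
}

/** The single shared place for type checks, so planner and runtime cannot drift apart. */
final class TypeChecksSketch {
    private TypeChecksSketch() {}

    static boolean isCharacterString(TypeRootSketch root) {
        return root.getFamilies().contains(TypeFamilySketch.CHARACTER_STRING);
    }

    static boolean isBinaryString(TypeRootSketch root) {
        return root.getFamilies().contains(TypeFamilySketch.BINARY_STRING);
    }
}
```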
[GitHub] [flink] liyafan82 commented on a change in pull request #8511: [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive
liyafan82 commented on a change in pull request #8511: [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive URL: https://github.com/apache/flink/pull/8511#discussion_r296088014 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java ## @@ -235,31 +235,32 @@ public void lockNode(final NodeId node) { * @throws Exception Thrown if the system cannot access the state. */ public void releaseNode(final NodeId node) throws Exception { - Lockable<SharedBufferNode> sharedBufferNode = sharedBuffer.getEntry(node); - if (sharedBufferNode != null) { - if (sharedBufferNode.release()) { - removeNode(node, sharedBufferNode.getElement()); - } else { - sharedBuffer.upsertEntry(node, sharedBufferNode); + // the stack used to detect all nodes that need to be released. + Stack<NodeId> nodesToExamine = new Stack<>(); + nodesToExamine.push(node); + + while (!nodesToExamine.isEmpty()) { + NodeId curNode = nodesToExamine.pop(); + Lockable<SharedBufferNode> curBufferNode = sharedBuffer.getEntry(curNode); + + if (curBufferNode == null) { + break; Review comment: @dawidwys Good question, thanks. There can be duplicate attempts to remove a node, so if we do not check whether curBufferNode is null, an NPE will be thrown. As an example, please take a look at CEPITCase#testFlatSelectSerializationWithAnonymousClass: there are two attempts to release the node from the NFA#doProcess method. When a node is released (and set to null), its descendant nodes have already been released too, so there is no need to continue examining the nodes in nodesToExamine. Please note that I did not design this logic; the original code worked the same way. I just refactored the code so that its logic is identical to the original code.
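The iterative release pattern discussed here can be sketched independently of Flink's shared buffer. In this hypothetical model (`RefNode`, `NodeReleaser`, and integer node ids are all assumptions), each node carries a reference count and the ids of the nodes it keeps alive; once a count drops to zero, the node is removed and those ids are pushed onto an explicit stack instead of recursing. It uses `ArrayDeque`, which the `Deque` Javadoc recommends over the legacy `java.util.Stack`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical reference-counted node; predecessors are the node ids it keeps alive. */
final class RefNode {
    int refCount;
    final List<Integer> predecessors = new ArrayList<>();

    RefNode(int refCount, Integer... preds) {
        this.refCount = refCount;
        for (Integer p : preds) {
            predecessors.add(p);
        }
    }
}

final class NodeReleaser {
    final Map<Integer, RefNode> entries = new HashMap<>();

    /** Releases a node without recursion, using an explicit stack of node ids. */
    void release(int start) {
        Deque<Integer> toExamine = new ArrayDeque<>();
        toExamine.push(start);
        while (!toExamine.isEmpty()) {
            int id = toExamine.pop();
            RefNode node = entries.get(id);
            if (node == null) {
                // Already removed by an earlier release attempt: skip this id only.
                continue;
            }
            node.refCount--;
            if (node.refCount <= 0) {
                // Last reference is gone: drop the node and release what it keeps alive.
                entries.remove(id);
                for (int pred : node.predecessors) {
                    toExamine.push(pred);
                }
            }
        }
    }
}
```

The sketch uses `continue` when an entry is missing, which skips only that id. The PR uses `break`, relying on the argument above that the descendants of an already released node have been released too; `continue` is the more conservative choice if that invariant is ever in doubt.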
[GitHub] [flink] godfreyhe commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource
godfreyhe commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource URL: https://github.com/apache/flink/pull/8782#discussion_r296084274 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -168,6 +170,12 @@ private RexNode visitScalarFunc(UnresolvedCallExpression unresolvedCall) { return relBuilder.call(FlinkSqlOperatorTable.MULTIPLY, child); } else if (BuiltInFunctionDefinitions.MOD.equals(def)) { return relBuilder.call(FlinkSqlOperatorTable.MOD, child); + } else if (def instanceof ScalarFunctionDefinition) { + // TODO add getName for ScalarFunctionDefinition ??? Review comment: Thanks for the correction. The first parameter is `function name (used by SQL parser)`, so I think `scalaFunc.functionIdentifier()` is not correct. `List<RexNode> child = convertCallChildren(call);` is defined at line 122.
[GitHub] [flink] godfreyhe commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource
godfreyhe commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource URL: https://github.com/apache/flink/pull/8782#discussion_r296084274 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -168,6 +170,12 @@ private RexNode visitScalarFunc(UnresolvedCallExpression unresolvedCall) { return relBuilder.call(FlinkSqlOperatorTable.MULTIPLY, child); } else if (BuiltInFunctionDefinitions.MOD.equals(def)) { return relBuilder.call(FlinkSqlOperatorTable.MOD, child); + } else if (def instanceof ScalarFunctionDefinition) { + // TODO add getName for ScalarFunctionDefinition ??? Review comment: Thanks for the correction. `List<RexNode> child = convertCallChildren(call);` is defined at line 122.
[jira] [Comment Edited] (FLINK-12886) Support container memory segment
[ https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869126#comment-16869126 ] Liya Fan edited comment on FLINK-12886 at 6/21/19 3:27 AM: --- [~ykt836], [~lzljs3620320] Another advantage I can think of is in-place expanding. In some scenarios, we need to expand the memory capacity, for example: # When we write a big string to a BinaryRowWriter, and the memory segment of the writer is not sufficient to hold the new data. # When we rehash a hash table. The current steps for expanding a memory space are as follows: # Create a new memory space with a larger size. # Copy the data from the old memory space to the new memory space. The problems with this approach: # The memory copy incurs performance overhead. # The memory requirement is too big. Let me illustrate problem 2 with a concrete example: suppose we have 100 MB of memory in total, and we are currently using a 40 MB memory space. We want to expand the memory space from 40 MB to 80 MB. According to the above steps, the maximum memory requirement during the steps is 40 + 80 = 120 MB, which is infeasible because we only have 100 MB. So although the memory space (100 MB) is sufficient to fulfill our request (80 MB), the expansion must fail. This problem can be resolved by ContainerMemorySegment: we simply allocate another 40 MB of memory and append it to the end of the existing memory segments held by the ContainerMemorySegment. Note that the maximum memory requirement during the process is only 80 MB, so our 100 MB memory space will serve the request well. In addition, there is no need for the memory copy, so the performance overhead no longer exists. What do you think? was (Author: fan_li_ya): [~ykt836], [~lzljs3620320] Another advantage I can think of is in-place expanding.
In some scenarios, we need to expand the memory capacity, for example: # When we write a big string to a BinaryRowWriter, and the memory segment of the writer is not sufficient to hold the new data. # When we rehash a hash table. The current steps for expanding a memory space are as follows: # Create a new memory space with a larger size. # Copy the data from the old memory space to the new memory space. The problems with this approach: # The memory copy incurs performance overhead. # The memory requirement is too big. Let me illustrate problem 2 with a concrete example: suppose we have 100 MB of memory in total, and we are currently using a 40 MB memory space. We want to expand the memory space from 40 MB to 80 MB. According to the above steps, this will require 40 + 80 = 120 MB, which is infeasible because we only have 100 MB. So although the memory space (100 MB) is sufficient to fulfill our request (80 MB), the expansion must fail. This problem can be resolved by ContainerMemorySegment: we simply allocate another 40 MB of memory and append it to the end of the existing memory segments held by the ContainerMemorySegment. Our 100 MB memory space will serve the requests well. In addition, there is no need for the memory copy, so the performance overhead is removed. What do you think? > Support container memory segment > > > Key: FLINK-12886 > URL: https://issues.apache.org/jira/browse/FLINK-12886 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Major > Labels: pull-request-available > Attachments: image-2019-06-18-17-59-42-136.png > > Time Spent: 10m > Remaining Estimate: 0h > > We observe that in many scenarios, the operations/algorithms are based on an > array of MemorySegment. These memory segments form a large, combined, and > continuous memory space. > For example, suppose we have an array of n memory segments.
Memory addresses > from 0 to segment_size - 1 are served by the first memory segment; memory > addresses from segment_size to 2 * segment_size - 1 are served by the second > memory segment, and so on. > Specific algorithms decide the actual MemorySegment to serve the operation > requests. In some rare cases, two or more memory segments serve the > requests. There are many operations based on such a paradigm, for example, > {{BinaryString#matchAt}}, {{SegmentsUtil#copyToBytes}}, > {{LongHashPartition#MatchIterator#get}}, etc. > The problem is that, for memory segment array based operations, large amounts > of code are devoted to > 1. Computing the memory segment index & offset within the memory segment. > 2. Processing boundary cases. For example, when writing an integer, there may be > only 2 bytes left in the first memory segment, and the remaining 2 bytes must > be written to the next memory segment. > 3.
[jira] [Commented] (FLINK-12886) Support container memory segment
[ https://issues.apache.org/jira/browse/FLINK-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869126#comment-16869126 ] Liya Fan commented on FLINK-12886: -- [~ykt836], [~lzljs3620320] Another advantage I can think of is in-place expanding. In some scenarios, we need to expand the memory capacity, for example: # When we write a big string to a BinaryRowWriter, and the memory segment of the writer is not sufficient to hold the new data. # When we rehash a hash table. The current steps for expanding a memory space are as follows: # Create a new memory space with a larger size. # Copy the data from the old memory space to the new memory space. The problems with this approach: # The memory copy incurs performance overhead. # The memory requirement is too big. Let me illustrate problem 2 with a concrete example: suppose we have 100 MB of memory in total, and we are currently using a 40 MB memory space. We want to expand the memory space from 40 MB to 80 MB. According to the above steps, this will require 40 + 80 = 120 MB, which is infeasible because we only have 100 MB. So although the memory space (100 MB) is sufficient to fulfill our request (80 MB), the expansion must fail. This problem can be resolved by ContainerMemorySegment: we simply allocate another 40 MB of memory and append it to the end of the existing memory segments held by the ContainerMemorySegment. Our 100 MB memory space will serve the requests well. In addition, there is no need for the memory copy, so the performance overhead is removed. What do you think?
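The peak-memory argument in this comment can be checked with a small model. `SegmentContainer` below is a hypothetical stand-in (plain `byte[]` segments, not Flink's `MemorySegment` or the proposed `ContainerMemorySegment`): growing by appending segments leaves the existing arrays untouched, while allocate-then-copy briefly needs the old and the new space at the same time.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical growable buffer built from equally sized segments. */
final class SegmentContainer {
    final int segmentSize;
    final List<byte[]> segments = new ArrayList<>();

    SegmentContainer(int segmentSize, int initialSegments) {
        this.segmentSize = segmentSize;
        for (int i = 0; i < initialSegments; i++) {
            segments.add(new byte[segmentSize]);
        }
    }

    long capacity() {
        return (long) segmentSize * segments.size();
    }

    /** Grows in place: existing segments are kept as-is, so no bytes are copied. */
    void expandByAppending(int extraSegments) {
        for (int i = 0; i < extraSegments; i++) {
            segments.add(new byte[segmentSize]);
        }
    }

    /** Peak footprint of allocate-new-then-copy: old and new space coexist. */
    static long peakWithCopy(long oldSize, long newSize) {
        return oldSize + newSize;
    }

    /** Peak footprint of appending segments: only the final size (oldSize is already part of it). */
    static long peakWithAppend(long oldSize, long newSize) {
        return newSize;
    }
}
```

With the numbers from the comment (40 MB grown to 80 MB under a 100 MB budget), the copy strategy peaks at 120 MB while the append strategy peaks at 80 MB.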
> Support container memory segment > > > Key: FLINK-12886 > URL: https://issues.apache.org/jira/browse/FLINK-12886 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Major > Labels: pull-request-available > Attachments: image-2019-06-18-17-59-42-136.png > > Time Spent: 10m > Remaining Estimate: 0h > > We observe that in many scenarios, the operations/algorithms are based on an > array of MemorySegment. These memory segments form a large, combined, and > continuous memory space. > For example, suppose we have an array of n memory segments. Memory addresses > from 0 to segment_size - 1 are served by the first memory segment; memory > addresses from segment_size to 2 * segment_size - 1 are served by the second > memory segment, and so on. > Specific algorithms decide the actual MemorySegment to serve the operation > requests. In some rare cases, two or more memory segments serve the > requests. There are many operations based on such a paradigm, for example, > {{BinaryString#matchAt}}, {{SegmentsUtil#copyToBytes}}, > {{LongHashPartition#MatchIterator#get}}, etc. > The problem is that, for memory segment array based operations, large amounts > of code are devoted to > 1. Computing the memory segment index & offset within the memory segment. > 2. Processing boundary cases. For example, when writing an integer, there may be > only 2 bytes left in the first memory segment, and the remaining 2 bytes must > be written to the next memory segment. > 3. Differentiating processing for short/long data. For example, when copying > memory data to a byte array, different methods are implemented for the cases where > 1) the data fits in a single segment; 2) the data spans multiple segments. > Therefore, there is much duplicated code to achieve the above purposes. What is > worse, this paradigm significantly increases the amount of code, making the > code more difficult to read and maintain.
Furthermore, it easily gives rise > to bugs which are difficult to find and debug. > To address these problems, we propose a new type of memory segment: > {{ContainerMemorySegment}}. It is based on an array of underlying memory > segments with the same size. It extends the {{MemorySegment}} base > class, so it provides all the functionality of {{MemorySegment}}. > In addition, it hides all the details of dealing with specific memory > segments, and acts as if it were one big continuous memory region. > A prototype implementation is given below: > !image-2019-06-18-17-59-42-136.png|thumbnail! > With this new type of memory segment, many operations/algorithms can be > greatly simplified, without affecting performance. This is because: > 1. Many checks and boundary processing are already there; we just move them to > the new class. > 2. We optimize the implementation of the new class, so the special > optimizations (e.g. optimizations for short data) are still preserved. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
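The address arithmetic and the boundary case described in the issue (an integer with only 2 bytes left in the first segment) can be sketched as follows. `SegmentedView` is a hypothetical class over plain `byte[]` arrays, and big-endian byte order is an assumption. It deliberately writes byte by byte, whereas a real implementation would keep a fast path for values that fit entirely in one segment, as point 2 above notes.

```java
/** Hypothetical view over equally sized byte[] segments as one continuous address space. */
final class SegmentedView {
    private final byte[][] segments;
    private final int segmentSize;

    SegmentedView(int segmentCount, int segmentSize) {
        this.segments = new byte[segmentCount][segmentSize];
        this.segmentSize = segmentSize;
    }

    /** Addresses [k * segmentSize, (k + 1) * segmentSize - 1] live in segment k. */
    int segmentIndex(long address) {
        return (int) (address / segmentSize);
    }

    /** Position of the address inside its segment. */
    int offsetInSegment(long address) {
        return (int) (address % segmentSize);
    }

    void putByte(long address, byte b) {
        segments[segmentIndex(address)][offsetInSegment(address)] = b;
    }

    byte getByte(long address) {
        return segments[segmentIndex(address)][offsetInSegment(address)];
    }

    /** Writes a big-endian int; its 4 bytes may straddle a segment boundary. */
    void putInt(long address, int value) {
        for (int i = 0; i < 4; i++) {
            putByte(address + i, (byte) (value >>> (24 - 8 * i)));
        }
    }

    /** Reads a big-endian int that may straddle a segment boundary. */
    int getInt(long address) {
        int result = 0;
        for (int i = 0; i < 4; i++) {
            result = (result << 8) | (getByte(address + i) & 0xFF);
        }
        return result;
    }
}
```

For example, with two 6-byte segments, `putInt(4, ...)` places 2 bytes at the end of segment 0 and the remaining 2 bytes at the start of segment 1, and the caller never sees the split.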
[GitHub] [flink] WeiZhong94 commented on issue #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API
WeiZhong94 commented on issue #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API URL: https://github.com/apache/flink/pull/8681#issuecomment-504273756 @sunjincheng121 Thanks for your comments! I have reordered the packages in `index.rst` and `pyflink.rst`.
[GitHub] [flink] flinkbot commented on issue #8819: [FLINK-12911][table-api] Port AppendStreamTableSink, UpsertStreamTableSink, RetractStreamTableSink to flink-api-java-bridge
flinkbot commented on issue #8819: [FLINK-12911][table-api] Port AppendStreamTableSink, UpsertStreamTableSink, RetractStreamTableSink to flink-api-java-bridge URL: https://github.com/apache/flink/pull/8819#issuecomment-504273318 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands: The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] godfreyhe commented on issue #8695: [FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce PartitionableTableSource and PartitionableTableSink and OverwritableTableSink
godfreyhe commented on issue #8695: [FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce PartitionableTableSource and PartitionableTableSink and OverwritableTableSink URL: https://github.com/apache/flink/pull/8695#issuecomment-504273271 LGTM
[GitHub] [flink] wuchong opened a new pull request #8819: [FLINK-12911][table-api] Port AppendStreamTableSink, UpsertStreamTableSink, RetractStreamTableSink to flink-api-java-bridge
wuchong opened a new pull request #8819: [FLINK-12911][table-api] Port AppendStreamTableSink, UpsertStreamTableSink, RetractStreamTableSink to flink-api-java-bridge URL: https://github.com/apache/flink/pull/8819 ## What is the purpose of the change This is a simple issue to port these interfaces from Scala to Java and move them to the table-api-java-bridge module. ## Brief change log Port AppendStreamTableSink, UpsertStreamTableSink, RetractStreamTableSink from Scala to Java and move them to the `table-api-java-bridge` module. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
[jira] [Updated] (FLINK-12911) Port AppendStreamTableSink, UpsertStreamTableSink, RetractStreamTableSink to flink-api-java-bridge
[ https://issues.apache.org/jira/browse/FLINK-12911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-12911: Description: This is a simple issue to port these interfaces from Scala to Java and move them to the table-api-java-bridge module. (was: This is a simple issue to port these interfaces from Scala to Java and move them to the table-api-java module.) > Port AppendStreamTableSink, UpsertStreamTableSink, RetractStreamTableSink to > flink-api-java-bridge > -- > > Key: FLINK-12911 > URL: https://issues.apache.org/jira/browse/FLINK-12911 > Project: Flink > Issue Type: Task > Components: Table SQL / API >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > > This is a simple issue to port these interfaces from Scala to Java and move them to > the table-api-java-bridge module.
[jira] [Updated] (FLINK-12911) Port AppendStreamTableSink, UpsertStreamTableSink, RetractStreamTableSink to flink-api-java-bridge
[ https://issues.apache.org/jira/browse/FLINK-12911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12911: --- Labels: pull-request-available (was: ) > Port AppendStreamTableSink, UpsertStreamTableSink, RetractStreamTableSink to > flink-api-java-bridge > -- > > Key: FLINK-12911 > URL: https://issues.apache.org/jira/browse/FLINK-12911 > Project: Flink > Issue Type: Task > Components: Table SQL / API >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > Labels: pull-request-available > > This is a simple issue to port these interfaces from Scala to Java and move them to > the table-api-java-bridge module.
[jira] [Updated] (FLINK-12920) Drop support for register_table_sink with field_names and field_types parameters
[ https://issues.apache.org/jira/browse/FLINK-12920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12920: --- Labels: pull-request-available (was: ) > Drop support for register_table_sink with field_names and field_types > parameters > > > Key: FLINK-12920 > URL: https://issues.apache.org/jira/browse/FLINK-12920 > Project: Flink > Issue Type: Bug > Components: API / Python >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > The following registerTableSink API in TableEnvironment is deprecated: > {code:java} > @Deprecated > void registerTableSink(String name, String[] fieldNames, TypeInformation[] > fieldTypes, TableSink tableSink); > {code} > We can drop support for it in the Python Table API.
[jira] [Assigned] (FLINK-12916) KeyedComplexChainTest.testMigrationAndRestore failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-12916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-12916: Assignee: Yun Tang > KeyedComplexChainTest.testMigrationAndRestore failed on Travis > -- > > Key: FLINK-12916 > URL: https://issues.apache.org/jira/browse/FLINK-12916 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Tests >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Yun Tang >Priority: Critical > Labels: test-stability > Fix For: 1.9.0 > > > The test case {{KeyedComplexChainTest.testMigrationAndRestore}} failed on > Travis because a Task received the cancellation from one of its inputs > {code} > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task > received cancellation from one of its inputs > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyAbortOnCancellationBarrier(BarrierBuffer.java:428) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.processCancellationBarrier(BarrierBuffer.java:327) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.pollNext(BarrierBuffer.java:208) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:128) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.performDefaultAction(OneInputStreamTask.java:101) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:268) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:376) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:676) > ... 1 more > {code} > https://api.travis-ci.org/v3/job/548181384/log.txt
[GitHub] [flink] flinkbot commented on issue #8818: [FLINK-12794][table] Support ACC mode for non-window streaming FlatAggregate on Table API
flinkbot commented on issue #8818: [FLINK-12794][table] Support ACC mode for non-window streaming FlatAggregate on Table API URL: https://github.com/apache/flink/pull/8818#issuecomment-504271546 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands: The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] hequn8128 opened a new pull request #8818: [FLINK-12794][table] Support ACC mode for non-window streaming FlatAggregate on Table API
hequn8128 opened a new pull request #8818: [FLINK-12794][table] Support ACC mode for non-window streaming FlatAggregate on Table API URL: https://github.com/apache/flink/pull/8818 ## What is the purpose of the change [FLINK-12401](https://issues.apache.org/jira/browse/FLINK-12401) has addressed incremental emit under AccRetract mode. This pull request supports incremental emit under Acc mode. Details can be found in [Flip-29](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739) ## Brief change log - API part. Add a `withKeys` method to the Table API. Similar to `as`, which is used to describe the result table schema, the `withKeys` function is used to specify the key fields of the result table of the TableAggregateFunction. - Add an internal keyIndexes member in `TableAggregateFunction` to store key information. - Support key extraction for table aggregate in `UniqueKeyExtractor`. With the key information, we can upsert the result of a table aggregate into the downstream under some circumstances. - Adapt codegen logic to choose `emitUpdateWithoutRetract` when the table aggregate operator is in Acc mode. - Add documentation about the `emitUpdateWithoutRetract` method. Note that this PR does not provide detailed examples of the `emitUpdateWithoutRetract` and `emitUpdateWithRetract` methods, because we are going to add these examples in the follow-up FLINK-11147. ## Verifying this change This change added tests and can be verified as follows: - Add integration tests for upsert into sink. - Add Java string expression tests. `TableAggregateStringExpressionTest.testWithKeysAfterAlias`, `TableAggregateStringExpressionTest.testWithKeysWithoutAlias`. - Add validation tests. `TableAggregateValidationTest.testInvalidNamesInWithKeys` - Add harness tests.
`TableAggregateHarnessTest.testEmitUpdateWithoutRetract` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs and JavaDocs)
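To illustrate why declared key fields allow updates without retraction, here is a hypothetical Top-2 aggregate written in plain Java (it does not use Flink's `TableAggregateFunction` API). Treating the rank as the key, each changed rank is emitted once and the downstream sink simply overwrites the row with the same key, so no retraction messages are needed. The class name, the rank key, and the map-based sink are all assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical Top-2 aggregate that emits keyed upserts instead of retractions. */
final class Top2Upsert {
    private Integer first;
    private Integer second;

    /** Number of upsert messages sent downstream. */
    int emitted;

    /** Downstream upsert sink keyed by rank (1 or 2); put() overwrites the old row. */
    final Map<Integer, Integer> sink = new LinkedHashMap<>();

    void accumulate(int value) {
        Integer oldFirst = first;
        Integer oldSecond = second;
        if (first == null || value > first) {
            second = first;
            first = value;
        } else if (second == null || value > second) {
            second = value;
        }
        // Emit only the ranks whose value changed; the rank key lets the sink upsert in place.
        emitIfChanged(1, oldFirst, first);
        emitIfChanged(2, oldSecond, second);
    }

    private void emitIfChanged(int rank, Integer oldValue, Integer newValue) {
        if (newValue != null && !newValue.equals(oldValue)) {
            sink.put(rank, newValue);
            emitted++;
        }
    }
}
```

Feeding 3, 1, 5 produces four upserts (rank 1 = 3, rank 2 = 1, then rank 1 = 5 and rank 2 = 3), and the sink ends with exactly one row per rank. Without a key, the sink could not tell which earlier row a new value replaces, which is why the retract-based path is needed in that case.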
[GitHub] [flink] flinkbot commented on issue #8817: [FLINK-12920][python] Drop support of register_table_sink with parameters field_names and field_types
flinkbot commented on issue #8817: [FLINK-12920][python] Drop support of register_table_sink with parameters field_names and field_types URL: https://github.com/apache/flink/pull/8817#issuecomment-504271371 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands: The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12794) Support ACC mode for non-window streaming FlatAggregate on Table API
[ https://issues.apache.org/jira/browse/FLINK-12794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12794: --- Labels: pull-request-available (was: ) > Support ACC mode for non-window streaming FlatAggregate on Table API > > > Key: FLINK-12794 > URL: https://issues.apache.org/jira/browse/FLINK-12794 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > Since the execution of the stream operator has two modes, `ACC` and > `ACCRetract`, users can achieve better performance by implementing special > interfaces for streaming. The table below is a quick summary. > | |emitValue|emitUpdateWithRetract|emitUpdateWithoutRetract| > |ACC|Y|N|Y| > |ACCRetract|Y|Y|N| > * emitValue - for batch and streaming. > * emitUpdateWithRetract - only for streaming in ACC mode (need key > definition on TableAggregateFunction, [under > discussion|https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing]). > * emitUpdateWithoutRetract - only for streaming in ACCRetract mode. > In this Jira we will add ACC mode support for non-window streaming > FlatAggregate. The details can be found in > [Flip-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
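To make the two incremental emission styles in the table more concrete, here is a minimal, Flink-free Java sketch for a MAX aggregate. It is not Flink's `TableAggregateFunction` API: the class `MaxEmitter` and its methods are hypothetical and only illustrate the output difference. Without retraction only the new result is emitted and the consumer has to overwrite the old row itself; with retraction the previously emitted result is withdrawn first.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free illustration of two incremental emission styles for a
// MAX aggregate. This is NOT Flink's TableAggregateFunction API.
class MaxEmitter {
    private int max;
    private boolean hasValue = false;
    private Integer lastEmitted = null; // value most recently sent downstream

    void accumulate(int value) {
        if (!hasValue || value > max) {
            max = value;
            hasValue = true;
        }
    }

    // Only the new result is emitted ("+"); the consumer must overwrite the
    // previously emitted row itself, e.g. by a key (upsert semantics).
    List<String> emitUpdateWithoutRetract() {
        List<String> out = new ArrayList<>();
        if (hasValue && (lastEmitted == null || lastEmitted != max)) {
            out.add("+" + max);
            lastEmitted = max;
        }
        return out;
    }

    // The previously emitted result is retracted ("-") before the new one ("+").
    List<String> emitUpdateWithRetract() {
        List<String> out = new ArrayList<>();
        if (hasValue && (lastEmitted == null || lastEmitted != max)) {
            if (lastEmitted != null) {
                out.add("-" + lastEmitted);
            }
            out.add("+" + max);
            lastEmitted = max;
        }
        return out;
    }
}
```

For example, after accumulating 3 and then 5, the retracting variant emits `+3` and then `-3, +5`, while the non-retracting variant emits `+3` and then only `+5`.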
[GitHub] [flink] JingsongLi commented on a change in pull request #8730: [FLINK-12834][table-planner-blink] Support CharType and BinaryType
JingsongLi commented on a change in pull request #8730: [FLINK-12834][table-planner-blink] Support CharType and BinaryType URL: https://github.com/apache/flink/pull/8730#discussion_r296080979 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ## @@ -379,14 +380,25 @@ object FlinkTypeFactory { case BIGINT => new BigIntType() case FLOAT => new FloatType() case DOUBLE => new DoubleType() - case VARCHAR | CHAR => -// TODO we use VarCharType to support sql CHAR, VarCharType don't support 0 length -new VarCharType( - if (relDataType.getPrecision == 0) VarCharType.MAX_LENGTH else relDataType.getPrecision) - case VARBINARY | BINARY => -// TODO we use VarBinaryType to support sql BINARY, VarBinaryType don't support 0 length -new VarBinaryType( - if (relDataType.getPrecision == 0) VarBinaryType.MAX_LENGTH else relDataType.getPrecision) + case CHAR => +if (relDataType.getPrecision == 0) { + CharType.ofEmptyLiteral +} else { + new CharType(relDataType.getPrecision) +} + case VARCHAR => +// Calcite will return a varchar RelDataType with precision 0, for example, +// SUBSTR('', 2, -1), Calcite infers that the return value must not exist +// (in this case, null will be returned when executed in Blink runner), +// so it infers VARCHAR(0). But our `VarcharType` not allow precision 0. +new VarCharType(Math.max(1, relDataType.getPrecision)) Review comment: After https://github.com/apache/flink/pull/8730 , it is ready now.
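The precision rules in this diff can be mirrored in a small, Flink-free Java sketch. `StringTypeMapping` and its methods are hypothetical and only reproduce the two rules visible in the Scala code: CHAR with precision 0 maps to a dedicated empty-literal type (`CharType.ofEmptyLiteral` in the diff), and VARCHAR with precision 0, which Calcite may infer for expressions such as `SUBSTR('', 2, -1)`, is widened to length 1 via `Math.max(1, precision)`.

```java
// Hypothetical, Flink-free sketch of the precision handling shown in the diff.
// It returns plain values/descriptions instead of Flink LogicalType instances.
final class StringTypeMapping {
    private StringTypeMapping() {}

    /** Describes the CHAR type chosen for a given Calcite precision. */
    static String mapChar(int calcitePrecision) {
        // Precision 0 is treated as a special "empty literal" CHAR type.
        return calcitePrecision == 0
                ? "CHAR(0) [empty literal]"
                : "CHAR(" + calcitePrecision + ")";
    }

    /** Returns the VARCHAR length used for a given Calcite precision. */
    static int mapVarCharLength(int calcitePrecision) {
        // The target VARCHAR type rejects length 0, so widen it to 1.
        return Math.max(1, calcitePrecision);
    }
}
```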
[GitHub] [flink] dianfu opened a new pull request #8817: [FLINK-12920][python] Drop support of register_table_sink with parameters field_names and field_types
dianfu opened a new pull request #8817: [FLINK-12920][python] Drop support of register_table_sink with parameters field_names and field_types URL: https://github.com/apache/flink/pull/8817 ## What is the purpose of the change *The following registerTableSink API in TableEnvironment has been deprecated on the Java side:* `@Deprecated void registerTableSink(String name, String[] fieldNames, TypeInformation[] fieldTypes, TableSink tableSink);` *We should drop support for it in the Python Table API.* ## Brief change log - *Remove the parameters field_names and field_types in register_table_sink* - *Update TestTableSink to accept field_names and field_types as parameters and configure the Java TableSink when constructing it* ## Verifying this change This change is already covered by existing tests, such as *StreamTableEnvironmentTests*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[GitHub] [flink] lirui-apache commented on issue #8695: [FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce PartitionableTableSource and PartitionableTableSink and OverwritableTableSink
lirui-apache commented on issue #8695: [FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce PartitionableTableSource and PartitionableTableSink and OverwritableTableSink URL: https://github.com/apache/flink/pull/8695#issuecomment-504270519 Thanks @wuchong for updating. LGTM now.
[jira] [Commented] (FLINK-12887) Schedule UnfencedMessage would lost envelope info
[ https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869118#comment-16869118 ] Xiaogang Shi commented on FLINK-12887: -- [~till.rohrmann] I am also curious about how we implement delayed {{runAsync}} operations. Currently we first send the {{runAsync}} message to the actor and then schedule the operation with the Akka dispatcher. I have two questions about the implementation: 1. It seems unnecessary to send the {{runAsync}} message to the actor first. Can we simply schedule the message with the Akka dispatcher? 2. The token is enveloped again by the actor. Rarely, but possibly, the token at submit time differs from the one at envelope time: t1. An RPC endpoint submits a {{runAsync}} operation. t2. The RPC endpoint loses its leadership. t3. The RPC endpoint is granted leadership again, creating a new fencing token. t4. The {{runAsync}} operation is executed by the actor. It is enveloped with the new fencing token and scheduled by the Akka dispatcher. In such cases, an operation from the previous session will be executed, which may lead to unexpected results. > Schedule UnfencedMessage would lost envelope info > -- > > Key: FLINK-12887 > URL: https://issues.apache.org/jira/browse/FLINK-12887 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: TisonKun >Priority: Major > > We provide {{runAsync}}, {{callAsync}} and {{scheduleRunAsync}} for > {{MainThreadExecutable}}, while additionally providing {{runAsyncWithoutFencing}} and > {{callAsyncWithoutFencing}} for {{FencedMainThreadExecutable}}. > Let's think about a case where we want to schedule an unfenced runnable or any > other unfenced message (currently we don't have such a code path, but it's > semantically valid). > 1. {{FencedAkkaRpcActor}} received an unfenced runnable with a delay. > 2. It extracted the runnable from the unfenced message and called > {{super.handleRpcMessage}}. > 3. 
{{AkkaRpcActor}} enveloped the message and scheduled it in > {{AkkaRpcActor#L410}}. > However, {{FencedAkkaRpcActor#envelopeSelfMessage}} was called to envelope it. > Thus the unfenced message has now become a fenced message. > Anyway, we can implement {{scheduleRunAsyncWithoutFencing}} to schedule an > unfenced message directly via {{actorsystem.scheduler.scheduleOnce(..., > dispatcher)}}, but in the current codebase I notice that {{RunAsync}} has a > weird {{atTimeNanos}} (i.e., delay) property. Ideally, how a message is scheduled > should be expressed by the parameters a ScheduledExecutorService is called with; at the very least, > we should not extract an unfenced message, envelop it into a fenced message and > then schedule it, which has the wrong semantics. > cc [~till.rohrmann]
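The race in steps t1 to t4 can be illustrated with a small, Flink-free Java sketch in which a fenced runnable captures the fencing token that is current at submission time and is dropped if the endpoint's token has changed by the time it runs, while an unfenced runnable skips the check entirely. `FencedExecutorSketch` and its methods are hypothetical and do not describe how `FencedAkkaRpcActor` is actually implemented; a `UUID` stands in for the fencing token.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch (not Flink's FencedAkkaRpcActor): the token is captured
// when the runnable is submitted, not when it is later enveloped or executed.
class FencedExecutorSketch {
    private final AtomicReference<UUID> currentToken =
            new AtomicReference<>(UUID.randomUUID());

    /** Simulates losing and regaining leadership (t2/t3): a new token is created. */
    void changeLeadership() {
        currentToken.set(UUID.randomUUID());
    }

    /** Fenced: remembers the token valid at submit time (t1). */
    Runnable submitFenced(Runnable action, Runnable onDropped) {
        final UUID tokenAtSubmit = currentToken.get();
        return () -> {
            if (tokenAtSubmit.equals(currentToken.get())) {
                action.run();
            } else {
                onDropped.run(); // stale session: do not execute the operation
            }
        };
    }

    /** Unfenced: runs regardless of leadership changes. */
    Runnable submitUnfenced(Runnable action) {
        return action;
    }
}
```

With token capture at submit time, the scenario from the comment (submit, then a leadership change, then execution) drops the operation instead of running it under the new token.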
[GitHub] [flink] xintongsong commented on issue #8740: [FLINK-12763][runtime] Fail job immediately if tasks’ resource needs can not be satisfied.
xintongsong commented on issue #8740: [FLINK-12763][runtime] Fail job immediately if tasks’ resource needs can not be satisfied. URL: https://github.com/apache/flink/pull/8740#issuecomment-504269702 @flinkbot attention @tillrohrmann
[GitHub] [flink] xintongsong commented on issue #8704: [FLINK-12812][runtime] Set resource profiles for task slots
xintongsong commented on issue #8704: [FLINK-12812][runtime] Set resource profiles for task slots URL: https://github.com/apache/flink/pull/8704#issuecomment-504269649 @flinkbot attention @tillrohrmann
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API
sunjincheng121 commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API URL: https://github.com/apache/flink/pull/8681#discussion_r296075037 ## File path: flink-python/docs/index.rst ## @@ -25,6 +25,9 @@ Welcome to Flink Python API Docs! pyflink pyflink.table + pyflink.dataset + pyflink.datastream + pyflink.common Review comment: Can we move `pyflink.common` to the first position? ![image](https://user-images.githubusercontent.com/22488084/59893008-ed67cf80-940d-11e9-864e-f5da7f8abcc2.png) That is: - pyflink.common - pyflink.table - pyflink.dataset - pyflink.datastream What do you think?
[GitHub] [flink] wuchong commented on issue #8695: [FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce PartitionableTableSource and PartitionableTableSink and OverwritableTableSink
wuchong commented on issue #8695: [FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce PartitionableTableSource and PartitionableTableSink and OverwritableTableSink URL: https://github.com/apache/flink/pull/8695#issuecomment-504268118 After discussing with @lirui-apache offline, I think it makes sense to let the sink know what the static partition part is. The sink can then apply some optimizations when the insert is not a dynamic partition insert. So I added back `setStaticPartition`. Could you have a look again, @lirui-apache?
[GitHub] [flink] JingsongLi commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource
JingsongLi commented on a change in pull request #8782: [FLINK-12888] [table-planner-blink] Introduce planner rule to push filter into TableSource URL: https://github.com/apache/flink/pull/8782#discussion_r296078197 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java ## @@ -168,6 +170,12 @@ private RexNode visitScalarFunc(UnresolvedCallExpression unresolvedCall) { return relBuilder.call(FlinkSqlOperatorTable.MULTIPLY, child); } else if (BuiltInFunctionDefinitions.MOD.equals(def)) { return relBuilder.call(FlinkSqlOperatorTable.MOD, child); + } else if (def instanceof ScalarFunctionDefinition) { + // TODO add getName for ScalarFunctionDefinition ??? Review comment: This code looks wrong; it should be: ``` ScalarFunction scalaFunc = ((ScalarFunctionDefinition) func).getScalarFunction(); List<RexNode> child = convertCallChildren(call); SqlFunction sqlFunction = UserDefinedFunctionUtils.createScalarSqlFunction( scalaFunc.functionIdentifier(), scalaFunc.toString(), scalaFunc, typeFactory); return relBuilder.call(sqlFunction, child); ```
[jira] [Updated] (FLINK-12921) Flink Job Scheduling
[ https://issues.apache.org/jira/browse/FLINK-12921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vim updated FLINK-12921: Environment: flink 1.8.0 jdk8 was: flink 1.7.0 jdk8 > Flink Job Scheduling > > > Key: FLINK-12921 > URL: https://issues.apache.org/jira/browse/FLINK-12921 > Project: Flink > Issue Type: Improvement > Components: API / DataSet, API / DataStream >Affects Versions: 1.8.0 > Environment: flink 1.8.0 > jdk8 >Reporter: vim >Priority: Major > > This is an example from flink-docs: > Consider a program with a data source, a _MapFunction_, and a > _ReduceFunction_. The source and MapFunction are executed with a parallelism > of 4, while the ReduceFunction is executed with a parallelism of 3. A > pipeline consists of the sequence Source - Map - Reduce. On a cluster with 2 > TaskManagers with 3 slots each, the program will be executed as described > below. > !https://ci.apache.org/projects/flink/flink-docs-release-1.8/fig/slots.svg! > But when I tried it, I found that it was not like this. My result is that > TaskManager 1 used 3 slots, but TaskManager 2 used just 1 slot. Has anyone else > tested this example?
[jira] [Created] (FLINK-12921) Flink Job Scheduling
vim created FLINK-12921: --- Summary: Flink Job Scheduling Key: FLINK-12921 URL: https://issues.apache.org/jira/browse/FLINK-12921 Project: Flink Issue Type: Improvement Components: API / DataSet, API / DataStream Affects Versions: 1.8.0 Environment: flink 1.7.0 jdk8 Reporter: vim This is an example from flink-docs: Consider a program with a data source, a _MapFunction_, and a _ReduceFunction_. The source and MapFunction are executed with a parallelism of 4, while the ReduceFunction is executed with a parallelism of 3. A pipeline consists of the sequence Source - Map - Reduce. On a cluster with 2 TaskManagers with 3 slots each, the program will be executed as described below. !https://ci.apache.org/projects/flink/flink-docs-release-1.8/fig/slots.svg! But when I tried it, I found that it was not like this. My result is that TaskManager 1 used 3 slots, but TaskManager 2 used just 1 slot. Has anyone else tested this example?
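The slot arithmetic behind this example can be sketched in Java. The sketch assumes the default slot sharing, under which one slot can hold one parallel subtask of every operator, so the job needs as many slots as its highest operator parallelism (4 here, out of 6 available). The two allocation functions in the hypothetical `SlotMath` class are illustrative strategies for comparison only; this sketch does not establish which strategy Flink 1.8 actually uses.

```java
import java.util.Arrays;

// Hypothetical sketch, not Flink's scheduler.
final class SlotMath {
    private SlotMath() {}

    /** With slot sharing, the job needs as many slots as its highest parallelism. */
    static int requiredSlots(int... operatorParallelisms) {
        return Arrays.stream(operatorParallelisms).max().orElse(0);
    }

    /** Fill-first: use all slots of one TaskManager before moving to the next. */
    static int[] fillFirst(int slotsNeeded, int taskManagers, int slotsPerTm) {
        checkCapacity(slotsNeeded, taskManagers, slotsPerTm);
        int[] used = new int[taskManagers];
        for (int tm = 0; tm < taskManagers && slotsNeeded > 0; tm++) {
            int take = Math.min(slotsPerTm, slotsNeeded);
            used[tm] = take;
            slotsNeeded -= take;
        }
        return used;
    }

    /** Spread-out: hand out slots round-robin, skipping full TaskManagers. */
    static int[] spreadOut(int slotsNeeded, int taskManagers, int slotsPerTm) {
        checkCapacity(slotsNeeded, taskManagers, slotsPerTm);
        int[] used = new int[taskManagers];
        int tm = 0;
        while (slotsNeeded > 0) {
            if (used[tm] < slotsPerTm) {
                used[tm]++;
                slotsNeeded--;
            }
            tm = (tm + 1) % taskManagers;
        }
        return used;
    }

    private static void checkCapacity(int slotsNeeded, int taskManagers, int slotsPerTm) {
        if (slotsNeeded > taskManagers * slotsPerTm) {
            throw new IllegalArgumentException("not enough slots in the cluster");
        }
    }
}
```

With the numbers from the issue, `requiredSlots(4, 4, 3)` is 4, `fillFirst(4, 2, 3)` yields `{3, 1}` (the distribution reported above), and `spreadOut(4, 2, 3)` yields `{2, 2}`.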
[GitHub] [flink] sunhaibotb commented on issue #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
sunhaibotb commented on issue #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput URL: https://github.com/apache/flink/pull/8731#issuecomment-504266752 It seems that an unstable test case caused the Travis failure. I have re-triggered Travis by squashing all commits into one.
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API
sunjincheng121 commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API URL: https://github.com/apache/flink/pull/8681#discussion_r296074512 ## File path: flink-python/pyflink/table/table_config.py ## @@ -119,7 +116,7 @@ def get_built_in_database_name(self): """ return self._j_table_config.getBuiltInDatabaseName() -def _set_built_in_database_name(self, built_in_database_name): +def set_built_in_database_name(self, built_in_database_name): Review comment: That makes sense to me (see the summary of my review #2) :)
[GitHub] [flink] WeiZhong94 edited a comment on issue #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API
WeiZhong94 edited a comment on issue #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API URL: https://github.com/apache/flink/pull/8681#issuecomment-504262776 @sunjincheng121 Thanks for your review! I have addressed your comments in the new commit.
[GitHub] [flink] WeiZhong94 commented on issue #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API
WeiZhong94 commented on issue #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API URL: https://github.com/apache/flink/pull/8681#issuecomment-504262776 @sunjincheng121 Thanks for your review! I have addressed your comments in the new commit.
[jira] [Created] (FLINK-12920) Drop support for register_table_sink with field_names and field_types parameters
Dian Fu created FLINK-12920: --- Summary: Drop support for register_table_sink with field_names and field_types parameters Key: FLINK-12920 URL: https://issues.apache.org/jira/browse/FLINK-12920 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu Assignee: Dian Fu The following registerTableSink API in TableEnvironment is deprecated: {code:java} @Deprecated void registerTableSink(String name, String[] fieldNames, TypeInformation[] fieldTypes, TableSink tableSink); {code} We can drop support for it in the Python Table API.
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API
WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API URL: https://github.com/apache/flink/pull/8681#discussion_r296074136 ## File path: flink-python/pyflink/table/table_config.py ## @@ -119,7 +116,7 @@ def get_built_in_database_name(self): """ return self._j_table_config.getBuiltInDatabaseName() -def _set_built_in_database_name(self, built_in_database_name): +def set_built_in_database_name(self, built_in_database_name): Review comment: This is because the `TableConfig` class has been refactored back to the old design in this PR, so I refactored these newly added setter methods in this PR too.
[GitHub] [flink] wuchong commented on a change in pull request #8695: [FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce PartitionableTableSource and PartitionableTableSink and OverwritableT
wuchong commented on a change in pull request #8695: [FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce PartitionableTableSource and PartitionableTableSink and OverwritableTableSink URL: https://github.com/apache/flink/pull/8695#discussion_r296072983 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks; + +import org.apache.flink.annotation.Experimental; + +import java.util.List; + +/** + * An interface for partitionable {@link TableSink TableSinks}. A partitionable sink can writes + * query results to partitions. + * + * Partition columns are defined via {@link #getPartitionFieldNames()} and the field names + * should be sorted in a strict order. And all the partition fields should exist in the + * {@link TableSink#getTableSchema()}. + * + * For example, a partitioned table named {@code my_table} with a table schema + * {@code [a INT, b VARCHAR, c DOUBLE, dt VARCHAR, country VARCHAR]} is partitioned on columns + * {@code ds, country}. 
Then {@code ds} is the first partition column, and + * {@code country} is the secondary partition column. + */ +@Experimental +public interface PartitionableTableSink { + + /** +* Gets the partition field names of the table. The partition field names should be sorted in +* a strict order, i.e. they have the order as specified in the PARTITION statement in DDL. +* This should be an empty set if the table is not partitioned. +* +* All the partition fields should exist in the {@link TableSink#getTableSchema()}. +* +* @return partition field names of the table, empty if the table is not partitioned. +*/ + List<String> getPartitionFieldNames(); + + /** +* If returns true, sink can trust all records will definitely be sorted by partition fields +* before consumed by the {@link TableSink}, i.e. the sink will receive data one partition +* at a time. For some sinks, this can be used to reduce the partition writers to improve Review comment: reduce number of writers
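The ordering requirement in this javadoc, together with the static-partition idea behind `setStaticPartition` in the comment on #8695, can be sketched with a small helper. `PartitionSpecSketch` is hypothetical and not part of Flink's API; it assumes that a static partition spec maps some partition columns to fixed values, that the remaining columns are filled dynamically from the data, and that a Hive-style `name=value/...` path is built in the declared field order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink's PartitionableTableSink API.
final class PartitionSpecSketch {
    private PartitionSpecSketch() {}

    /** Partition columns not fixed by the static spec, in declared order. */
    static List<String> dynamicColumns(List<String> partitionFields, Map<String, String> staticSpec) {
        List<String> dynamic = new ArrayList<>();
        for (String field : partitionFields) {
            if (!staticSpec.containsKey(field)) {
                dynamic.add(field);
            }
        }
        return dynamic;
    }

    /** Builds a Hive-style path such as "ds=2019-06-21/country=US" in declared order. */
    static String partitionPath(List<String> partitionFields, Map<String, String> values) {
        StringBuilder sb = new StringBuilder();
        for (String field : partitionFields) {
            String value = values.get(field);
            if (value == null) {
                throw new IllegalArgumentException("missing value for partition field " + field);
            }
            if (sb.length() > 0) {
                sb.append('/');
            }
            sb.append(field).append('=').append(value);
        }
        return sb.toString();
    }
}
```

For the javadoc's example fields `ds, country` with a static spec `{ds=2019-06-21}`, only `country` remains dynamic, which is the kind of information a sink could use when the insert is not fully dynamic.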
[GitHub] [flink] asfgit closed pull request #8816: [hotfix][python] Exclude header check for python Docs
asfgit closed pull request #8816: [hotfix][python] Exclude header check for python Docs URL: https://github.com/apache/flink/pull/8816
[GitHub] [flink] sunjincheng121 commented on issue #8816: [hotfix][python] Exclude header check for python Docs
sunjincheng121 commented on issue #8816: [hotfix][python] Exclude header check for python Docs URL: https://github.com/apache/flink/pull/8816#issuecomment-504260056 Thanks for the quick review! @dianfu Merging
[GitHub] [flink] flinkbot commented on issue #8816: [hotfix][python] Exclude header check for python Docs
flinkbot commented on issue #8816: [hotfix][python] Exclude header check for python Docs URL: https://github.com/apache/flink/pull/8816#issuecomment-504260040 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands: The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] sunjincheng121 commented on issue #8816: [hotfix][python] Exclude header check for python Docs
sunjincheng121 commented on issue #8816: [hotfix][python] Exclude header check for python Docs URL: https://github.com/apache/flink/pull/8816#issuecomment-504260078 @flinkbot approve all
[GitHub] [flink] dianfu commented on issue #8816: [hotfix][python] Exclude header check for python Docs
dianfu commented on issue #8816: [hotfix][python] Exclude header check for python Docs URL: https://github.com/apache/flink/pull/8816#issuecomment-504259902 @sunjincheng121 Thanks a lot for the PR. LGTM. +1 to merge.
[GitHub] [flink] sunjincheng121 opened a new pull request #8816: [hotfix][python] Exclude header check for python Docs
sunjincheng121 opened a new pull request #8816: [hotfix][python] Exclude header check for python Docs URL: https://github.com/apache/flink/pull/8816 Hotfix: Exclude header check for python Docs.
[GitHub] [flink] KurtYoung commented on a change in pull request #8809: [FLIN-12663]Implement HiveTableSource to read Hive tables
KurtYoung commented on a change in pull request #8809: [FLIN-12663]Implement HiveTableSource to read Hive tables URL: https://github.com/apache/flink/pull/8809#discussion_r296071379 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; Review comment: I would suggest not using the `org.apache.flink.batch` prefix for the package name.
[GitHub] [flink] KurtYoung commented on a change in pull request #8809: [FLIN-12663]Implement HiveTableSource to read Hive tables
KurtYoung commented on a change in pull request #8809: [FLIN-12663]Implement HiveTableSource to read Hive tables URL: https://github.com/apache/flink/pull/8809#discussion_r296071128 ## File path: flink-connectors/flink-connector-hive/pom.xml ## @@ -60,6 +60,13 @@ under the License. provided + + org.apache.flink + flink-table-api-java-bridge_2.11 Review comment: 2.11 should also be replaced by ${scala.binary.version}.
[jira] [Commented] (FLINK-11622) Translate the "Command-Line Interface" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869089#comment-16869089 ] Jark Wu commented on FLINK-11622: - Hi [~nicholasjiang], are you still working on this issue? Would you mind assigning this issue to [~yuetongshu]? > Translate the "Command-Line Interface" page into Chinese > > > Key: FLINK-11622 > URL: https://issues.apache.org/jira/browse/FLINK-11622 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Hui Zhao >Assignee: Nicholas Jiang >Priority: Major > > The page URL is > https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html > The markdown file is located in > https://github.com/apache/flink/blob/master/docs/ops/cli.md > The markdown file will be created once FLINK-11529 is merged. > You can refer to the translation at: > https://github.com/flink-china/1.6.0/blob/master/ops/cli.md
[jira] [Commented] (FLINK-12887) Schedule UnfencedMessage would lost envelope info
[ https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869086#comment-16869086 ] Xiaogang Shi commented on FLINK-12887: -- Hi [~till.rohrmann], we are now using many unfenced asynchronous operations in the Yarn RM to process notifications from Yarn. Otherwise, the Yarn RM would miss some notifications while it has not yet been granted leadership. Another case is the timers that release stuck containers. When a Yarn RM restarts, it recovers containers from previous attempts. Some of these containers may be stuck, and we should kill them to release resources. We now use timers to monitor these recovered containers and kill those whose task managers cannot register in time. The timers must be unfenced because the Yarn RM may not yet have been granted leadership when it recovers the containers. > Schedule UnfencedMessage would lost envelope info > -- > > Key: FLINK-12887 > URL: https://issues.apache.org/jira/browse/FLINK-12887 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: TisonKun >Priority: Major > > We provide {{runAsync}}, {{callAsync}} and {{scheduleRunAsync}} for > {{MainThreadExecutable}}, and additionally provide {{runAsyncWithoutFencing}} and > {{callAsyncWithoutFencing}} for {{FencedMainThreadExecutable}}. > Consider a case where we want to schedule an unfenced runnable or any > other unfenced message (currently we don't have such a code path, but it is > semantically valid). > 1. {{FencedAkkaRpcActor}} receives an unfenced runnable with a delay. > 2. It extracts the runnable from the unfenced message and calls > {{super.handleRpcMessage}}. > 3. {{AkkaRpcActor}} envelopes the message and schedules it via > {{AkkaRpcActor#L410}}. > However, {{FencedAkkaRpcActor#envelopeSelfMessage}} is the method called to create the envelope. > Thus the unfenced message has now become a fenced message.
> We could of course implement {{scheduleRunAsyncWithoutFencing}} to schedule the > unfenced message directly via {{actorsystem.scheduler.scheduleOnce(..., > dispatcher)}}, but in the current codebase I notice that {{RunAsync}} has a > weird {{atTimeNanos}} (i.e., delay) property. Ideally, how a message is scheduled should be expressed by the parameters ScheduledExecutorService is called with; at the very least, > we must not extract an unfenced message, envelope it into a fenced message and > then schedule it, because that has the wrong semantics. > cc [~till.rohrmann]
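The dispatch problem described in FLINK-12887 can be reproduced in a toy model. The sketch below is hypothetical. `BaseActor`, `FencedActor` and the message classes only play the roles of `AkkaRpcActor`, `FencedAkkaRpcActor` and their messages. There is no Akka involved, the fencing token is a plain `Object`, and "scheduling" just appends the message to a list. What it illustrates is how an overridable `envelopeSelfMessage` hook, called from the base class, turns an unfenced payload into a fenced one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Toy model of the FLINK-12887 dispatch path; NOT Flink code. The classes only
// play the roles of AkkaRpcActor / FencedAkkaRpcActor, and "scheduling" a
// message just appends it to a list instead of using an Akka scheduler.
final class FencingModel {

    interface Message {}

    /** Runnable payload with an optional delay, loosely mirroring RunAsync. */
    static final class RunAsync implements Message {
        final Runnable runnable;
        final long delayMillis;

        RunAsync(Runnable runnable, long delayMillis) {
            this.runnable = runnable;
            this.delayMillis = delayMillis;
        }
    }

    /** Payload that is only processed if its token matches the actor's token. */
    static final class FencedMessage implements Message {
        final Object fencingToken;
        final Message payload;

        FencedMessage(Object fencingToken, Message payload) {
            this.fencingToken = fencingToken;
            this.payload = payload;
        }
    }

    /** Payload that is supposed to bypass the fencing check. */
    static final class UnfencedMessage implements Message {
        final Message payload;

        UnfencedMessage(Message payload) {
            this.payload = payload;
        }
    }

    /** Base actor: delayed runnables are re-enveloped and "scheduled" for later. */
    static class BaseActor {
        final List<Message> scheduled = new ArrayList<>();

        void handleRpcMessage(Message message) {
            if (message instanceof RunAsync) {
                RunAsync runAsync = (RunAsync) message;
                if (runAsync.delayMillis > 0) {
                    // The overridable hook decides how the delayed self-message is wrapped.
                    scheduled.add(envelopeSelfMessage(runAsync));
                } else {
                    runAsync.runnable.run();
                }
            }
        }

        Message envelopeSelfMessage(Message message) {
            return message;
        }
    }

    /** Fenced actor: unwraps messages, then delegates to the base class. */
    static final class FencedActor extends BaseActor {
        private final Object fencingToken;

        FencedActor(Object fencingToken) {
            this.fencingToken = fencingToken;
        }

        @Override
        void handleRpcMessage(Message message) {
            if (message instanceof UnfencedMessage) {
                // After unwrapping, the "unfenced" marker is lost.
                super.handleRpcMessage(((UnfencedMessage) message).payload);
            } else if (message instanceof FencedMessage) {
                FencedMessage fenced = (FencedMessage) message;
                if (Objects.equals(fencingToken, fenced.fencingToken)) {
                    super.handleRpcMessage(fenced.payload);
                } // else: token mismatch, the message is dropped in this model
            }
        }

        @Override
        Message envelopeSelfMessage(Message message) {
            // Wraps every self-message with the current token, even one that arrived unfenced.
            return new FencedMessage(fencingToken, message);
        }
    }

    private FencingModel() {}
}
```

In this model, sending an `UnfencedMessage` that carries a `RunAsync` with a positive delay leaves a `FencedMessage` in `scheduled`. That is the semantic change the issue reports. A separate unfenced scheduling path, along the lines of the `scheduleRunAsyncWithoutFencing` idea above, would avoid calling the fenced hook at all.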
[GitHub] [flink] sunjincheng121 commented on issue #8783: [FLINK-12863][FLINK-12865] Remove concurrency from HeartbeatManager(Sender)Impl
sunjincheng121 commented on issue #8783: [FLINK-12863][FLINK-12865] Remove concurrency from HeartbeatManager(Sender)Impl URL: https://github.com/apache/flink/pull/8783#issuecomment-504257108 Hi @tillrohrmann, Thanks for working on this PR! Although this is a blocker for the 1.8.1 release, we still should not rush to merge it. The timing of your update suggests that you did not get any rest last night. Take care! It would also be perfectly fine if we finish the 1.8.1 release before the 1.9 feature freeze, so we still have plenty of time! :) Best, Jincheng
[jira] [Closed] (FLINK-12919) flink-connector-hive compilation error in travis
[ https://issues.apache.org/jira/browse/FLINK-12919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] leesf closed FLINK-12919. - Resolution: Fixed > flink-connector-hive compilation error in travis > > > Key: FLINK-12919 > URL: https://issues.apache.org/jira/browse/FLINK-12919 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Reporter: leesf >Priority: Major > > [https://api.travis-ci.org/v3/job/548270362/log.txt]
[jira] [Commented] (FLINK-12919) flink-connector-hive compilation error in travis
[ https://issues.apache.org/jira/browse/FLINK-12919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869081#comment-16869081 ] leesf commented on FLINK-12919: --- Needs to be rebased on master. > flink-connector-hive compilation error in travis > > > Key: FLINK-12919 > URL: https://issues.apache.org/jira/browse/FLINK-12919 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Reporter: leesf >Priority: Major > > [https://api.travis-ci.org/v3/job/548270362/log.txt]
[jira] [Created] (FLINK-12919) flink-connector-hive compilation error in travis
leesf created FLINK-12919: - Summary: flink-connector-hive compilation error in travis Key: FLINK-12919 URL: https://issues.apache.org/jira/browse/FLINK-12919 Project: Flink Issue Type: Bug Components: Connectors / Hive Reporter: leesf [https://api.travis-ci.org/v3/job/548270362/log.txt]
[GitHub] [flink] mans2singh commented on issue #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …
mans2singh commented on issue #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics … URL: https://github.com/apache/flink/pull/8668#issuecomment-504250711 @zentol @rmetzger - Please let me know if you can review this request or point me to someone who can advise on it. BTW - The Travis build failure is not related to this enhancement and its test cases. Thanks
[GitHub] [flink] flinkbot commented on issue #8815: [FLINK-12918][table][hive] unify GenericCatalogTable, HiveCatalogTable and AbstractCatalogTable into CatalogTableImpl
flinkbot commented on issue #8815: [FLINK-12918][table][hive] unify GenericCatalogTable, HiveCatalogTable and AbstractCatalogTable into CatalogTableImpl URL: https://github.com/apache/flink/pull/8815#issuecomment-504250745 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] bowenli86 opened a new pull request #8815: [FLINK-12918][table][hive] unify GenericCatalogTable, HiveCatalogTable and AbstractCatalogTable into CatalogTableImpl
bowenli86 opened a new pull request #8815: [FLINK-12918][table][hive] unify GenericCatalogTable, HiveCatalogTable and AbstractCatalogTable into CatalogTableImpl URL: https://github.com/apache/flink/pull/8815 ## What is the purpose of the change This PR unifies implementations of CatalogTable by combining GenericCatalogTable, HiveCatalogTable and AbstractCatalogTable into CatalogTableImpl. ## Brief change log - combined GenericCatalogTable, HiveCatalogTable and AbstractCatalogTable into CatalogTableImpl - moved IS_GENERIC flag from GenericInMemoryCatalog to CatalogConfig - updated unit tests ## Verifying this change This change added tests and can be verified as follows: `GenericInMemoryCatalogTest`, `HiveCatalogHiveMetadataTest`, `HiveCatalogGenericMetadataTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector:(no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (not applicable)
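The change log above replaces three table classes with a single `CatalogTableImpl` and moves the `IS_GENERIC` flag into `CatalogConfig`. The hypothetical sketch below shows how one class can carry the "generic vs. Hive" distinction as a table property rather than as separate subclasses. Several details are illustrative assumptions and are not taken from #8815: the names `CatalogTableSketch` and `CatalogConfigSketch`, the `"is_generic"` key value, and the simplified schema (a list of column names).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a shared config holder; the key value is an assumption.
final class CatalogConfigSketch {
    static final String IS_GENERIC = "is_generic";

    private CatalogConfigSketch() {}
}

// Hypothetical single table implementation: the "generic vs. Hive" distinction
// lives in a table property instead of in separate subclasses.
final class CatalogTableSketch {
    private final List<String> columnNames;      // simplified stand-in for a table schema
    private final Map<String, String> properties;
    private final String comment;

    CatalogTableSketch(List<String> columnNames, Map<String, String> properties, String comment) {
        this.columnNames = Collections.unmodifiableList(new ArrayList<>(columnNames));
        this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
        this.comment = comment;
    }

    List<String> getColumnNames() {
        return columnNames;
    }

    Map<String, String> getProperties() {
        return properties;
    }

    String getComment() {
        return comment;
    }

    /** A missing flag is treated as non-generic in this sketch (parseBoolean(null) is false). */
    boolean isGeneric() {
        return Boolean.parseBoolean(properties.get(CatalogConfigSketch.IS_GENERIC));
    }

    /** Returns a copy with the flag set; this instance stays unchanged. */
    CatalogTableSketch withGenericFlag(boolean generic) {
        Map<String, String> updated = new HashMap<>(properties);
        updated.put(CatalogConfigSketch.IS_GENERIC, String.valueOf(generic));
        return new CatalogTableSketch(columnNames, updated, comment);
    }
}
```

Keeping the flag in the property map means a catalog can persist and restore it like any other table property, without knowing which subclass created the table.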
[jira] [Updated] (FLINK-12918) unify GenericCatalogTable, HiveCatalogTable and AbstractCatalogTable into CatalogTableImpl
[ https://issues.apache.org/jira/browse/FLINK-12918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12918: --- Labels: pull-request-available (was: ) > unify GenericCatalogTable, HiveCatalogTable and AbstractCatalogTable into > CatalogTableImpl > -- > > Key: FLINK-12918 > URL: https://issues.apache.org/jira/browse/FLINK-12918 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive, Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > unify {{GenericCatalogTable}}, {{HiveCatalogTable}} and > {{AbstractCatalogTable}} into {{CatalogTableImpl}}
[jira] [Updated] (FLINK-12918) unify catalog table implementations
[ https://issues.apache.org/jira/browse/FLINK-12918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12918: - Description: unify {{GenericCatalogTable}}, {{HiveCatalogTable}} and {{AbstractCatalogTable}} into {{CatalogTableImpl}} > unify catalog table implementations > --- > > Key: FLINK-12918 > URL: https://issues.apache.org/jira/browse/FLINK-12918 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive, Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > unify {{GenericCatalogTable}}, {{HiveCatalogTable}} and > {{AbstractCatalogTable}} into {{CatalogTableImpl}}
[jira] [Updated] (FLINK-12918) unify GenericCatalogTable, HiveCatalogTable and AbstractCatalogTable into CatalogTableImpl
[ https://issues.apache.org/jira/browse/FLINK-12918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12918: - Summary: unify GenericCatalogTable, HiveCatalogTable and AbstractCatalogTable into CatalogTableImpl (was: unify catalog table implementations) > unify GenericCatalogTable, HiveCatalogTable and AbstractCatalogTable into > CatalogTableImpl > -- > > Key: FLINK-12918 > URL: https://issues.apache.org/jira/browse/FLINK-12918 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive, Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > unify {{GenericCatalogTable}}, {{HiveCatalogTable}} and > {{AbstractCatalogTable}} into {{CatalogTableImpl}}
[jira] [Updated] (FLINK-12918) unify catalog table implementations
[ https://issues.apache.org/jira/browse/FLINK-12918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12918: - Summary: unify catalog table implementations (was: unify catalog table implementation) > unify catalog table implementations > --- > > Key: FLINK-12918 > URL: https://issues.apache.org/jira/browse/FLINK-12918 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive, Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.9.0 > >
[jira] [Created] (FLINK-12918) unify catalog table implementation
Bowen Li created FLINK-12918: Summary: unify catalog table implementation Key: FLINK-12918 URL: https://issues.apache.org/jira/browse/FLINK-12918 Project: Flink Issue Type: Sub-task Components: Connectors / Hive, Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0
[GitHub] [flink] zuoinlv commented on issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task cancellation is stuck
zuoinlv commented on issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task cancellation is stuck URL: https://github.com/apache/flink/pull/2652#issuecomment-504248827 Is there any option other than terminating the JVM (i.e., exiting the TaskManager) when task cancellation takes longer than TASK_CANCELLATION_TIMEOUT_MILLIS? If not, how can we make a task shut down quickly when it is cancelled?
[jira] [Commented] (FLINK-11622) Translate the "Command-Line Interface" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869056#comment-16869056 ] Jasper Yue commented on FLINK-11622: Hi [~jark], could you assign this issue to me? > Translate the "Command-Line Interface" page into Chinese > > > Key: FLINK-11622 > URL: https://issues.apache.org/jira/browse/FLINK-11622 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Hui Zhao >Assignee: Nicholas Jiang >Priority: Major > > The page URL is > https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html > The markdown file is located in > https://github.com/apache/flink/blob/master/docs/ops/cli.md > The markdown file will be created once FLINK-11529 is merged. > You can reference the translation from: > https://github.com/flink-china/1.6.0/blob/master/ops/cli.md
[GitHub] [flink] yuezhuangshi commented on issue #8799: [FLINK-11612][chinese-translation,Documentation] Translate the "Proje…
yuezhuangshi commented on issue #8799: [FLINK-11612][chinese-translation,Documentation] Translate the "Proje… URL: https://github.com/apache/flink/pull/8799#issuecomment-504246977 @synckey Is there anything I can do about the failed Travis CI build?
[GitHub] [flink] tillrohrmann commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor
tillrohrmann commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor URL: https://github.com/apache/flink/pull/8687#discussion_r296051333 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor.partition; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Wraps a {@link ShuffleEnvironment} to allow tracking of partitions per job. + */ +public class JobAwareShuffleEnvironmentImpl implements JobAwareShuffleEnvironment { + + private final ShuffleEnvironment backingShuffleEnvironment; + + /** +* Releasing in-progress partitions while tasks are still running may lead to odd behaviors, hence we distinguish +* between in-progress and finished partitions so that we can exclusively release finished partitions. 
+*/ + private final PartitionTable inProgressPartitionTable = new PartitionTable(); + private final PartitionTable finishedPartitionTable = new PartitionTable(); + + private Consumer listener = jobId -> {}; + + /** Tracks which jobs are still being monitored, to ensure cleanup in cases where tasks are finishing while +* the jobmanager connection is being terminated. This is a concurrent set since it is modified by both the +* Task (via {@link #notifyPartitionFinished(JobID, ResultPartitionID)}} and +* TaskExecutor (via {@link #releaseAllFinishedPartitionsAndMarkJobInactive(JobID)}) threads. */ + private final Set activeJobs = ConcurrentHashMap.newKeySet(); + + public JobAwareShuffleEnvironmentImpl(ShuffleEnvironment backingShuffleEnvironment) { + this.backingShuffleEnvironment = Preconditions.checkNotNull(backingShuffleEnvironment); + } + + @Override + public void setPartitionFailedOrFinishedListener(Consumer listener) { + Preconditions.checkNotNull(listener); + this.listener = listener; + } + + @Override + public boolean hasPartitionsOccupyingLocalResources(JobID jobId) { + return inProgressPartitionTable.hasTrackedPartitions(jobId) || finishedPartitionTable.hasTrackedPartitions(jobId); + } + + @Override + public void markJobActive(JobID jobId) { + activeJobs.add(jobId); + } + + @Override + public void releaseFinishedPartitions(JobID jobId, Collection resultPartitionIds) { + // maybe double-check that all partitions are finished? + finishedPartitionTable.stopTrackingPartitions(jobId, resultPartitionIds); + backingShuffleEnvironment.releasePartitions(resultPartitionIds); + } + + @Override + public void releaseAllFinishedPartitionsAndMarkJobInactive(JobID jobId) { + activeJobs.remove(jobId); + Collection finishedPartitionsForJob = finishedPartitionTable.stopTrackingPartitions(jobId);
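The `PartitionTable` class is not part of the excerpt above. Judging only from the calls made on it (`hasTrackedPartitions(jobId)`, `stopTrackingPartitions(jobId)` and `stopTrackingPartitions(jobId, resultPartitionIds)`), a per-job table might look roughly like the hypothetical sketch below. Three things are assumptions: the class name, the `startTrackingPartition` method, and the use of `ConcurrentHashMap.compute` to make per-job updates atomic. In the real class, `K` would presumably be a `JobID` and `P` a `ResultPartitionID`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-key partition table; K would be a JobID, P a ResultPartitionID.
final class PartitionTableSketch<K, P> {

    // Only non-empty sets are stored, so "has partitions" is a key lookup.
    // Each set is only mutated inside compute/computeIfPresent, which the map
    // executes atomically per key.
    private final Map<K, Set<P>> trackedPartitionsPerKey = new ConcurrentHashMap<>();

    void startTrackingPartition(K key, P partitionId) {
        trackedPartitionsPerKey.compute(key, (k, partitions) -> {
            Set<P> updated = partitions == null ? new HashSet<>() : partitions;
            updated.add(partitionId);
            return updated;
        });
    }

    boolean hasTrackedPartitions(K key) {
        return trackedPartitionsPerKey.containsKey(key);
    }

    /** Stops tracking all partitions of the key and returns them. */
    Collection<P> stopTrackingPartitions(K key) {
        Set<P> removed = trackedPartitionsPerKey.remove(key);
        return removed == null ? Collections.<P>emptySet() : removed;
    }

    /** Stops tracking the given partitions of the key and returns those that were tracked. */
    Collection<P> stopTrackingPartitions(K key, Collection<P> partitionIds) {
        Set<P> stopped = new HashSet<>();
        trackedPartitionsPerKey.computeIfPresent(key, (k, partitions) -> {
            for (P partitionId : partitionIds) {
                if (partitions.remove(partitionId)) {
                    stopped.add(partitionId);
                }
            }
            // Returning null removes the mapping once the last partition is gone.
            return partitions.isEmpty() ? null : partitions;
        });
        return stopped;
    }
}
```

The class above keeps two such tables, `inProgressPartitionTable` and `finishedPartitionTable`. With a table like this sketch, `releaseFinishedPartitions` would only ever touch partitions whose producers have already finished.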
[GitHub] [flink] bowenli86 commented on issue #8814: [FLINK-12917][hive] support complex type of array, map, struct for Hive functions
bowenli86 commented on issue #8814: [FLINK-12917][hive] support complex type of array, map, struct for Hive functions URL: https://github.com/apache/flink/pull/8814#issuecomment-504234134 cc @xuefuz @JingsongLi @lirui-apache @zjuwangg
[GitHub] [flink] bowenli86 commented on a change in pull request #8785: [FLINK-11480][hive] Create HiveTableFactory that creates TableSource/…
bowenli86 commented on a change in pull request #8785: [FLINK-11480][hive] Create HiveTableFactory that creates TableSource/… URL: https://github.com/apache/flink/pull/8785#discussion_r296048166 ## File path: flink-connectors/flink-connector-hive/pom.xml ## @@ -41,18 +41,26 @@ under the License. org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} + flink-table-common ${project.version} provided + Review comment: revert extra line
[GitHub] [flink] bowenli86 commented on a change in pull request #8785: [FLINK-11480][hive] Create HiveTableFactory that creates TableSource/…
bowenli86 commented on a change in pull request #8785: [FLINK-11480][hive] Create HiveTableFactory that creates TableSource/… URL: https://github.com/apache/flink/pull/8785#discussion_r296048778 ## File path: flink-connectors/flink-connector-hive/pom.xml ## @@ -41,18 +41,26 @@ under the License. org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} + flink-table-common ${project.version} provided + Review comment: revert this extra line?
[GitHub] [flink] bowenli86 commented on a change in pull request #8785: [FLINK-11480][hive] Create HiveTableFactory that creates TableSource/…
bowenli86 commented on a change in pull request #8785: [FLINK-11480][hive] Create HiveTableFactory that creates TableSource/… URL: https://github.com/apache/flink/pull/8785#discussion_r296048716 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.GenericCatalogTable; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HiveCatalogTable; +import org.apache.flink.table.factories.TableFactoryUtil; +import org.apache.flink.table.factories.TableSinkFactory; +import org.apache.flink.table.factories.TableSourceFactory; +import org.apache.flink.table.sinks.OutputFormatTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.InputFormatTableSource; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.List; +import java.util.Map; + +/** + * A table factory implementation for tables stored in Hive catalog. + */ +public class HiveTableFactory implements TableSourceFactory, TableSinkFactory { + + @Override + public Map requiredContext() { + return null; Review comment: right, so shall we throw UnsupportedOperationException in `requiredContext()` and `supportedProperties()`?
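The suggestion above is to fail fast rather than return `null` from the discovery methods. A minimal sketch of that pattern follows. To keep it self-contained, it uses a local stand-in interface, `DiscoverableTableFactorySketch`, instead of Flink's real `TableFactory`. The stand-in mirrors only the two methods named in the comment, and the class names and exception messages are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Local stand-in mirroring the two discovery methods named in the review comment.
interface DiscoverableTableFactorySketch {
    Map<String, String> requiredContext();

    List<String> supportedProperties();
}

// Hypothetical factory that its catalog instantiates directly, so it is never
// supposed to be matched against properties during factory discovery.
final class CatalogOnlyTableFactorySketch implements DiscoverableTableFactorySketch {

    private static final String NOT_DISCOVERABLE =
            "This factory is created by its catalog and does not support property-based discovery.";

    @Override
    public Map<String, String> requiredContext() {
        // Fail fast with a clear message instead of returning null,
        // which would only surface later as a NullPointerException in the caller.
        throw new UnsupportedOperationException(NOT_DISCOVERABLE);
    }

    @Override
    public List<String> supportedProperties() {
        throw new UnsupportedOperationException(NOT_DISCOVERABLE);
    }
}
```

One caveat applies. If some code path does call these methods on every registered factory, for example a generic discovery loop, throwing would break that path just as `null` would, only earlier and more visibly. The choice therefore assumes the factory is kept out of such discovery.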
[GitHub] [flink] flinkbot commented on issue #8814: [FLINK-12917][hive] support complex type of array, map, struct for Hive functions
flinkbot commented on issue #8814: [FLINK-12917][hive] support complex type of array, map, struct for Hive functions URL: https://github.com/apache/flink/pull/8814#issuecomment-504232118 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier